Optimizing Pinterest’s Data Ingestion Stack: Findings and Learnings

Pinterest Engineering
Apr 28, 2022 · 9 min read


Ping-Min Lin | Software Engineer, Logging Platform


At Pinterest, the Logging Platform team maintains the backbone of the data ingestion infrastructure, which ingests terabytes of data per day. Because these systems are so widespread and sit so deep in the stack, it is extremely important that the services powering these pipelines are built to be efficient. Along our journey of continuous improvement, we have picked up basic but useful patterns and learnings that apply broadly, and hopefully will be useful for you as well.

MemQ: Achieving memory-efficient batch data delivery using Netty

MemQ is the next-gen data ingestion platform built in-house and recently open-sourced by the Logging Platform team. When designing the service, we tried hard to maximize the efficiency of our resources; specifically, we focused on reducing garbage collection by using off-heap memory. Netty was chosen as our low-level networking framework due to its great balance of flexibility, performance, and sophisticated out-of-the-box features. For example, we used ByteBuf heavily throughout the project. ByteBufs are the building blocks of data within Netty. They are similar to Java NIO ByteBuffers, but give developers much more control over the lifecycle of the objects by providing a "smart pointer" approach to customized memory management via manual reference counting. By using ByteBufs, we managed to transport messages with a single copy of the data by passing around pointers to off-heap network buffers, further reducing cycles spent on garbage collection.
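As a minimal, self-contained sketch (not MemQ code) of the reference-counting pattern, here is roughly how a direct ByteBuf can be shared between components without copying the payload; the class and method names are illustrative only:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class ByteBufRefCountDemo {
  public static void main(String[] args) {
    // Allocate an off-heap (direct) buffer from Netty's pooled allocator;
    // its memory lives outside the JVM heap and is not tracked by the GC.
    ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
    payload.writeBytes("hello memq".getBytes());

    // Hand the same buffer to another component: bump the reference count
    // instead of copying the bytes.
    payload.retain();   // refCnt: 1 -> 2
    consume(payload);   // the consumer releases its reference when done (2 -> 1)

    payload.release();  // refCnt: 1 -> 0, memory goes back to the pool
  }

  private static void consume(ByteBuf buf) {
    try {
      // ... read from buf without copying ...
    } finally {
      buf.release();    // every owner releases exactly once
    }
  }
}
```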

The client write path starts with the client producer delivering byte-array messages, which Netty decodes into Message instances that are handled by the PacketHandler and finally land in Batches within the TopicProcessor. Each Batch is aggregated by the BatchManager, and once a time or size threshold has been reached, the BatchManager uploads the batch via the StorageHandler to the storage destination (e.g. S3, a file system).

The typical journey of a message in the MemQ broker: each message received from the network is reconstructed via a length-encoded protocol into a ByteBuf allocated off the JVM heap (direct memory, in Netty terms), which remains the only copy of the payload throughout the whole pipeline. This ByteBuf reference is passed into the topic processor and put into a Batch along with other messages that are also waiting to be uploaded to the storage destination. Once the upload constraints are met, due to either the time threshold or the size threshold, the Batch is dispatched. In the case of uploading to a remote object store like S3, the whole batch of messages is kept in a CompositeByteBuf (a virtual wrapper ByteBuf consisting of multiple ByteBufs) and uploaded to the destination using the netty-reactor library, allowing us to create no additional copies of data within the processing path. By building on top of ByteBufs and other Netty constructs, we were able to iterate rapidly without sacrificing performance or reinventing the wheel.
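To illustrate the zero-copy batching idea (again a sketch under assumed names, not the actual MemQ code), a CompositeByteBuf can stitch the per-message buffers into one logical batch without moving any bytes:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

public final class CompositeBatchDemo {
  public static void main(String[] args) {
    // Pretend these are the per-message direct buffers accumulated in a Batch.
    ByteBuf m1 = Unpooled.directBuffer().writeBytes("msg-1".getBytes());
    ByteBuf m2 = Unpooled.directBuffer().writeBytes("msg-2".getBytes());

    // A CompositeByteBuf is a virtual view over its components: adding a
    // component transfers ownership of the buffer and copies nothing.
    CompositeByteBuf batch = PooledByteBufAllocator.DEFAULT.compositeDirectBuffer();
    batch.addComponent(true, m1);  // true = advance the writer index
    batch.addComponent(true, m2);

    // The upload path can now treat the whole batch as a single buffer
    // (e.g. stream it to the object store) while only one copy of the data exists.
    System.out.println("batch readable bytes: " + batch.readableBytes());

    batch.release();  // releasing the composite releases the underlying components
  }
}
```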

Singer: Leveraging asynchronous processing to reduce thread overheads

Singer has been around at Pinterest for a long time, reliably delivering messages to PubSub backends. As more and more use cases were onboarded to Singer, we started to hit memory-usage bottlenecks that led to frequent OOM issues and incidents. Singer runs with constrained memory and CPU resources on nearly all fleets at Pinterest to avoid impacting the host service, e.g. our API serving layer. After inspecting the code and leveraging debugging tools such as VisualVM, Native Memory Tracking (NMT), and pmap, we identified several potential improvements, most notably reducing the number of threads. Analyzing the NMT results, we noticed a large number of threads and a correspondingly large amount of memory consumed by their stacks (allocated for the Singer executor and producer thread pools).

Taking a deeper look at the source of these threads, we found that the majority of them come from the thread pools for each Kafka cluster Singer publishes to. The threads in these pools wait for Kafka to finish writing messages to a partition and then report the status of the writes. While these threads do the job, each thread in the JVM will (by default) allocate 1 MB of memory for its stack.

A Singer NMT report showing the different memory regions a JVM process allocates: the Java Heap entry indicates JVM-managed heap usage, the Thread entry represents the thread stacks, and Arena contains the off-heap/direct memory managed outside of the JVM heap.

Even though the underlying operating system lazily allocates stack memory until a thread is actually used, this still quickly adds up to hundreds of MBs of the process's memory. When there are many log streams publishing to multiple partitions on different clusters, the memory used by thread stacks can easily become comparable to Singer's 800 MB default heap size and eats into the resources of the application.

```java
// each partition is a bucket
List<Future<KafkaWritingTaskResult>> bucketFutures = new ArrayList<>();
for (KafkaWritingTaskFuture f : committableBuckets.values()) {
  Future<KafkaWritingTaskResult> bucketFuture = clusterThreadPool.submit(() -> {
    List<RecordMetadata> recordMetadataList = new ArrayList<>();
    for (Future<RecordMetadata> future : task.getRecordMetadataList()) {
      // We will get TimeoutException if the wait timed out
      RecordMetadata r
```

Each submission of a KafkaWritingTask occupies a thread from the per-cluster thread pool until the writes complete. Full code can be found here

By closely examining how these threads are used, it quickly becomes clear that most of the work they do is non-blocking, such as updating metrics, and is perfectly suitable for asynchronous processing with the CompletableFutures introduced in Java 8. CompletableFuture lets us replace the blocking calls with chained asynchronous stages, eliminating the threads that had to sit and wait for results to come back from Kafka. By using the callback in the KafkaProducer.send(record, callback) method, we let the Kafka producer's network client remain fully in control of the network multiplexing.


The resulting code after adopting CompletableFutures can be found in full here
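In spirit, the refactor looks something like the sketch below (a simplified illustration rather than Singer's actual code; class and method names are hypothetical). The producer callback completes a CompletableFuture, and the bookkeeping that previously required a waiting thread becomes a chained stage:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncKafkaWriteSketch {

  // Wrap the producer callback in a CompletableFuture so no thread has to
  // block waiting for the broker's acknowledgement.
  static CompletableFuture<RecordMetadata> sendAsync(
      KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record) {
    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
    producer.send(record, (metadata, exception) -> {
      if (exception != null) {
        future.completeExceptionally(exception);
      } else {
        future.complete(metadata);
      }
    });
    return future;
  }

  // Combine all sends for a batch and chain the lightweight bookkeeping
  // (metrics, offset tracking) as a non-blocking stage. Without an explicit
  // executor, the *Async stage runs on ForkJoinPool.commonPool(), so no
  // dedicated per-cluster threads are needed.
  static CompletableFuture<Void> writeBatch(
      KafkaProducer<byte[], byte[]> producer, List<ProducerRecord<byte[], byte[]>> records) {
    List<CompletableFuture<RecordMetadata>> sends =
        records.stream().map(r -> sendAsync(producer, r)).collect(Collectors.toList());
    return CompletableFuture.allOf(sends.toArray(new CompletableFuture[0]))
        .thenRunAsync(() -> {
          // e.g. update per-partition latency and byte-count metrics here
        });
  }
}
```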

Once we converted the original logic into several chained non-blocking stages, it became natural to handle all of them on a single shared thread pool regardless of the log stream, so we use the common ForkJoinPool the JVM already provides. This dramatically reduced the thread usage in Singer, from a couple hundred threads to virtually no additional threads, and demonstrates how much network-bound applications can benefit from asynchronous processing.

Kafka and Singer: Balancing performance and efficiency with controllable variance

Operating our Kafka clusters has always been a delicate balance between performance, fault tolerance, and efficiency. Our logging agent Singer, at the front line of publishing messages to Kafka, plays a crucial role in this balance, especially by deciding which partitions of a topic the traffic is routed to.

The Default Partitioner: Evenly Distributed Traffic

In Singer, logs on a machine are picked up, routed to the topic they belong to, and published to that topic in Kafka. In the early days, Singer published uniformly to all of a topic's partitions in a round-robin fashion using our default partitioner. For example, if 3,000 messages on a particular host needed to be published to a 30-partition topic, each partition would receive roughly 100 messages. This worked well for most use cases and has the nice property that all partitions receive the same amount of messages, which is great for consumers of these topics since the workload is evenly distributed among them.
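As a minimal illustration (not the actual Singer partitioner code), round-robin selection boils down to a counter modulo the partition count:

```java
import java.util.concurrent.atomic.AtomicLong;

// Message i goes to partition i mod numPartitions, so 3,000 messages spread
// over 30 partitions end up as roughly 100 messages per partition.
public class RoundRobinChooser {
  private final AtomicLong counter = new AtomicLong();

  public int nextPartition(int numPartitions) {
    return (int) (counter.getAndIncrement() % numPartitions);
  }
}
```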

DefaultPartitioner: producers and partitions are fully connected; each of the 3 producers writes to all 3 partitions.

The Single Partition Partitioner: In Favor of the Law of Large Numbers

SinglePartitionPartitioner: the ideal scenario where connections are evenly distributed; each of the 3 producers connects to exactly one partition, and no partition is shared by more than one producer.

As Pinterest grew, our fleets expanded to thousands of hosts, and this evenly distributed approach started to cause issues for our Kafka brokers: high connection counts and large numbers of produce requests elevated the brokers' CPU usage, and spreading out the messages meant smaller batches per partition and lower compression efficiency, resulting in higher aggregate network traffic. To tackle this, we implemented a new partitioner: the SinglePartitionPartitioner. It solves the issue by forcing Singer to write to only one random partition per topic per host, reducing the fanout from all brokers to a single broker. The chosen partition remains the same throughout the producer's lifetime, until Singer restarts.
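The sticky behavior can be sketched with Kafka's Partitioner plug-in interface (a simplified illustration; Singer's actual implementation picks the partition itself before sending, and the class name here is hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Pick one random partition per topic and stick with it for the lifetime of
// the producer, so each host talks to a single broker per topic.
public class StickyRandomPartitioner implements Partitioner {

  private final Map<String, Integer> chosen = new ConcurrentHashMap<>();

  @Override
  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    return chosen.computeIfAbsent(topic,
        t -> ThreadLocalRandom.current().nextInt(cluster.partitionsForTopic(t).size()));
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}
```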

For pipelines with a large producer fleet and relatively uniform message rates across hosts, this was extremely effective: the law of large numbers worked in our favor, and statistically, if the number of producers is significantly larger than the number of partitions, each partition still receives a similar amount of traffic. The connection count went down from (number of brokers serving the topic) times (number of producers) to just (number of producers), which can be up to a hundred times fewer connections for larger topics. Meanwhile, batching all of a producer's messages into a single partition improved compression ratios by at least 10% in most use cases.

SinglePartitionPartitioner: a skewed scenario where there are too few producers relative to partitions; the 3 producers each connect to one of the 3 partitions, but one partition ends up with 2 producers and another with none.

The Fixed Partitions Partitioner: Configurable variance for adjusting trade-offs

Despite this new solution, some pipelines still fell into a middle ground where both approaches are subpar, such as when the number of producers does not sufficiently outnumber the number of partitions. In this case, the SinglePartitionPartitioner introduces significant skew between partitions: some partitions have multiple producers writing to them, while others are assigned very few or even none. This skew can cause unbalanced workloads for downstream consumers and also increases the burden of managing the cluster, especially when storage is tight. We therefore recently introduced a new partitioner that covers these cases, and even the original use cases: the FixedPartitionsPartitioner, which allows us to publish not just to one fixed partition, like the SinglePartitionPartitioner, but randomly across a fixed number of partitions.

This approach is somewhat similar to the concept of virtual nodes in consistent hashing, where we artificially create more "effective producers" to achieve a more even distribution. Since the number of partitions each host writes to is configurable, we can tune it to the sweet spot where efficiency and performance are both at the desired levels. This partitioner can also help with "hot producers" by spreading their traffic out while still maintaining a reasonable connection count. Although a simple concept, having the ability to configure the degree of variance turns out to be a powerful tool for managing trade-offs.
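A rough sketch of the selection logic (not Singer's actual code; the class name and parameters are hypothetical) shows how the fixed subset interpolates between the two earlier strategies:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

// Pin each host to a fixed subset of partitions chosen once (e.g. at startup),
// then spread messages randomly across that subset. With a subset size of 1
// this behaves like the SinglePartitionPartitioner; with the subset equal to
// all partitions it behaves like the evenly distributed default.
public class FixedPartitionsChooser {

  private final List<Integer> pinned;

  public FixedPartitionsChooser(int totalPartitions, int fixedPartitions, long hostSeed) {
    List<Integer> all = new ArrayList<>();
    for (int p = 0; p < totalPartitions; p++) {
      all.add(p);
    }
    Collections.shuffle(all, new Random(hostSeed));  // hostSeed could be derived from the hostname
    this.pinned = all.subList(0, Math.min(fixedPartitions, totalPartitions));
  }

  public int nextPartition() {
    return pinned.get(ThreadLocalRandom.current().nextInt(pinned.size()));
  }
}
```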

FixedPartitionsPartitioner: each producer connects to 2 randomly chosen partitions. There is still some skew, but the number of producers each partition is connected to varies less than with a single partition per producer, while the connection count stays lower than the fully connected default.

Relative compression ratio and request-rate skew for different numbers of fixed partitions on a 120-partition topic spread across 30 brokers: the SinglePartitionPartitioner has the highest compression ratio but also the highest skew between partitions; as the number of fixed partitions goes up, compression ratio and skew both gradually decrease, with the default partitioner having the worst compression ratio but the most uniform workload.

Conclusion and Acknowledgements

These learnings are just a few examples of improvements the Logging Platform team has been making. Despite their seemingly different nature, the ultimate goal of all these improvements was to achieve better results for our team and our customers. We hope that these findings are inspiring and could spark a few ideas for you.

None of the content in this article could have been delivered without the in-depth discussions and candid feedback from Ambud Sharma, Eric Lopez, Henry Cai, Jeff Xiang and Vahid Hashemian on the Logging Platform team. We also deeply appreciate the support and input from external teams on the various improvements we've been working on. As we strive for continuous improvement within our architecture, we hope to share more interesting findings in our pursuit of perfecting our system.

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog, and visit our Pinterest Labs site. To view and apply to open opportunities, visit our Careers page.
