A Glimpse into the Redesigned Goku-Ingestor vNext at Pinterest
Better performance, lower cost and less code complexity
Xiao Li, Kapil Bajaj, Monil Mukesh Sanghavi and Zhenxiao Luo
In the dynamic arena of real-time analytics, the need for precision and speed is non-negotiable. Pinterest’s real-time metrics asynchronous data processing pipeline, powering Pinterest’s time series database Goku, stood at the crossroads of opportunity. The mission was clear: identify bottlenecks, innovate relentlessly, and propel our real-time analytics processing capabilities into an era of unparalleled efficiency.
The Goku-Ingestor is an asynchronous data processing pipeline that multiplexes metrics data. It performs data validation, denylist processing, and sharding; deserializes multiple metrics formats; and serializes the data into a customized Time Series Database (TSDB) format used by the downstream storage engine, Goku.
Goku-Ingestor has been running and evolving for close to a decade. It did its job fairly well, but some of its caveats became pain points for the real-time analytics platform.
High Fleet Cost for Perceived Throughput
We measure Goku-Ingestor's throughput in data points per minute. Goku-Ingestor generally processes 2.5 billion to 5 billion data points per minute. To sustain this throughput, it runs on thousands of memory-optimized EC2 instances and incurs high infrastructure cost, even though per-host throughput is less than 0.5 mbps.
In early 2023, performance problems in Goku-Ingestor led to several brief periods of data loss in the metrics system. This posed challenges for the real-time analytics and observability teams, and our internal customers had to deal with missing data and potentially misleading alerts.
Identifying the Problem
We began with a comprehensive analysis of past incidents of missing metrics data. Many of them were related to extremely high garbage collection (GC) overhead: in some high-volume clusters, average GC pauses could reach 10 seconds per minute. A full GC halts the data processing pipeline entirely, causing both back-pressure on the upstream Kafka clusters and cascading failures in the downstream TSDB.
A thorough architecture review surfaced several crucial design concerns in how the concurrency model was used. Most notably, the service created new threads for every batch of raw metrics awaiting processing. The data point life cycle consisted of multiple stages: reading, deserialization, processing, validation, and denylist checking, followed by reconstruction of TSDB-format data points, serialization, and writing. This design produced numerous duplicate copies of each data point, with new copies created at every step. Memory profiling further revealed that some of this work was redundant and relied on costly APIs.
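A "GC seconds per minute" figure like the one above can be approximated from inside the JVM. The following is a minimal sketch, not the team's tooling. The class and method names are hypothetical, and only the standard `java.lang.management` API is used. `getCollectionTime()` reports accumulated collection time per collector. For concurrent collectors this can include concurrent work as well as stop-the-world pauses, so GC logs remain the authoritative source for pause times.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical sampler: approximate GC time (seconds) per minute of wall-clock time.
final class GcOverheadSampler {
    private long lastGcMillis = totalGcMillis();
    private long lastWallNanos = System.nanoTime();

    // Sums accumulated collection time across all collectors.
    // getCollectionTime() returns -1 when a collector does not report it, so skip those.
    static long totalGcMillis() {
        long sum = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) sum += t;
        }
        return sum;
    }

    // GC seconds per minute of wall-clock time elapsed since the previous call.
    double sampleGcSecondsPerMinute() {
        long gcNow = totalGcMillis();
        long wallNow = System.nanoTime();
        double gcSeconds = (gcNow - lastGcMillis) / 1000.0;
        double wallMinutes = (wallNow - lastWallNanos) / 60e9;
        lastGcMillis = gcNow;
        lastWallNanos = wallNow;
        return wallMinutes > 0 ? gcSeconds / wallMinutes : 0.0;
    }
}
```

Suppose a monitoring thread calls `sampleGcSecondsPerMinute()` once a minute. On the worst clusters described above it would report values near 10, meaning roughly 17% of wall-clock time is spent in GC.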
Ideally, data points should have short lifespans in memory, since they are typically single-use objects. However, because the architecture creates so many objects, young garbage collection (GC) pauses occur frequently. To assess how often these pauses happen, we measure the time interval between consecutive young collections:
$$P_{young} = \frac{S_{eden}}{R_{alloc}}$$
where $P_{young}$ is the period between young GCs, $S_{eden}$ is the size of Eden, and $R_{alloc}$ is the rate of memory allocation (bytes per second).
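The numbers below are purely hypothetical, not measurements from Goku-Ingestor. They show how the formula behaves. With a 2 GiB Eden space and an allocation rate of 1 GiB/s, young collections occur roughly every two seconds. Doubling the allocation rate halves that interval.

```latex
P_{young} = \frac{S_{eden}}{R_{alloc}} = \frac{2\,\text{GiB}}{1\,\text{GiB/s}} = 2\,\text{s},
\qquad
P'_{young} = \frac{2\,\text{GiB}}{2\,\text{GiB/s}} = 1\,\text{s}
```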
A TSDB data processing pipeline aims to maximize throughput, so the service's metrics reader component fetches large amounts of metrics starting from the last Kafka offset. However, heavy memory allocation by the worker component significantly increases $R_{alloc}$. This shortens $P_{young}$ and results in frequent short pauses, because young-generation GC is stop-the-world (STW). Previous attempts to address this issue introduced the G1 garbage collector (G1GC), but several challenges remained unresolved:
- G1GC still relies on STW pauses to do its heavy lifting.
- Repeatedly creating large objects (for example, queues of new objects) can cause heap fragmentation.
- G1 trades fewer, longer JVM freezes for a larger number of shorter pauses, so cumulative GC pause time can actually be higher.
The cumulative effect of these memory-heavy operations is short GC intervals and constant interruptions of the service: Goku-Ingestor is always busy waiting for the garbage collector.
While the design has multiple problems, the predominant issue is inefficient memory utilization. It leads to frequent high latency, data freshness problems, backpressure, and cascading failures. Even a slight increase in traffic forces us to expand the fleet to absorb the added memory and GC overhead.
Goku-Ingestor vNext Architecture
The goal of the Goku-Ingestor vNext architecture is to achieve the highest throughput and the fewest GC pauses on the smallest number of hosts.
For the purposes of qualitative analysis, let $TH_{fleet}$ be the throughput of the fleet and n the number of hosts. The cumulative throughput of the fleet is:
$$TH_{fleet} = \sum_{j=1}^{n} \frac{N_{dp}(j)}{T_{service}(j)}$$
where $N_{dp}(j)$ is the total number of data points (DPs) processed on host j, assumed to be constant per host, and $T_{service}(j)$ is the time the service takes to process those $N_{dp}(j)$ data points on host j.
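The fleet throughput is the sum over hosts of $N_{dp}(j) / T_{service}(j)$. It can be evaluated directly, which makes the lever clear: cutting one host's service time raises that host's contribution proportionally. The minimal sketch below uses a hypothetical class name and made-up per-host numbers.

```java
// Hypothetical helper: TH_fleet = sum over hosts j of N_dp(j) / T_service(j).
final class FleetThroughput {
    static double fleetThroughput(long[] dataPoints, double[] serviceSeconds) {
        if (dataPoints.length != serviceSeconds.length) {
            throw new IllegalArgumentException("need one N_dp and one T_service per host");
        }
        double total = 0;
        for (int j = 0; j < dataPoints.length; j++) {
            if (serviceSeconds[j] <= 0) {
                throw new IllegalArgumentException("T_service must be positive");
            }
            total += dataPoints[j] / serviceSeconds[j]; // this host's data points per second
        }
        return total;
    }
}
```

Consider three hosts that each process 6,000 data points, taking 60 s, 60 s, and 30 s respectively. The fleet throughput is 100 + 100 + 200 = 400 data points per second. Halving $T_{service}$ on a host doubles its share, which is why lower per-host service time lets the same throughput run on fewer hosts.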
To further analyze the service time $T_{service}(j)$ on a single host j, we have:
where $N_{dp}(j)$ is the number of data points processed by host j, $T_{metric}$ is the time used to process a metric, $T_{dp}$ is the time used to process a data point, and $\sum_{i=1}^{m} T_{temp}(i)$ is the cumulative time used to process m intermediate temporary data structures. $OH_{gc}$ is the average overhead of garbage collection pauses per object. $K_{reader}$, $K_{writer}$ and $K_{worker}$ denote the number of actors of each kind (readers, writers, and workers).
The goal for improving the data processing pipeline is to reduce the service overhead $T_{service}$. The most direct lever is to reduce m, the number of temporary objects. Ideally, $K_{reader}$, $K_{writer}$ and $K_{worker}$ should also be adjustable so that throughput can be fine-tuned.
To achieve this, the Goku-Ingestor vNext architecture minimizes the resources it creates and copies, which yields higher throughput with minimal GC overhead. It uses a model of three thread pool executors and creates only two shared resources. All runnables are scheduled to run at fixed intervals, each at its own rate, which removes the repeated creation of runnable objects. The two shared resources are a concurrent ConsumerRecords queue that temporarily holds polled results and a concurrent buffer for processed data points. Objects are removed from the queue as soon as a worker thread dequeues them. Worker threads perform the CPU-intensive heavy lifting: deserialization, validation, denylist checks, data point construction, sorting, and partitioning. Once the data points are sorted and partitioned, they are deposited into the concurrent buffer, where they wait to be sent by writer threads.
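The reader/worker/writer design described above can be sketched with the JDK's `ScheduledExecutorService`. This is a simplified illustration under stated assumptions, not Pinterest's implementation. `IngestPipeline`, the `source` and `sink` hooks, and the trim-and-hash "processing" step are all hypothetical. Plain strings stand in for Kafka `ConsumerRecords` so the example needs no Kafka dependency. The key properties are that each pool's runnable is created once and rescheduled at a fixed rate, and that only two shared structures exist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch: three fixed-rate thread pools sharing exactly two concurrent structures.
final class IngestPipeline {
    // Shared resource 1: polled batches (stand-in for Kafka ConsumerRecords).
    private final ConcurrentLinkedQueue<List<String>> polled = new ConcurrentLinkedQueue<>();
    // Shared resource 2: processed data points grouped by partition, awaiting writers.
    private final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<String>> buffer = new ConcurrentHashMap<>();

    private final Supplier<List<String>> source;  // hypothetical stand-in for consumer.poll()
    private final Consumer<List<String>> sink;    // hypothetical stand-in for a TSDB writer
    private final int partitions;
    private final List<ScheduledExecutorService> pools = new ArrayList<>();

    IngestPipeline(Supplier<List<String>> source, Consumer<List<String>> sink, int partitions) {
        this.source = source;
        this.sink = sink;
        this.partitions = partitions;
    }

    void pollOnce() {
        List<String> batch = source.get();
        if (batch != null && !batch.isEmpty()) polled.offer(batch);
    }

    void workOnce() {
        List<String> batch;
        while ((batch = polled.poll()) != null) {   // the batch is dropped from the queue once dequeued
            for (String raw : batch) {
                String dp = raw.trim();             // placeholder for deserialize/validate/denylist
                if (dp.isEmpty()) continue;
                int partition = Math.floorMod(dp.hashCode(), partitions);
                buffer.computeIfAbsent(partition, k -> new ConcurrentLinkedQueue<>()).offer(dp);
            }
        }
    }

    void writeOnce() {
        for (Map.Entry<Integer, ConcurrentLinkedQueue<String>> e : buffer.entrySet()) {
            List<String> out = new ArrayList<>();
            String dp;
            while ((dp = e.getValue().poll()) != null) out.add(dp);
            if (!out.isEmpty()) sink.accept(out);
        }
    }

    // kReader/kWorker/kWriter mirror the K_reader, K_worker and K_writer knobs; each pool has its own period.
    void start(int kReader, int kWorker, int kWriter, long readMs, long workMs, long writeMs) {
        schedule(kReader, this::pollOnce, readMs);
        schedule(kWorker, this::workOnce, workMs);
        schedule(kWriter, this::writeOnce, writeMs);
    }

    private void schedule(int threads, Runnable task, long periodMs) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.scheduleAtFixedRate(task, 0, periodMs, TimeUnit.MILLISECONDS); // same Runnable reused
        }
        pools.add(pool);
    }

    void stop() {
        for (ScheduledExecutorService pool : pools) pool.shutdownNow();
        try {
            for (ScheduledExecutorService pool : pools) pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pools.clear();
    }
}
```

The step methods are package-private, so they can also be driven synchronously in tests. Note that `scheduleAtFixedRate` suppresses further runs of a task that throws, so a production version would catch and report exceptions inside each step.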
Additional memory profiling identified the most expensive APIs in the service. One that stood out was String.split, which was used frequently because it made it convenient to parse raw metric strings into dimensional data compatible with the TSDB.
String.split played a crucial role in turning raw metric strings into structured dimensional data by breaking them apart at delimiters. However, profiling made it evident that this method was a significant contributor to memory usage and performance bottlenecks.
We therefore made algorithmic improvements that eliminated this expensive API, which substantially reduced GC pause times and improved the overall performance and stability of the system.
As shown in the figure above, Goku-Ingestor vNext running in a dev cluster shadows the m10n cluster running on the legacy ingestion pipeline. Removing String::split alone reduced GC pause overhead by a further 50%.
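The post does not describe the replacement algorithm. The sketch below shows one common way to avoid String.split: a single left-to-right scan with `indexOf`. It is not necessarily what the team did. In OpenJDK, String.split compiles a regular expression unless the delimiter qualifies for a single-character fast path. In every case it builds an intermediate list and array of substrings. The line format `<metric> <epochSeconds> <value> <k>=<v> ...` and the names `MetricLineParser`, `Parsed`, and `TagVisitor` are hypothetical. The scan still allocates the substrings it hands out, but it skips the intermediate collections and parses the timestamp directly from characters.

```java
// Hypothetical split-free parser for lines like "<metric> <epochSeconds> <value> <k1>=<v1> <k2>=<v2>".
final class MetricLineParser {
    interface TagVisitor { void tag(String key, String value); }

    static final class Parsed { String metric; long timestamp; double value; }

    // Returns false on malformed input; tags are streamed to the visitor instead of collected in an array.
    static boolean parse(String line, Parsed out, TagVisitor tags) {
        int end = line.length();
        int sp = line.indexOf(' ');
        if (sp <= 0) return false;
        out.metric = line.substring(0, sp);

        int p = sp + 1;
        sp = line.indexOf(' ', p);
        if (sp <= p) return false;
        long ts = 0;                                  // digits parsed in place, no substring (no overflow check)
        for (int i = p; i < sp; i++) {
            char c = line.charAt(i);
            if (c < '0' || c > '9') return false;
            ts = ts * 10 + (c - '0');
        }
        out.timestamp = ts;

        p = sp + 1;
        sp = line.indexOf(' ', p);
        int valueEnd = sp < 0 ? end : sp;
        if (valueEnd == p) return false;
        try {
            out.value = Double.parseDouble(line.substring(p, valueEnd));
        } catch (NumberFormatException e) {
            return false;
        }

        p = valueEnd + 1;
        while (p < end) {                             // each remaining token must be key=value
            sp = line.indexOf(' ', p);
            int tokEnd = sp < 0 ? end : sp;
            int eq = line.indexOf('=', p);
            if (eq <= p || eq >= tokEnd - 1) return false; // empty key, empty value, or no '=' in token
            tags.tag(line.substring(p, eq), line.substring(eq + 1, tokEnd));
            p = tokEnd + 1;
        }
        return true;
    }
}
```

Reusing one `Parsed` instance per worker thread and streaming tags to a visitor keeps the number of short-lived objects per line small. That is the same goal as reducing m in the service-time analysis.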
The impact of Goku-Ingestor vNext was remarkable. The redesigned system achieved comparable throughput on clusters with heavy traffic loads while requiring 50% to 65% fewer EC2 instances.
One notable improvement was in garbage collection pause time, which is known to cause performance degradation. With Goku-Ingestor vNext, pause time dropped to just 10% to 25% of its original duration.
Overall, Goku-Ingestor vNext improved system throughput, saved resources by reducing the number of EC2 instances required, and minimized the performance degradation caused by GC pauses.
Result with 50% Reduction of Hosts
Result with 67% Reduction of Hosts
Financial and Reliability Benefits
Beyond the immediate performance gains, the financial impact was substantial. The redesigned Goku-Ingestor reduced our annual fleet cost by 65% through fewer EC2 instances alone, a testament to the efficiency gains and resource optimization achieved through the overhaul. At the same time, the reliability of the entire system improved drastically, resulting in a more stable and resilient data processing pipeline.
As part of an ongoing effort to refine the performance profile of Goku-Ingestor vNext, future work will focus on further memory optimizations aimed at reducing string copies. By scrutinizing and fine-tuning our memory management strategies, we expect to gain additional efficiency in string handling, which may further reduce both resource consumption and processing overhead.
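One widely used technique for reducing string copies is a lightweight view over a region of an existing String. It is offered here only as an illustration, not as the team's plan. Since JDK 7u6, `String.substring` copies the underlying characters. A view implementing `CharSequence` instead references the original string and copies only when a real `String` is required. `StringSlice` is a hypothetical name. Each view is still a small object, so this trades a character copy for a fixed-size allocation.

```java
// Hypothetical zero-copy view over [start, end) of an existing String.
final class StringSlice implements CharSequence {
    private final String base;
    private final int start;
    private final int end;

    StringSlice(String base, int start, int end) {
        if (start < 0 || end > base.length() || start > end) throw new IndexOutOfBoundsException();
        this.base = base;
        this.start = start;
        this.end = end;
    }

    @Override public int length() { return end - start; }

    @Override public char charAt(int index) {
        if (index < 0 || index >= length()) throw new IndexOutOfBoundsException();
        return base.charAt(start + index);
    }

    @Override public CharSequence subSequence(int s, int e) {
        if (s < 0 || e > length() || s > e) throw new IndexOutOfBoundsException();
        return new StringSlice(base, start + s, start + e); // still no character copy
    }

    // Compares against a String without materializing this slice.
    boolean contentEquals(String other) {
        return other.length() == length() && base.regionMatches(start, other, 0, length());
    }

    // Copies characters only when a real String is needed.
    @Override public String toString() { return base.substring(start, end); }
}
```

For example, a denylist check could compare a tag value slice against known strings with `contentEquals` and never create a new String for values that are simply discarded.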
Thrift Integration for Enhanced Parsing
Apache Thrift's structured data serialization offers a promising way to optimize the parsing of incoming data. The manual string parsing currently in use could be replaced with a more efficient, streamlined process built on Thrift structures. Beyond improving parsing efficiency, this would provide a more robust and extensible framework for handling diverse data formats.
Coordination and Smart Worker Load Balancing
We have observed that, despite the use of consistent hashing for workload distribution, traffic across partitions is often unbalanced. To address this, we plan to develop a mechanism that lets workers intelligently identify and prioritize heavy hitters. Once the partitions that contribute most to the overall workload are identified, workers can be allocated accordingly to optimize throughput.
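The heavy-hitter mechanism is still being planned, so the following is only an illustrative sketch built from two textbook pieces. The first flags partitions whose share of the load exceeds a threshold. The second assigns partitions to workers with the greedy longest-processing-time-first (LPT) heuristic, which places the heaviest remaining partition on the currently least-loaded worker. `PartitionBalancer`, the per-partition load counts, and the threshold are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: detect heavy-hitter partitions and balance partitions across workers (LPT greedy).
final class PartitionBalancer {

    // Partitions whose load exceeds `fraction` of the total load.
    static List<Integer> heavyHitters(long[] load, double fraction) {
        long total = 0;
        for (long l : load) total += l;
        List<Integer> hitters = new ArrayList<>();
        for (int p = 0; p < load.length; p++) {
            if (total > 0 && load[p] > fraction * total) hitters.add(p);
        }
        return hitters;
    }

    // Heaviest partition first, each placed on the currently least-loaded worker.
    static List<List<Integer>> assign(long[] load, int workers) {
        if (workers <= 0) throw new IllegalArgumentException("need at least one worker");
        Integer[] order = new Integer[load.length];
        for (int i = 0; i < order.length; i++) order[i] = i;
        Arrays.sort(order, (a, b) -> Long.compare(load[b], load[a]));

        List<List<Integer>> plan = new ArrayList<>();
        for (int w = 0; w < workers; w++) plan.add(new ArrayList<>());

        // Min-heap entries are {currentLoad, workerId}; ties go to the lower worker id.
        PriorityQueue<long[]> heap = new PriorityQueue<>(
            (x, y) -> x[0] != y[0] ? Long.compare(x[0], y[0]) : Long.compare(x[1], y[1]));
        for (int w = 0; w < workers; w++) heap.add(new long[] {0L, w});

        for (int p : order) {
            long[] least = heap.poll();
            plan.get((int) least[1]).add(p);
            least[0] += load[p];
            heap.add(least);
        }
        return plan;
    }
}
```

Take hypothetical loads {100, 10, 10, 10, 60, 10} with two workers. Partition 0 is flagged as a heavy hitter at a 30% threshold. LPT places it alone on one worker and the remaining partitions (total 100) on the other. A plain hash of partition ids makes no attempt to equalize load this way.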
The journey from identifying bottlenecks to implementing a simplified Goku-Ingestor vNext design has not only elevated our performance metrics but also shown the power of strategic engineering interventions. The success of Goku-Ingestor vNext serves as a beacon for those navigating the complexities of data processing pipelines, showing that innovation and optimization are key pillars of a robust and cost-effective infrastructure.
We are excited to continue pushing the boundaries of what’s possible in real-time analytics data processing at Pinterest.
Ambud Sharma (Sr. Staff Software Engineer | Data Engineering) provided great insights into the design and Kafka limitations, and helped with our profiling efforts.
Vahid Hashemian (Staff Software Engineer | Logging Platform) helped tremendously with our testing efforts and KTLO issues.