Lessons from debugging a tricky direct memory leak
Sanchay Javeria | Software Engineer, Ads Data Infrastructure
To support metrics reporting for ads from external advertisers and real-time ad budget calculations at Pinterest, we run streaming pipelines using Apache Flink. These jobs guarantee an overall 99th-percentile availability to our users; however, every once in a while some tasks get hit with nasty direct out-of-memory (OOM) errors on multiple operators that look something like this:
As is the case with most failures in a distributed system, this often leads to cascading failures elsewhere, leaving a trail of red herrings in its wake. Pinterest's Flink platform supports automatic job retries when task failures exceed a configurable threshold, so given how infrequent these exceptions were, we generally let automatic restarts from the most recent checkpoint handle fault tolerance. However, towards the end of last year, we initiated the consolidation of our clusters and readjusted memory allocation across all our jobs to achieve better resource efficiency. As an unintended consequence, we began receiving pages for these direct memory OOMs early this year, leading to outages and downstream service impact. It became increasingly evident that this issue needed to be addressed properly. In this post, we will outline our diagnostic process and share insights that can be generalized to debugging any large-scale distributed system where judiciously placed print statements alone are not sufficient to debug issues.
The first piece of the puzzle was to separate the symptoms from the cause. During this incident, we noticed high back pressure on several operators along with task failures accompanied by the stack trace above. At first glance, it also looked like the container ran out of memory while allocating direct memory for network buffers used for channel I/O. This led to our first set of action items — simulating task failures and high back pressure on a dev instance of our job while monitoring its effects on direct memory consumption to establish a causal relationship between the two events.
But first, we needed a band-aid solution to prevent frequent pages to the on-call engineer while we addressed the underlying root cause of the issue. To do this, it was helpful to revisit Flink's memory model.
As seen in Figure 1, Flink's direct memory configuration can be split into three parts: framework off-heap, task off-heap, and network memory. Framework off-heap memory is reserved for Flink's internal operations and data structures. Unsure whether the OOM was caused by an application-level memory leak, we increased both task off-heap and network memory from 2G to 5G. This was intentionally generous to buy us enough time to fix the issue.
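For reference, the settings involved look roughly like this in flink-conf.yaml (key names per Flink 1.10+; the 5g values reflect the bump described above, while the framework value is just the stock default shown for completeness):

```yaml
# Direct (off-heap) memory knobs in Flink's memory model
taskmanager.memory.framework.off-heap.size: 128m  # Flink-internal default
taskmanager.memory.task.off-heap.size: 5g         # raised from 2g
taskmanager.memory.network.min: 5g                # raised from 2g
taskmanager.memory.network.max: 5g
```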
Simulating back pressure
Since our Flink job has just one output sink operator, simulating back pressure was as easy as adding a long pause in the main thread using Thread.sleep(). Since the sink operator won't process any input records while sleeping, the output buffers of all upstream operators fill up quickly, causing significant back pressure.
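The mechanics can be sketched without any Flink dependency. The class below is a hypothetical, self-contained analog (names are ours, not from the production job): a deliberately slow consumer stands in for the sleeping sink, and a bounded queue stands in for an operator's output buffers.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BackpressureDemo {
    // Returns how many records the producer managed to emit before stalling.
    static int produceWithSlowSink(int bufferCapacity, int records) {
        BlockingQueue<Integer> outputBuffer = new ArrayBlockingQueue<>(bufferCapacity);
        Thread sink = new Thread(() -> {
            try {
                while (true) {
                    outputBuffer.take();
                    Thread.sleep(1_000); // the artificial pause, as in the real sink
                }
            } catch (InterruptedException ignored) {
            }
        });
        sink.setDaemon(true);
        sink.start();

        int emitted = 0;
        try {
            for (int i = 0; i < records; i++) {
                // A timed offer() models writing into a full buffer pool.
                if (!outputBuffer.offer(i, 50, TimeUnit.MILLISECONDS)) {
                    break; // buffers full and sink asleep: back pressure
                }
                emitted++;
            }
        } catch (InterruptedException ignored) {
        }
        sink.interrupt();
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("records emitted before the producer stalled: "
                + produceWithSlowSink(8, 1_000));
    }
}
```

In the real job the pause simply goes into the sink's processing path; the effect is the same. Once the bounded buffers fill, the producer stalls, which is exactly how Flink propagates back pressure upstream.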
Figure 2 captures the back pressured state of the application after some time across the various operators. This invariably led to direct memory OOMs again on the back pressured nodes which led to repeated task failures.
Simulating task retries
At Pinterest, Flink applications are submitted to YARN's ResourceManager, which allocates job tasks to containers on machines managed by YARN NodeManagers. To simulate task retries, we shut down a random sample of containers using yarn container -signal [container-id] GRACEFUL_SHUTDOWN while monitoring the application's direct memory consumption.
The graph in Figure 3 illustrates the impact of simulating task failures on direct memory consumption. It shows a noticeable increase in memory consumption precisely at the time of issuing shutdowns. This eventually led to OOM errors, and when a quorum of containers on the same operator was shut down, it caused back pressure on the upstream nodes. The staircase pattern in the graph is particularly intriguing because it is a telltale sign of a memory leak and suggests that somewhere in the code allocated direct memory was not released properly.
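The staircase shape is easy to reproduce in isolation. The toy class below (ours, not production code) retains one extra direct allocation per simulated restart, which is all it takes to produce a stepwise climb in reachable direct memory:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class StaircaseLeakDemo {
    static final int BUFFER_BYTES = 1 << 20; // 1 MiB per simulated "operator"

    // Returns the direct memory still reachable after the given restarts.
    static long simulateRestarts(int restarts, boolean releaseOnClose) {
        List<ByteBuffer> retained = new ArrayList<>();
        for (int i = 0; i < restarts; i++) {
            if (releaseOnClose) {
                retained.clear(); // a proper close(): the old allocation is dropped
            }
            retained.add(ByteBuffer.allocateDirect(BUFFER_BYTES));
        }
        return (long) retained.size() * BUFFER_BYTES;
    }

    public static void main(String[] args) {
        // Leaky variant: one extra step up per restart (the staircase).
        System.out.println("leaky: " + simulateRestarts(5, false) + " bytes retained");
        // Fixed variant: consumption stays flat across restarts.
        System.out.println("fixed: " + simulateRestarts(5, true) + " bytes retained");
    }
}
```

Each restart of the leaky variant adds a step of retained memory that nothing ever walks back down, which is precisely the shape we saw in Figure 3.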
To narrow down the scope of the problem, we next determined whether it was caused by a platform bug or by our application logic. To do this, we repeated the task retry simulation on a separate application not running our job logic, watching for a similar pattern in direct memory consumption that would point to a platform bug.
As seen in Figure 4, we didn’t observe any noticeable spikes in direct memory consumption in a different Flink application. This served as compelling evidence indicating that the memory leak originated from a bug in our application code.
Debugging application code
Our Flink application consists of several thousand lines of code. A helpful approach for debugging such a large codebase is to use the “peel the onion” approach, wherein we break down the code into smaller components with the goal of reproducing the issue. A very simplified view of our application looks something like this:
The first layer reads different topics from Kafka, deserializes them into internal objects, and feeds the second layer, which joins the outputs and performs some transformations. This layer also makes RPC calls to an external KVStore for downstream services and finally feeds the third layer, which performs more transformations and outputs the events to Druid. These three layers bound the groups of operators that use direct memory, so we can remove individual operators and try to reproduce the issue by manually simulating task retries. This way we can isolate the culprit operator and apply a fix.
Removing Layer 2 and Layer 3 operators
In Figure 5, certain operators in Layer 2 make RPC calls to an external KVStore with a very large payload. We suspected that these large objects could cause an OOM error if Thrift's DirectByteBuffer failed to reserve sufficient direct memory for network I/O.
Layer 3 also utilizes off-heap memory to store currency exchange rates for various countries by downloading this information from an external datastore. This computation was previously done on-heap but was putting tremendous pressure on the heap memory. The file storing the exchange rates was periodically downloaded from the datastore, parsed to extract useful information, transformed into a hashmap, and finally swapped with the older (immutable) hashmap. The older hashmap was then moved to the heap's old gen, which meant that it wasn't freed until the next full GC trigger. Due to the data size and full-GC infrequency of the online application, we instead moved to an off-heap solution using ChronicleMap. However, a bug in freeing this memory could very well result in OOMs over time. Hence, we started by removing these operators and simulating arbitrary task retries on the remaining ones while monitoring the impact on direct memory consumption.
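The swap pattern can be sketched as follows. All names here are hypothetical, and the stand-in class merely counts live allocations the way a ChronicleMap holds real native memory; the close() call on the swapped-out table is the step that, if forgotten, turns every refresh into a leak.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class RateTableRefresher {
    /** Stand-in for a ChronicleMap-backed table: counts live native allocations. */
    static class OffHeapRates implements AutoCloseable {
        static final AtomicInteger liveAllocations = new AtomicInteger();
        final Map<String, Double> rates;

        OffHeapRates(Map<String, Double> rates) {
            this.rates = rates;
            liveAllocations.incrementAndGet(); // models acquiring off-heap memory
        }

        @Override
        public void close() {
            liveAllocations.decrementAndGet(); // models releasing it explicitly
        }
    }

    final AtomicReference<OffHeapRates> current = new AtomicReference<>();

    /** Swap in the freshly parsed rates and release the table being replaced. */
    void refresh(Map<String, Double> parsedRates) {
        OffHeapRates previous = current.getAndSet(new OffHeapRates(parsedRates));
        if (previous != null) {
            previous.close(); // the missing step: without this, each refresh leaks
        }
    }

    public static void main(String[] args) {
        RateTableRefresher refresher = new RateTableRefresher();
        refresher.refresh(Map.of("USD", 1.0));
        refresher.refresh(Map.of("USD", 1.0, "EUR", 0.92));
        System.out.println("live off-heap tables after two refreshes: "
                + OffHeapRates.liveAllocations.get()); // → 1
    }
}
```

A production version would also need to defer the close() until in-flight readers of the old table have drained, e.g. via reference counting; unlike an on-heap HashMap, the GC will not quietly clean up after an off-heap table that is merely dereferenced.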
As expected, we did not notice any blips in direct memory consumption. We can now narrow down our hunt for the source of the memory leak to the remaining operators.
Removing Layer 3 operators
Next, we removed the Layer 3 operators that utilize ChronicleMap for application logic and repeated the same exercise of simulating task retries.
As shown in Figure 7, we noticed a minor blip but no definitive staircase pattern that would indicate a memory leak in the remaining operators. This was an interesting finding: contrary to our initial hunch, we found no evidence of a memory leak in the operators that talk to the external KVStore via RPC calls.
Removing Layer 2 operators
Next, we isolated the Layer 3 operators by removing the Layer 2 operators that also utilize direct memory.
Aha! We managed to reproduce the issue in our leaner application code, with a memory profile eerily similar to the pattern observed in Figure 3. This was conclusive evidence that the direct memory leak originated from the application code in Layer 3 utilizing direct memory.
Upon investigating the problematic operator, we found that the reference to the ChronicleMap was being removed but the associated memory wasn't freed, resulting in a memory leak. This memory was not released until the next full GC trigger, which is especially problematic in online services like ours that are tuned for infrequent garbage collection.
To understand this better, it is prudent to talk about Flink's task lifecycle and its internal restart mechanism for terminations due to task failures. In this case the JVM does not crash; rather, Flink execution jumps to the close() method of the affected operators. After the restart, Flink then calls the open() method defined in the operator code. If the logic references an object (like the ChronicleMap) that lives outside of this lifecycle, the code might be leaking memory inadvertently.
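Our fix reduces to honoring that lifecycle. The self-contained sketch below (illustrative names, no Flink dependency) counts native allocations across simulated close()/open() restart cycles; only the variant that releases the map inside close() ends with nothing left live.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class OperatorLifecycle {
    /** Counts live native allocations the way ChronicleMap holds off-heap memory. */
    static class FakeChronicleMap implements AutoCloseable {
        static final AtomicInteger liveAllocations = new AtomicInteger();
        FakeChronicleMap() { liveAllocations.incrementAndGet(); }
        @Override public void close() { liveAllocations.decrementAndGet(); }
    }

    static class Operator {
        private FakeChronicleMap map;
        private final boolean releaseOnClose;

        Operator(boolean releaseOnClose) { this.releaseOnClose = releaseOnClose; }

        void open() { map = new FakeChronicleMap(); } // called by Flink after a restart

        void close() { // called by Flink on task failure
            if (releaseOnClose && map != null) {
                map.close(); // the one-line fix: release the off-heap memory here
            }
            map = null; // dropping the reference alone leaves the native memory
                        // to a (rare) full GC, which our service cannot count on
        }
    }

    /** Run restart cycles and report how many native allocations remain live. */
    static int restartCycles(int restarts, boolean releaseOnClose) {
        FakeChronicleMap.liveAllocations.set(0);
        Operator op = new Operator(releaseOnClose);
        op.open();
        for (int i = 0; i < restarts; i++) {
            op.close(); // Flink tears the task down...
            op.open();  // ...and brings it back up from the last checkpoint
        }
        op.close();
        return FakeChronicleMap.liveAllocations.get();
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + restartCycles(5, false) + " live allocations");
        System.out.println("fixed: " + restartCycles(5, true) + " live allocations");
    }
}
```

The leaky variant ends with one live allocation per open() call, which is the staircase; the fixed variant ends at zero regardless of how many restarts occur.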
After fixing the leak, we simulated task retries once again and monitored its impact on direct memory consumption.
As observed in Figure 9, memory consumption stayed smooth, in contrast to the rising staircase pattern observed in Figure 3.
While the fix was specific to our application logic, the overarching takeaway is the procedure for finding the root cause. Through this war story, we walked through the nine debugging rules described by David J. Agans in his book "Debugging: The 9 Indispensable Rules for Finding Even the Most Elusive Software and Hardware Problems":
- Understand the System
- Make it Fail (and make it fail fast)
- Quit Thinking and Look
- Divide and Conquer
- Change One Thing at a Time
- Keep an Audit Trail
- Check the Plug
- Get a Fresh View
- If You Didn’t Fix it, It Ain’t Fixed
We would like to thank Divye Kapoor from Pinterest’s Stream Processing Platform for helping with all the platform related questions and issues, and Naehee Kim, Filip Jaros, Insu Lee, Kuang He and Weihong Wang for supporting this effort and putting up with the numerous alerts while we resolved this issue.