Rabbit Holes in Flink Operations (Part 1)

Lucio Wu
Airwallex Engineering
Mar 9, 2022

Apache Flink is a great distributed data processing framework for stateful computations over bounded and unbounded data streams. However, like all open-source software, it can be challenging to operate Flink and keep it running smoothly. This blog series is not trying to persuade people to stay away from Flink, but rather to share the errors, resource exhaustion and weird performance issues that we have run into.

By sharing insights into the lessons we have learned, we hope to help you get further with Flink.

Case 1: Blob Server Connection Explosion

Our story begins with Flink on virtual machines (VMs). Initially, we ran a tiny Flink cluster consisting of one JobManager and three TaskManagers on 3 VMs. One day, jobs on the cluster could not fail over to new TaskManagers. Meanwhile, we observed lots of log lines like the one below:

Failed to fetch BLOB, dfa36106a7e057a8cb8b60f3e5a69a2b/p-d5e13fb34e38c716503415b7f7fab62152e8b2a8-96f094f8ea5830204642d25a2e0f0e5d from platform-flink-1/X.X.X.X:YYYY and store it under /tmp/blobStore-f78822e9-a91a-43b2-8156-928fd4ec5aa8/incoming/temp-00046293

The blob server is a component of the JobManager; it takes care of creating the directory structure to store BLOBs or temporarily cache them.

At first, we thought the blob server was down. But after a quick check, we found that the server was actually listening on the port shown in the log. With the netstat and ss tools, we eventually noticed a large number of connections to the JobManager’s blob server port.

In the Flink documentation, we found one parameter for the blob connection config, “blob.fetch.num-concurrent”, which defines the maximum number of concurrent BLOB fetch requests served by the JobManager. Its default value is 50.

One possible cause is that the blob server was saturated with connections stuck in the TIME_WAIT state. So, when a TaskManager tried to connect to the blob server during a job restart or failover, new connections to the blob server could not be established.

To mitigate the issue, we restarted the JobManager with a higher value for this config. We then continuously monitor the connection count to see whether we need to adjust the config further.
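
For reference, this parameter is set in the JobManager’s flink-conf.yaml. A minimal sketch (the value below is only illustrative, not the exact number we settled on):

    # flink-conf.yaml on the JobManager
    # Maximum number of concurrent BLOB fetch requests served by the JobManager.
    # The default is 50; tune the value against the observed connection count
    # on the blob server port.
    blob.fetch.num-concurrent: 200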

Lessons Learned

  • For any running system, different kinds of errors and exceptions are inevitable. The root cause could be bugs in code, mis-configuration, network issues and so on. It is better to prepare multiple debugging tools in advance so that engineers can quickly diagnose what is wrong with the system.
  • High availability is a must-have for JobManagers in production environments.

Case 2: JobManager and TaskManager Out-Of-Memory

As we continued to grow the Flink cluster, we found a lot of scaling limitations with virtual machines. So, we migrated completely to Kubernetes in about two months, and the number of Flink TaskManagers increased quickly due to business needs.

However, it’s difficult to set proper configs for Kubernetes pods where Flink jobs run.

Out-of-memory (OOM) errors are a typical challenge for data applications as data volumes grow, especially when Kubernetes is used to run them. Let me share some of the more interesting OOM cases we have observed while running Flink on Kubernetes.

OOM Due to a Flink Bug

The first OOM error we encountered was a JVM metaspace OOM in the JobManager.

We observed this issue in the Flink cluster where batch jobs were submitted. In our monitoring system, we also noticed a pattern: the used metaspace size and the number of loaded classes would increase each time a batch job was submitted (see Figure 1 and Figure 2).

Figure 1: the used JVM metaspace

Figure 2: number of loaded classes

Since those jobs were submitted every day, but the classes were never unloaded, an OOM in the JVM metaspace would eventually happen. After some searching, we found that it was a known Flink issue. Since batch jobs aren’t in our critical path, we decided to work around it by restarting the batch Flink cluster periodically.

Lessons Learned

  • It’s very important to monitor JVM metrics such as heap size, metaspace usage, thread count, loaded classes and so on. Observing how patterns change across different metrics over a long period of time, say 7 days, helps a lot in problem analysis (see the configuration sketch below).
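
As an illustration, here is a minimal sketch of exposing these JVM metrics through Flink’s Prometheus metrics reporter. It assumes the Prometheus reporter jar is available, and newer Flink versions configure reporters via a factory class instead, so treat the keys as version-dependent:

    # flink-conf.yaml (sketch, Flink 1.13/1.14 style)
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9249

    # The metrics discussed above are then exposed under names such as:
    #   Status.JVM.Memory.Metaspace.Used
    #   Status.JVM.Memory.Heap.Used
    #   Status.JVM.ClassLoader.ClassesLoaded
    #   Status.JVM.Threads.Count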

OOM Due to Mis-configuration

The second OOM problem was caused by inconsistent memory settings between Kubernetes and Flink.

Our Flink cluster is deployed with a Helm chart we developed. In the Helm values.yaml file, the Kubernetes pod memory limits, Flink’s taskmanager.numberOfTaskSlots and taskmanager.process.size are configurable parameters with default values.

We reduced taskmanager.numberOfTaskSlots and the memory limits of the Kubernetes pods, but didn’t update taskmanager.process.size accordingly. As a result, its default value in the Helm chart still took effect, and it was larger than the memory limits of the Kubernetes pods. The mistake was hidden at first, when we applied the configuration. But after some jobs had been running for a few hours and the used heap size kept growing, some Flink TaskManager pods were killed due to OOM.
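
To make the mismatch concrete, the values could have ended up looking roughly like this (a hypothetical values.yaml; the names and numbers are made up for illustration, not copied from our chart):

    taskmanager:
      numberOfTaskSlots: 2      # reduced
      resources:
        limits:
          memory: 2Gi           # reduced together with the slot count
      # Forgotten: this default still takes effect and exceeds the 2Gi pod limit,
      # so the container is OOM-killed once Flink actually uses the memory it
      # thinks it has.
      flinkProcessSize: 4096m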

Lessons Learned

  • The memory configuration of Flink in Kubernetes is determined by a combination of taskmanager.numberOfTaskSlots, taskmanager.process.size and the Kubernetes pod memory limits. Configuring multiple parameters independently when they impact each other is error-prone; it’s a typical bad smell.
  • We later improved the Flink Helm chart to avoid similar issues by calculating taskmanager.process.size from the Kubernetes pod memory limits and taskmanager.numberOfTaskSlots (a sketch follows this list).
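
A simplified sketch of that improvement, with hypothetical value names and a fixed headroom (our actual template logic differs), renders taskmanager.process.size from the pod memory limit so the two values can no longer drift apart:

    # templates/flink-configmap.yaml (Helm template sketch)
    taskmanager.numberOfTaskSlots: {{ .Values.taskmanager.numberOfTaskSlots }}
    # Derive the Flink process size from the pod memory limit (assumed to be
    # given in MiB), leaving headroom for non-Flink overhead in the container.
    taskmanager.process.size: {{ sub (int .Values.taskmanager.memoryLimitMb) 256 }}m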

OOM Due to Bugs in Flink Jobs

In our Flink cluster deployment, we didn’t use RocksDB, so most of the memory was used by the JVM heap.

Two Flink jobs reported OOM in the JVM heap, and full Garbage Collection (GC) was also triggered frequently. Since all TaskManagers have the same configuration, we were curious what was wrong with these two jobs, so we started to check other metrics across all the jobs.

As shown in Figure 3, we noticed that the checkpoint sizes of the two OOM’ed jobs were much larger than the rest. We also observed checkpoint failures in those two jobs.

Figure 3: Last Checkpoint Size

As the OOMs were happening in the production environment and the end-to-end latency of the two jobs had started to increase, we decided to capture a heap dump for further analysis and, as a first mitigation, temporarily increase the memory of the TaskManagers.
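
As a side note, heap dumps can also be captured automatically the next time an OOM happens by passing standard JVM flags through Flink’s configuration. This is a sketch of that option, not the exact procedure we used (we captured this dump manually):

    # flink-conf.yaml (sketch)
    env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heap-dumps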

Later, during the failure analysis, we discussed the logic of the two jobs with their developers and analysed the heap dump. We found that Flink would not proactively remove windows that had expired; the removal needed to be triggered by an event arriving after a window’s expiration time. As a result, a lot of expired windows had accumulated in memory, which led to the OOM. We were not able to fix the root cause, but we improved the jobs by reducing the amount of data stored in the windows.

Lessons Learned

  • Again, observability is very important: more metrics means more context, and more context helps identify the root cause and mitigate the issue. It also pays to have debugging tools that can capture as much useful information as possible; for example, without the heap dump, it would have been difficult for developers to find out what was happening inside Flink.
  • According to Flink’s memory model documentation, Flink memory is composed of multiple areas such as heap memory, managed memory, direct memory and so on. We should keep monitoring the usage of the different areas and tune them for our use case, e.g. reduce the managed memory since our jobs hardly use it (see the sketch below).
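
For example, a minimal flink-conf.yaml sketch for shrinking managed memory when neither RocksDB nor batch operators need it (the fraction below is illustrative; the default is 0.4):

    # flink-conf.yaml (sketch)
    # With a heap-based state backend and no batch operators, managed memory
    # mostly sits idle, so shrinking it gives the JVM heap more room.
    taskmanager.memory.managed.fraction: 0.1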

In cases 1 and 2, we could see obvious error messages, but what if the system is just slow, with no error messages at all? Check out the next blog in this series of Rabbit Holes in Flink Operations.

Lucio Wu is a Senior DevOps Engineer at Airwallex.
