Rabbit Holes in Flink Operations (Part 2)

Lucio Wu
Airwallex Engineering
Mar 9, 2022 · 6 min read

Welcome to part 2 of our series of Flink operation stories. In the last blog, we shared two cases about connection issues and OOM errors. In this blog, we will discuss how we triaged a performance issue in Flink.

Case 3 — The Lost Five Minutes In Job Initialization

With more and more jobs running in our Flink cluster, we noticed it was taking increasingly long for a job to start running in the TaskManagers.

From the Flink web UI, we observed that a job stayed in the Initialization status for about 5 minutes after it was submitted. Even after the job turned to Running status, it would sometimes be stuck waiting for task slots to be allocated and never actually start running. Such a problematic job would then block all the subsequent jobs.

Our initial (naive) conclusion was that some connections among the ResourceManager, Dispatcher and TaskManager were broken. Although there were enough available task slots, the ResourceManager couldn’t request and allocate them.

With this hypothesis, we decided to restart the idle TaskManagers one by one. When we restarted one particular (problematic) TaskManager, bingo! The slot allocation process went back to normal and all the other jobs started to run successfully.

Later, we did some post-analysis and found the following exception in the logs:

org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under X.X.X.X:YYYY-29a9f6.

That unresponsive TaskManager had probably blocked the whole slot allocation process. We noted it down in our runbook so that we can identify and restart such a TaskManager quickly in the future.
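As a reference for that runbook entry, the TaskManagers the ResourceManager currently knows about can be listed via the JobManager REST API (GET /taskmanagers) and compared with the running pods; a TaskManager pod that Kubernetes reports as healthy but that is missing from the list is a restart candidate. A minimal sketch (the JobManager address below is a placeholder, not our real endpoint):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Minimal runbook helper sketch: print the TaskManagers that the ResourceManager
// currently knows about. Compare the output with the running TaskManager pods to
// spot one that is alive but no longer registered.
public class ListRegisteredTaskManagers {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://flink-jobmanager:8081/taskmanagers")) // placeholder address
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}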

Next, we started to troubleshoot why it would take 5 minutes to initialize a job. This was like a detective story and we already had some clues:

  • We had several Flink clusters, but job initialization was notably slower in one of them.
  • That cluster had the largest number of running jobs.
  • Every job had a corresponding ConfigMap in Kubernetes to store the JobMaster and other meta information.

To resolve this mysterious case, we tried to find more clues from different sources.

Logging is a valuable source of information. It gave the timeline of what happened after the job submission. For example:

2022-01-11 07:53:24,239 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job c72e8c2978778a0a7e81fcb40ba65487
2022-01-11 07:53:24,792 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Added JobGraph(jobId: c72e8c2978778a0a7e81fcb40ba65487) to KubernetesStateHandleStore{configMapName='ml-flink-dispatcher-leader'}.
2022-01-11 07:53:24,796 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - Create KubernetesLeaderElector ml-flink-c72e8c2978778a0a7e81fcb40ba65487-jobmanager-leader with lock identity 49ff0eee-33cd-4dc0-93fb-bfc844c351ea.
2022-01-11 07:53:24,823 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected 49ff0eee-33cd-4dc0-93fb-bfc844c351ea for ml-flink-c72e8c2978778a0a7e81fcb40ba65487-jobmanager-leader.
2022-01-11 07:59:01,204 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with KubernetesLeaderElectionDriver{configMapName='ml-flink-c72e8c2978778a0a7e81fcb40ba65487-jobmanager-leader'}.

The log confirmed the JobGraph was received normally. By looking at the timestamp in Kubernetes, we noticed the ConfigMap (ml-flink-c72e8c2978778a0a7e81fcb40ba65487-jobmanager-leader) was actually created right after 07:53:24. However, something made DefaultLeaderElectionService wait for 5 minutes after the ConfigMap was created.

In a healthy Flink cluster, by contrast, it only takes a few seconds to start the DefaultLeaderElectionService after the ConfigMap is created. We also compared other logs between the slow cluster and a normal one. The slow cluster had lots of log lines such as:

INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver [] — Creating a new watch on ConfigMap ml-flink-ea2ee813ac3deb87011f05fca17dd4c9-jobmanager-leader

Thus, our first guess was that the problem was related to the ConfigMaps. Meanwhile, we found two other clues:

  1. The ConfigMaps of canceled jobs were not cleaned up, so there were many unused ConfigMaps in Kubernetes.
  2. Someone had hit a similar issue in a Flink mailing list thread and resolved it by removing the unused ConfigMaps.

As a result, we tried cleaning the unused ConfigMaps too. Unfortunately, it didn’t help in our case.
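For completeness, this is roughly how such a cleanup could be scripted with the fabric8 Kubernetes client (the same client library Flink uses). The namespace, the name pattern and the isOrphaned check below are illustrative assumptions; any real cleanup should first cross-check against the list of live jobs.

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

// Sketch only: delete leader-election ConfigMaps whose jobs no longer exist.
// The namespace and naming convention are assumptions for illustration.
public class CleanupLeaderConfigMaps {
    public static void main(String[] args) {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            for (ConfigMap cm : client.configMaps().inNamespace("flink").list().getItems()) {
                String name = cm.getMetadata().getName();
                if (name.endsWith("-jobmanager-leader") && isOrphaned(name)) {
                    client.configMaps().inNamespace("flink").withName(name).delete();
                    System.out.println("Deleted unused ConfigMap " + name);
                }
            }
        }
    }

    // Placeholder: check whether the job this ConfigMap belongs to is still alive
    // (e.g. by querying the Flink REST API) before deleting anything.
    private static boolean isOrphaned(String configMapName) {
        return false;
    }
}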

We had to continue investigating what really happened in the lost 5 minutes. Besides logging, a thread dump could also provide insight into what was happening. We submitted a new job, captured a thread dump and found something interesting:

One thread was blocked, waiting to lock an object:

"KubernetesLeaderElector-ExecutorService-thread-1" #40690381 daemon prio=5 os_prio=0 cpu=4.96ms elapsed=257.60s tid=0x00007f8de99e6800 nid=0x2cf66c waiting for monitor entry [0x00007f8dea53c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:200)
        - waiting to lock <0x00000000c5c0d638> (a java.lang.Object)
        at org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver$LeaderCallbackHandlerImpl.isLeader(KubernetesLeaderElectionDriver.java:208)

Another thread had locked this object but was itself in a waiting state:

"cluster-io-thread-6" #467 daemon prio=5 os_prio=0 cpu=896900.77ms elapsed=1388791.08s tid=0x00007f8df330d800 nid=0x2de waiting on condition [0x00007f8dd9fef000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.13/Native Method)
        - parking to wait for <0x00000000c5c0d538> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(java.base@11.0.13/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.13/AbstractQueuedSynchronizer.java:885)
        at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.start(DefaultLeaderElectionService.java:89)
        - locked <0x00000000c5c0d638> (a java.lang.Object)
        at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:154)
        - locked <0x00000000c5c0d690> (a java.lang.Object)

The DefaultLeaderElectionService thread seemed to be blocked and couldn't elect a leader. Next, we checked the Flink v1.12 source code and found the relevant pieces:

Listing 1: DefaultLeaderElectionService.java

Listing 2: KubernetesLeaderElectionDriver.java
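Putting the thread dump and these two classes together: DefaultLeaderElectionService.start() holds an internal lock while the Kubernetes leader election driver is being created, and the leadership callback onGrantLeadership() needs the same lock. Below is a heavily simplified sketch of that pattern, not the actual Flink 1.12 source; the CountDownLatch is only a stand-in for whatever the driver creation was parked on.

import java.util.UUID;
import java.util.concurrent.CountDownLatch;

// Heavily simplified sketch of the locking pattern seen in the thread dump.
// This is NOT the actual Flink source.
public class LeaderElectionLockSketch {

    private final Object lock = new Object();

    // Corresponds to "cluster-io-thread-6": start() runs while the JobManagerRunner
    // for the newly submitted job is being constructed.
    void start() throws InterruptedException {
        synchronized (lock) {
            // Creating the Kubernetes leader election driver opens a watch on the
            // job's ConfigMap through the shared Kubernetes client. If that call
            // never returns, the lock is never released.
            createDriverAndWatch();
        }
    }

    // Corresponds to "KubernetesLeaderElector-ExecutorService-thread-1": the
    // leadership callback stays BLOCKED on the same lock until start() returns.
    void onGrantLeadership() {
        synchronized (lock) {
            System.out.println("Leadership granted, session " + UUID.randomUUID());
        }
    }

    // Stand-in for the driver/watch creation that was parked on a CountDownLatch.
    private void createDriverAndWatch() throws InterruptedException {
        new CountDownLatch(1).await();
    }
}

Under this pattern, the new job cannot make progress until whatever the driver creation is waiting on becomes available, which matches the gap we saw between ConfigMap creation and the leader election service starting.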

Another piece of code rang a bell:

Listing 3: KubernetesLeaderElectionDriver.java

Remember that we had observed lots of "Creating a new watch on ConfigMap" lines in the log. According to the code and the log, this indicated that KubernetesTooOldResourceVersionException was being thrown frequently. It seemed there was some concurrency issue with the ConfigMap connections, which in turn caused the leader election problem.

With all the information above, we suspected this issue was related to Flink's Kubernetes-based High Availability (HA) service and became curious about how the HA service works for the different Flink components. The blog post "How to natively deploy Flink on Kubernetes with High-Availability (HA)" explains the mechanism. We also started to look into whether there were known issues related to Flink jobs running with Kubernetes HA.

Finally, we found a Jira issue that was very similar to ours. We figured out that the Flink components (e.g. ResourceManager, JobManager, Dispatcher, RestEndpoint) have separate leader election services and ConfigMaps; in particular, every job has its own leader election service and ConfigMap. The concurrent-requests limit determines how many ConfigMaps can be watched by the Flink Kubernetes client at the same time. The Jira issue also mentioned the workaround, which is to increase kubernetes.max.concurrent.requests, as every job needs 3 concurrent requests.
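Conceptually, the client's concurrent-request limit behaves like a fixed pool of slots that every ConfigMap watch has to occupy. A toy illustration of the effect (names and structure are ours, not Flink's):

import java.util.concurrent.Semaphore;

// Toy illustration only: model kubernetes.max.concurrent.requests as a pool of
// slots. Every watched ConfigMap holds a slot, so once the running jobs have
// consumed them all, the leader election of the next submitted job has to wait.
public class RequestSlotSketch {

    private final Semaphore requestSlots;

    RequestSlotSketch(int maxConcurrentRequests) {
        this.requestSlots = new Semaphore(maxConcurrentRequests);
    }

    void watchConfigMap(String configMapName) throws InterruptedException {
        requestSlots.acquire(); // blocks when all slots are held by existing watches
        System.out.println("Creating a new watch on ConfigMap " + configMapName);
    }
}

Once the running jobs hold all the slots, the next job's leader election has to wait for a slot to be released, which is exactly the stall we observed.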

We were now very close to the fix! The next step was to test the workaround in different environments. We tried to reproduce the issue in the test environment, and then increased kubernetes.max.concurrent.requests based on the total number of Flink jobs. We submitted a job again and, excitingly, it was initialized almost immediately! After doing more testing to ensure there were no side effects, we applied the fix in the production environment as well, with good results.
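As a rough rule of thumb derived from the Jira issue (3 concurrent requests per job), the new value can be sized from the expected peak number of jobs plus some headroom. The numbers below are illustrative only, not our production values:

// Illustrative sizing sketch only; the numbers are made up. The resulting value
// is what gets set as kubernetes.max.concurrent.requests in the Flink configuration.
public class MaxConcurrentRequestsSizing {
    public static void main(String[] args) {
        int expectedPeakJobs = 150; // peak number of jobs in the session cluster
        int requestsPerJob = 3;     // per the Jira issue, every job needs 3 concurrent requests
        int headroom = 50;          // slack for other client calls and watch re-creation

        int maxConcurrentRequests = expectedPeakJobs * requestsPerJob + headroom;
        System.out.println("kubernetes.max.concurrent.requests: " + maxConcurrentRequests);
    }
}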

Now we are confident that the cluster can run many more Flink jobs to meet our future business needs.

Lessons Learned

  • It's important to understand how the components of a distributed system interact with each other. A small configuration value can lead to big problems.
  • An apples-to-apples test environment is very helpful for reproducing production issues. System testing (including performance testing, reliability testing, etc.) helps catch this kind of issue, especially resource exhaustion and race conditions.

Summary

The cases in this series represent three kinds of issues encountered during system operation: errors and exceptions, resource exhaustion, and performance problems.

Issues with explicit errors and exceptions are usually more obvious than the other kinds. Engineers can search for the errors online or ask experienced colleagues for hints and tips. A reminder, though: we should still dig into the issue and make sure the root cause is what we think it is.

Resource exhaustion is a bit trickier. Not only can it be caused by transaction/request volume, but the different types of resources (CPU, memory and others) can also impact each other. Observability is very important in this case, especially in the Kubernetes world:

  • Pods may be terminated because of OOM, so it is hard to find the crime scene.
  • Nodes may be auto-scaled and pods may be re-assigned.

Performance issues are the most challenging, because sometimes there are no obvious metrics or logs. Resolving them requires both deduction and induction: with logs, thread/memory dumps and source code, deductive reasoning can lead us to the root cause, while comparing observations across different environments offers more ideas.

However, incidents will always happen, so our journey will never end.

Lucio Wu is a Senior DevOps Engineer at Airwallex.

Many thanks to Tianshi Zhu for reviewing and providing input to this blog.
