Faster Flink adoption with self-service diagnosis tool at Pinterest

Pinterest Engineering
Pinterest Engineering Blog
Sep 17, 2021

Fanshu Jiang & Lu Niu | Software Engineers, Stream Processing Platform Team

At Pinterest, stream data processing powers a wide range of real-time use cases. In recent years, the platform powered by Flink has proven to be of great value to the business by providing near real-time content activation and metrics reporting, with the potential to unlock more use cases in the future. However, to take advantage of that potential, we needed to address the issue of developer velocity.

It can take weeks to go from writing the first line of code to a stable data flow in production. Troubleshooting and tuning Flink jobs can be particularly time-consuming, given the number of logs and metrics to investigate and the variety of configs available to tune. Sometimes it requires a deep understanding of Flink internals to find the root cause of issues during development. This not only affects developer velocity and creates a subpar Flink onboarding experience, but also requires significant platform support, restricting the scalability of streaming use cases.

To make investigation easier and faster, we built out a Flink diagnosis tool, Dr. Squirrel, to surface and aggregate job symptoms, provide insights into the root cause, and suggest a solution with actionable steps. The tool has resulted in significant productivity gains for developers and the platform team since its release.

What is challenging about Flink job troubleshooting?

Massive pool of scattered logs and metrics, only a few of which matter

For troubleshooting, engineers usually:

  • scroll through a wall of JM/TM logs from YARN UI
  • check dozens of job/server metric dashboards
  • search and verify job configs
  • click through the Flink Web UI job DAG to find details like checkpoint alignment, data skew and backpressure

However, 90% of the stats we spend time on are either benign or simply unrelated to the root cause. Having a one-stop shop that aggregates only useful information and surfaces only what matters to troubleshooting saves enormous amounts of time.

Here are the bad metrics, now what?

This is a commonly asked question once stakeholders identify bad metrics, because more reasoning is required to get to the root cause. For example, a checkpoint timeout could mean an incorrect timeout configuration, but could also be a consequence of backpressure, slow S3 upload, bad GC, or data skew; lost TaskManager logs could mean a bad node, but oftentimes are a result of either heap or RocksDB state backend OOM. It takes time to understand all that reasoning and thoroughly verify each possible cause. However, 80% of the issue-fixing follows a pattern. This made us wonder: as a platform team, should we analyze the stats programmatically and tell stakeholders what to tune without having them do the reasoning?
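To make that pattern concrete, here is a minimal sketch of what encoding such reasoning as data could look like. The symptom names and cause lists are illustrative assumptions, not Dr. Squirrel's actual rules.

```java
import java.util.List;
import java.util.Map;

// Illustrative only: map each observed symptom to an ordered list of likely
// causes to verify, so the reasoning no longer lives only in people's heads.
public class SymptomRules {
    static final Map<String, List<String>> LIKELY_CAUSES = Map.of(
        "CHECKPOINT_TIMEOUT", List.of(
            "timeout configured too low", "backpressure",
            "slow S3 upload", "bad GC", "data skew"),
        "TASKMANAGER_LOST", List.of(
            "heap OOM", "RocksDB state backend OOM", "bad node"));

    public static void main(String[] args) {
        // Given a detected symptom, print the candidate causes to check, in order.
        LIKELY_CAUSES.get("CHECKPOINT_TIMEOUT").forEach(System.out::println);
    }
}
```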

Troubleshooting doc is far from enough

We provide a troubleshooting doc to customers. However, with the growing number of troubleshooting use cases, the doc has become too long to quickly spot the relevant diagnosis and instructions for an issue. Engineers also have to manually apply the if-else diagnosis logic to determine the root cause. This has added much friction to self-serve diagnosis, and the reliance on the platform team for troubleshooting remains. Besides, the doc is not an effective call to action whenever the platform pushes a new job health requirement. We realized that a better tool was needed to efficiently share troubleshooting takeaways and enforce cluster-wide job health requirements.

Dr. Squirrel, a self-service diagnosis tool for troubleshooting

Given the above challenges, we built out Dr. Squirrel — a diagnosis tool for fast issue detection and troubleshooting guidance designed to:

  • cut down the troubleshooting time from hours to minutes
  • reduce the tools developers need for investigations from many to one
  • lower the Flink internal knowledge required for troubleshooting from intermediate to minimal

In a nutshell, we aggregate useful information in one place, perform job health checks, flag unhealthy ones explicitly, and provide root cause analysis and actionable steps to help fix the issues. Let’s take a look at some feature highlights.

More efficient ways to view logs

For each job run, Dr. Squirrel highlights exceptions that directly trigger restarts (e.g., TaskManager lost, OOM) to help engineers quickly find the relevant exceptions to focus on in a massive pool of logs. It also collects all warnings, errors, and info logs that contain a stack trace in separate sections. For each log, Dr. Squirrel checks the content to see if an error keyword can be found, then provides a link to our step-by-step solution in the troubleshooting guide.

Dr. Squirrel suggests a solution to an issue when an error keyword is identified in the log.
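As a rough illustration of that keyword matching, here is a minimal sketch assuming a hypothetical keyword-to-guide table; the real matching logic and troubleshooting links in Dr. Squirrel are not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative only: return the troubleshooting-guide link for the first
// known error keyword found in a log line. Keywords and URLs are hypothetical.
public class LogSuggester {
    private static final Map<String, String> KEYWORD_TO_GUIDE = new LinkedHashMap<>();
    static {
        KEYWORD_TO_GUIDE.put("OutOfMemoryError",
            "https://troubleshooting.example/flink/oom");
        KEYWORD_TO_GUIDE.put("Checkpoint expired before completing",
            "https://troubleshooting.example/flink/checkpoint-timeout");
    }

    static Optional<String> suggest(String logLine) {
        return KEYWORD_TO_GUIDE.entrySet().stream()
            .filter(e -> logLine.contains(e.getKey()))
            .map(Map.Entry::getValue)
            .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(suggest("java.lang.OutOfMemoryError: Java heap space"));
    }
}
```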

All logs are searchable using the search bar. On top of that, Dr. Squirrel provides two ways to view logs more efficiently: the Timeline view and the Unique Exception view. As shown below, the Timeline view allows you to view logs chronologically, with the class name and a pre-populated ElasticSearch link if more details are needed.

Timeline view of logs, with class names and pre-populated ElasticSearch links.

With one click, we can switch to the Unique Exception view, where the same exceptions are grouped in one row with metadata such as first, last, and total occurrence. This simplifies the process of identifying the most frequent exceptions.

Unique Exception view groups the same exceptions into one row with their first, last, and total occurrence.
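Here is a minimal sketch of the grouping behind such a view, assuming exceptions are fingerprinted by class name; the actual fingerprinting and storage inside Dr. Squirrel may differ.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: group log events by exception class and keep the
// first, last, and total occurrence for each group.
public class UniqueExceptionAggregator {
    static class ExceptionStats {
        Instant first;
        Instant last;
        long total;
    }

    private final Map<String, ExceptionStats> byFingerprint = new HashMap<>();

    void add(String exceptionClass, Instant timestamp) {
        ExceptionStats stats =
            byFingerprint.computeIfAbsent(exceptionClass, k -> new ExceptionStats());
        if (stats.first == null || timestamp.isBefore(stats.first)) stats.first = timestamp;
        if (stats.last == null || timestamp.isAfter(stats.last)) stats.last = timestamp;
        stats.total++;
    }

    Map<String, ExceptionStats> view() {
        return byFingerprint;
    }
}
```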

Job health at a glance

Dr. Squirrel provides a health check page that enables engineers, whether beginners or experts, to tell confidently whether a job is healthy. Instead of showing plain metric dashboards, Dr. Squirrel monitors each metric for one hour and explicitly flags whether it passes our platform stability requirements. This is an efficient and scalable way for the platform team to communicate and enforce what is considered stable.

The health check page consists of multiple sections, each focusing on a different aspect of job health. A quick browse through these sections is all that's needed to get a good idea of the overall job health:

  • Basic Job Stats section monitors basic stats such as throughput, rate of full restarts, checkpoint size/duration, consecutive checkpoint failures, and max parallelism over the past hour. When metrics fail the health check, they are marked as Failed and ranked at the top.
Basic Job Stats section: a table with each row representing one health check item, such as reasonable checkpoint size, healthy throughput, no restarts, etc.
  • Backpressured Tasks tracks the backpressure situation of each operator at fine granularity. A minute with no backpressure is visualized as a green square, otherwise a red square, and the 60 squares for each operator represent the backpressure situation over the past hour (see the sketch after this list for the bucketing idea). This makes it easy to identify how frequently backpressure happens and which operator starts backpressuring earliest.
Backpressured Tasks section: tracks backpressure of each operator over the past hour.
  • GC Old Gen Time section has the same visualization as backpressure to provide an overview of whether the GC is occurring too often and could potentially affect throughput or checkpoint. With the same visualization, it becomes obvious whether GC and backpressure happen at the same time and whether GC may potentially cause backpressure.
GC Old Gen Time section: provides an overview of whether GC is occurring too often for each container.
  • JobManager/TaskManager Memory Usage tracks the YARN container memory usage, which is the resident set size (RSS) memory of the Flink Java process, collected through a daemon running on the worker nodes. RSS memory is more accurate because it includes all sections in the Flink memory model as well as memory that's not tracked by Flink, such as the JVM process stack, thread metadata, or memory allocated from user code through JNI. We mark the configured max JM/TM memory in the graph, as well as a 90% usage threshold, to help users quickly spot which containers are close to OOM.
JM/TM memory graph: a line chart with one line per container's memory usage and two red horizontal lines, one for the max container memory and one for 90% of the max. A filter shows only containers that exceed the 90% threshold, helping users spot which containers are close to OOM.
  • CPU% Usage section surfaces the containers that use more CPU capacity than the vcores they are assigned to. This helps monitor and avoid “Noisy neighbor” issues in the multi-tenant Hadoop cluster. Very high CPU% usage could result in one user’s workload impacting the performance and stability of another user’s workload.
CPU% usage section: a table with each row representing a container and its CPU% usage stats (min, max, and P90 over the past hour). The last column shows Healthy or Unhealthy depending on whether the CPU% usage is higher than what the configured vcores provide.
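The green/red squares in the Backpressured Tasks and GC Old Gen Time sections boil down to per-minute pass/fail buckets over a one-hour window. Below is a minimal sketch of that bucketing; the threshold check and text rendering are assumptions for illustration, not the actual implementation.

```java
import java.util.BitSet;

// Illustrative only: 60 one-minute buckets covering the past hour; a bucket is
// "red" when the metric breached its threshold during that minute.
public class MinuteBuckets {
    private final BitSet redMinutes = new BitSet(60);

    // minuteIndex: 0 = oldest minute in the window, 59 = most recent.
    void record(int minuteIndex, double metricValue, double threshold) {
        if (metricValue > threshold) {
            redMinutes.set(minuteIndex);
        }
    }

    // Render the row of 60 squares for one operator or container.
    String render() {
        StringBuilder row = new StringBuilder(60);
        for (int i = 0; i < 60; i++) {
            row.append(redMinutes.get(i) ? '\u25A0' : '\u25A1'); // filled = red, hollow = green
        }
        return row.toString();
    }
}
```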

Effective configurations

Flink jobs can be configured at different levels, such as in-code configurations at the execution level, the job properties file, command line arguments at the client level, and flink-conf.yaml at the system level. It's not uncommon for engineers to configure the same parameter at different levels for testing or hotfixing. With the override hierarchy, it is not obvious which value eventually takes effect. To address this issue, we built a configuration library that figures out the effective configuration values the job is running with and surfaces them to Dr. Squirrel.
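A minimal sketch of the precedence idea, assuming lower-precedence layers are simply overridden by higher-precedence ones; the configuration library we built is not shown here, and this only conveys the general idea.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: merge configuration layers from lowest to highest
// precedence so that later layers win for duplicate keys.
public class EffectiveConfig {
    static Map<String, String> resolve(List<Map<String, String>> layersLowToHigh) {
        Map<String, String> effective = new HashMap<>();
        for (Map<String, String> layer : layersLowToHigh) {
            effective.putAll(layer); // higher-precedence layers override earlier ones
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConfYaml = Map.of("taskmanager.memory.process.size", "4g");
        Map<String, String> clientArgs = Map.of("taskmanager.memory.process.size", "8g");
        Map<String, String> inCode = Map.of(); // nothing overridden in code
        System.out.println(resolve(List.of(flinkConfYaml, clientArgs, inCode)));
        // -> {taskmanager.memory.process.size=8g}
    }
}
```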

Queryable cluster-wide job health

With abundant job stats in one place, Dr. Squirrel becomes a resource center for learning cluster-wide job health and finding insights for platform improvements, for example, what the top 10 restart root causes are, or what percentage of jobs run into memory issues or backpressure.
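As an illustration of the kind of cluster-wide query this enables, here is a minimal sketch that counts restart root causes across job snapshots and takes the top 10; the JobSnapshot fields here are assumptions about the stored schema, not Dr. Squirrel's actual model.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative only: rank restart root causes by how many snapshots report them.
public class ClusterHealthQueries {
    record JobSnapshot(String jobName, String restartRootCause) {}

    static List<Map.Entry<String, Long>> top10RestartCauses(List<JobSnapshot> snapshots) {
        return snapshots.stream()
            .filter(s -> s.restartRootCause() != null)
            .collect(Collectors.groupingBy(JobSnapshot::restartRootCause, Collectors.counting()))
            .entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
            .limit(10)
            .collect(Collectors.toList());
    }
}
```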

Architecture

As seen in the features above, metrics and logs are all gathered into one place. To collect them in a scalable way, we added a MetricReporter and a KafkaLog4jAppender to our Flink custom build to continuously send metrics and logs to Kafka topics. The KafkaLog4jAppender also acts as a filter, forwarding only the logs that matter to us: warnings, errors, and info logs that come with a stacktrace. Downstream is FlinkJobWatcher, a Flink job that joins the metrics and logs coming from the same job after a series of parsing and transformations. FlinkJobWatcher then creates a snapshot of job health every 5 minutes and sends it to the JobSnapshot Kafka topic.

The growing number of Flink use cases has been introducing massive amounts of logs and metrics. Being a Flink job itself, FlinkJobWatcher handles the increasing data scale well and, with simple parallelism tuning, keeps throughput on par with the number of use cases.

Our Flink custom build sends metrics and logs to two separate Kafka topics. A Flink job called FlinkJobWatcher joins job metrics and logs into one job snapshot, then sends it to the JobSnapshot Kafka topic.
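For a sense of the shape of such a pipeline, here is a minimal sketch, not FlinkJobWatcher itself: it co-groups metric and log records by job name over 5-minute windows and writes a tiny snapshot per job to a snapshot topic. The topic names, the "jobName|payload" record format, and the snapshot contents are all assumptions for illustration.

```java
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.Properties;

// Illustrative only: join metric and log records from the same job into one
// snapshot per 5-minute window, then publish the snapshot to Kafka.
public class FlinkJobWatcherSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-job-watcher");

        // Assume each record is "jobName|payload"; real records would be structured.
        DataStream<String> metrics =
            env.addSource(new FlinkKafkaConsumer<>("flink-metrics", new SimpleStringSchema(), props));
        DataStream<String> logs =
            env.addSource(new FlinkKafkaConsumer<>("flink-logs", new SimpleStringSchema(), props));

        metrics.coGroup(logs)
            .where(record -> record.split("\\|", 2)[0])
            .equalTo(record -> record.split("\\|", 2)[0])
            .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
            .apply(new CoGroupFunction<String, String, String>() {
                @Override
                public void coGroup(Iterable<String> m, Iterable<String> l, Collector<String> out) {
                    // A toy "snapshot": just the record counts for the window.
                    int metricCount = 0, logCount = 0;
                    for (String ignored : m) metricCount++;
                    for (String ignored : l) logCount++;
                    out.collect("{\"metrics\":" + metricCount + ",\"logs\":" + logCount + "}");
                }
            })
            .addSink(new FlinkKafkaProducer<>("job-snapshot", new SimpleStringSchema(), props));

        env.execute("FlinkJobWatcherSketch");
    }
}
```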

Once the JobSnapshot is available, more data needs to be fetched and merged into it. For this purpose, we built a RESTful service using Dropwizard that keeps reading from the JobSnapshot topic and pulls external data via RPC. The external data sources include YARN ResourceManager for static data such as username and launch time, the Flink REST API for configurations, an internal tool called Automated Canary Analysis (ACA) to compare time series metrics against a threshold with fine-grained criteria, and a couple of other internal tools that allow us to surface custom metrics like RSS memory and CPU% usage, which are collected from a daemon running on the worker nodes. We also built a UI with React to make job health easy to explore.

The Dr. Squirrel web service reads from the JobSnapshot topic, then merges in more job info by calling external data sources such as the Flink REST API and YARN ResourceManager. It exposes APIs to a frontend built with React that lets users explore job health more easily.
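A minimal sketch of what one such Dropwizard (JAX-RS) endpoint could look like, assuming a hypothetical /jobs/{jobName}/health path and an in-memory map populated by the snapshot consumer; the real service's routes, schema, and enrichment logic are not shown.

```java
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: serve the latest snapshot for a job as JSON.
@Path("/jobs")
@Produces(MediaType.APPLICATION_JSON)
public class JobHealthResource {
    // Populated by a Kafka consumer reading the JobSnapshot topic (not shown).
    private final Map<String, Map<String, Object>> latestSnapshotByJob = new ConcurrentHashMap<>();

    @GET
    @Path("/{jobName}/health")
    public Map<String, Object> getJobHealth(@PathParam("jobName") String jobName) {
        // In the real service, this is where external data (YARN ResourceManager,
        // Flink REST API, ACA results, RSS/CPU metrics) would be merged in.
        return latestSnapshotByJob.getOrDefault(jobName, Map.of());
    }
}
```

Such a resource would be registered in the Dropwizard Application's run() method via environment.jersey().register(...).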

Future Work

We will continue improving Dr. Squirrel with better job diagnosis capability to help us move one step closer to fully self-serve onboarding:

  • Capacity planning: monitor and evaluate throughput, usage of memory and vcores to find the most efficient resource settings.
  • Integration with CICD: we are running a CICD pipeline to automatically verify and push changes from dev to prod. Dr. Squirrel will be integrated with CICD to provide more confidence about job health as new changes are pushed out.
  • Alert & notification: notify job owner or platform team with a health report summary.
  • Per-job cost estimate: show cost estimate of each job based on resource usage for budget planning and awareness.

Acknowledgment

Shoutout to Hannah Chen, Nishant More, and Bo Sun for their contributions to this project. Many thanks to Ping-Min Lin for setting up the initial UI work and to Teja Thotapalli for the infra setup on the SRE side. We also want to thank Ang Zhang, Chunyan Wang, and Dave Burgess for their support, and all our customer teams for providing valuable feedback and troubleshooting scenarios to help us make the tool powerful.

Read more about Stream Processing at Pinterest

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog, and visit our Pinterest Labs site. To view and apply to open opportunities, visit our Careers page.
