How we monitor thousands of Spark data pipelines

Thanan Sapthamrong
May 2, 2023


Hi, I’m Thanan, a Senior Data Platform Engineer at LINE MAN Wongnai (LMWN). Data Platform Engineers are responsible for the reliability and availability of our data platforms and services, and for improving the overall system. This post walks through one of those improvements: how we monitor thousands of data pipelines running on Apache Spark.

Overview of Batch Data Pipelines

Let’s start with the overall batch data pipeline system. In 2023, we mainly use Apache Airflow as our workflow orchestrator to support pipelines from different teams such as data engineers, data scientists, and analytics engineers. Currently, there are 3 Airflow instances, as follows:

  1. DAP Airflow is the instance our data engineers and data scientists mostly use for ETL. The data is collected from several internal and external sources, such as the databases of our production applications, object storage on GCS and AWS, Google Sheets from the business side, and third-party APIs. We also use this instance for pipelines that create precomputed tables (tables generated from one or more other tables by joining and/or aggregating).
  2. DS Airflow is the instance our data scientists use exclusively for data science work such as feature engineering, model training and prediction, and finding insights (data exploration and analysis).
  3. DBT Airflow is the instance for our analytics engineers, who focus on writing SQL to create precomputed tables. These pipelines are built on top of DBT (Data Build Tool) to take advantage of its reusable data models.
Overall Batch Data Pipelines of LINE MAN Wongnai in 2023

Based on this picture, 83.9% of our DAGs (Directed Acyclic Graphs), or 386 DAGs, process data through Apache Spark across all Airflow platforms. In addition, if we count the Airflow Task Instances, we end up with a total of 2,560. In other words, we currently have 2,560 Spark data pipelines in our data platform.

So, What’s the Problem(s)?

A large number of data pipelines also brings a large need for maintenance and monitoring. If we don’t take good care of them and don’t have a good way to monitor their health, our business stakeholders will be affected within just a few hours. Consider this case: we have pipelines that start running at midnight every day, and one of them fails or is delayed. The downstream pipelines then run into issues too. In the end, the business teams may act more slowly than before or make worse decisions because of wrong or missing data.

So, back to our topic. What happens to data pipelines between Airflow and Spark, and which issues could trouble them? The workflow between Airflow and Spark is summarized below:

  1. Airflow submits jobs to the Spark cluster through the spark-submit CLI or the Livy REST API.
  2. The Spark cluster receives the jobs and starts running them.
  3. Airflow keeps waiting for the results from Spark.
  4. When the Spark jobs finish (or fail), Airflow collects the logs.
Pipeline Flows Between Apache Airflow and Apache Spark

Based on this workflow, the key problem is the lack of pipeline observability. We don’t know what actually happens inside our Spark jobs. Do they work well, or do they just barely work? We can only observe issues from the Airflow side, through the errors that are sent to our Slack monitoring channel. As in the screenshot below, we learn that a pipeline has failed. Then we fix the error based on the Airflow log or the Spark Web UI, by adjusting the Spark configuration or fixing data bugs. However, we still don’t know whether any pipeline is about to have a problem, or how many pipelines have room for improvement. Knowing this would help us a lot in reducing the risk of data delays and getting rid of common error patterns, and it would also give us an opportunity to speed up our pipelines.

Airflow Error when Spark Job Failed

The current process for speeding up pipelines or fixing errors requires our engineers to look through the Spark Web UI for each pipeline. The Spark Web UI also has its own limitation: it retains only 14 days of data. Since we have thousands of pipelines, it is difficult and time-consuming to find out which ones need fixing or improvement.

The common issues we have found in our Spark data pipelines so far are listed below.

Data Skew: Spark runs on a cluster in which many workers process data in parallel. If one worker spends much more time than the others, the cause may be data skew. For example, as in the picture, we have 200 Spark tasks to compute. Assume 199 tasks complete within 10 minutes, while the one remaining task has already spent 47 hours and is still running.

Data Skew in Apache Spark (ref: https://www.clairvoyant.ai/blog/optimizing-the-skew-in-spark)

Data Spill: This issue happens when the execution memory of a Spark worker is not enough. Spark first tries to use other parts of the memory block. If the memory is still not enough, Spark stores the data on disk instead, which means more time is spent reading and writing data on disk. Since RAM is much faster than disk, this issue can also degrade our pipelines’ performance.

Memory Spill and Disk Spill in Spark Web UI

Over/Under Resource Utilization: When we have lots of jobs, it takes time to find out how many resources, in terms of CPU cores and memory, each job requests. Some jobs request a large amount of resources when in fact they could run with far fewer and still achieve the same pipeline runtime. The major issue with requesting lots of resources is that the total resource capacity becomes insufficient at that time. This can also lead to data delays, since the other pipelines have to wait for available resources.

So, how can we know whether these issues are happening in our pipelines? Let’s go to the next section.

The Solution, Spark Listener

We decided to use a Spark Listener to solve these problems. But first, what is a Spark Listener?

Spark Listener is an interface that provides a developer API for listening to Spark callback events. We can use these callback events to collect statistics for each event and export the useful ones elsewhere for further analysis, monitoring, and so on. The Spark Web UI also uses this listener mechanism to obtain the data it displays.

Therefore, we decided to implement our own Spark Listener in Java to export statistics for every Spark pipeline in our data platforms. We mainly focus on calculating statistics for Spark Tasks and Spark Stages. But what are Jobs, Stages, and Tasks in Spark? See the explanation below.

  1. Spark Job: When we call actions such as save, collect, or count, or other functions that retrieve results from Spark, the Spark Driver creates a Job. Jobs can run in parallel.
  2. Spark Stage: Every Spark Job can contain many Stages. Each Stage contains transformation logic such as filter, join, sort, repartition, etc.
  3. Spark Task: The smallest execution unit in Spark, and a part of a Spark Stage. One Spark Stage can comprise many Tasks. Spark Executors compute Tasks in parallel, and each Task processes its partition of the data in memory.
Relationship of Spark Job, Spark Stage, and Spark Task (ref: https://stackoverflow.com/questions/42263270/what-is-the-concept-of-application-job-stage-and-task-in-spark)

The workflow of this Spark Listener starts with collecting data from each Spark callback event and processing it into useful statistics for each Spark Stage. Examples of the information we collect per Spark Stage are the list of Spark configurations, how much data spilled to memory and disk, percentiles (P50/P75/P90) of the Spark Task runtimes, total written bytes and records, the error (if the pipeline failed), and the skewness of the Spark Task runtimes in the Stage. These statistics are calculated until the pipeline finishes. Finally, the listener combines the statistics of every Spark Stage and sends the data to the DTP Internal Server through its REST API.
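To make the per-stage aggregation concrete, here is a minimal Python sketch (the real listener is written in Java). The `TaskMetrics` fields, the `summarize_stage` helper, and the nearest-rank percentile method are illustrative assumptions, not the actual listener code.

```python
import math
from dataclasses import dataclass


@dataclass
class TaskMetrics:
    """Per-task numbers a listener could read from a task-end event (hypothetical fields)."""
    duration_ms: int
    memory_bytes_spilled: int
    disk_bytes_spilled: int
    bytes_written: int
    records_written: int


def percentile(values, p):
    """Nearest-rank percentile: smallest value with at least p% of the data at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


def summarize_stage(stage_id, tasks):
    """Fold the task metrics of one stage into a single statistics record."""
    durations = [t.duration_ms for t in tasks]
    return {
        "stage_id": stage_id,
        "num_tasks": len(tasks),
        "p50_ms": percentile(durations, 50),
        "p75_ms": percentile(durations, 75),
        "p90_ms": percentile(durations, 90),
        "max_ms": max(durations),
        "memory_bytes_spilled": sum(t.memory_bytes_spilled for t in tasks),
        "disk_bytes_spilled": sum(t.disk_bytes_spilled for t in tasks),
        "bytes_written": sum(t.bytes_written for t in tasks),
        "records_written": sum(t.records_written for t in tasks),
    }


# Ten tasks with durations of 1s, 2s, ..., 10s; only the last one spills.
tasks = [TaskMetrics(i * 1000, 0, 0, 1_000_000, 10_000) for i in range(1, 10)]
tasks.append(TaskMetrics(10_000, 512, 256, 1_000_000, 10_000))
summary = summarize_stage(3, tasks)
print(summary)
```

One record like this per stage, combined at the end of the application, is the kind of payload that could be sent to a logging service.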

The DTP Internal Server is another service implemented by our platform team for platform logging purposes. It receives requests via a REST API and passes the payloads to Apache Kafka. The data is then synced into a Hive table every 2 hours.

Getting back to the skewness calculation for each Spark Stage, we use 3 methods to measure skewness, as described below:

  1. Standard Deviation (SD): If the runtime of a Spark Task exceeds 2*SD or 3*SD, where SD is the standard deviation of the task runtimes in its Spark Stage, we count it as a skewed task.
  2. Skewness Formula: This is the straightforward method. It measures the skewness of the distribution of the given input values and returns a numeric value.
  3. Max(Spark Task Runtime) - P90(Spark Task Runtime) > 10 minutes: If the difference between the maximum task runtime and the P90 task runtime is more than 10 minutes, we also count this as skew.

We use several methods to find skewness because we don’t really know which method suits our collected data best. Each method also has its own drawback. For example, the third method cannot detect skewness if the gap between the P90 runtime and the maximum runtime does not exceed the threshold.
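The three checks can be sketched in a few lines of Python. This is only an illustration under stated assumptions: method 1 is read here as "runtime greater than mean + k*SD", method 2 uses the Fisher-Pearson moment coefficient, and the `skewness_threshold` of 1.0 is a hypothetical cut-off that the text above does not specify.

```python
import math
from statistics import mean, pstdev

TEN_MINUTES_S = 10 * 60


def sd_outliers(runtimes, k=3.0):
    """Method 1 (assumed reading): tasks whose runtime exceeds mean + k * SD of the stage."""
    mu, sd = mean(runtimes), pstdev(runtimes)
    return [r for r in runtimes if sd > 0 and r > mu + k * sd]


def skewness(runtimes):
    """Method 2: Fisher-Pearson moment coefficient of skewness, m3 / m2^1.5."""
    mu = mean(runtimes)
    m2 = mean([(r - mu) ** 2 for r in runtimes])
    m3 = mean([(r - mu) ** 3 for r in runtimes])
    return 0.0 if m2 == 0 else m3 / m2 ** 1.5


def p90(runtimes):
    """Nearest-rank 90th percentile."""
    ordered = sorted(runtimes)
    return ordered[max(1, math.ceil(90 * len(ordered) / 100)) - 1]


def max_minus_p90_exceeds(runtimes, threshold_s=TEN_MINUTES_S):
    """Method 3: the gap between the slowest task and the P90 task exceeds the threshold."""
    return max(runtimes) - p90(runtimes) > threshold_s


def detect_skew(runtimes, k=3.0, skewness_threshold=1.0):
    """Run all three checks on one stage's task runtimes (in seconds)."""
    return {
        "sd": bool(sd_outliers(runtimes, k)),
        "skewness": skewness(runtimes) > skewness_threshold,  # hypothetical threshold
        "max_minus_p90": max_minus_p90_exceeds(runtimes),
    }


# The stage from the Data Skew example: 199 tasks of 10 minutes, one task of 47 hours.
long_stage = [600] * 199 + [47 * 3600]
print(detect_skew(long_stage))

# A stage of short tasks: one task is 30x slower, but the gap is under 10 minutes.
short_stage = [10] * 19 + [300]
print(detect_skew(short_stage))
```

The second stage illustrates the drawback of the third method: the fixed 10-minute gap misses a straggler in a stage of short tasks, while the two distribution-based checks still flag it.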

Once the data is stored in the Hive table, we are ready to find insights through queries, and we can also calculate the SLOs. Moreover, we no longer have to worry about retention, since the table can keep data for several years.

Conceptual Flow of Our Spark Listener

When we deployed the listener, we also added extra Spark configurations from every Airflow platform. We use them to trace back to the source pipeline’s logic.

  1. spark.dap.source: the source platform or source client that submits the jobs
  2. spark.dap.platformEnv: the environment of the source platform or source client
  3. spark.dap.appId: the source pipeline that contains the Spark source code logic. We use Airflow macros to fill in this value in the format dag_id--task_id--run_id

An example spark-submit command that enables the Spark Listener together with the extra Spark configurations:

spark-submit \
--conf "spark.extraListeners=dap.LMWNSparkListener" \
--conf "spark.jars=dap-spark-listener-1.0.0.jar" \
--conf "spark.dap.source=dap-airflow" \
--conf "spark.dap.platformEnv=production" \
--conf "spark.dap.appId=wongnai_import--transform_and_update_wn_restaurant--scheduled__2023-04-13T02:00:00+00:00" \
script.py
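As a rough illustration of how such a command could be assembled programmatically, the Python sketch below builds the appId and the argument list. The helper names (`build_app_id`, `parse_app_id`, `build_spark_submit`) are hypothetical, and the parser assumes that neither the DAG id nor the task id contains the `--` separator.

```python
import shlex

APP_ID_SEPARATOR = "--"


def build_app_id(dag_id, task_id, run_id):
    """Join the three Airflow identifiers into one traceable appId."""
    return APP_ID_SEPARATOR.join([dag_id, task_id, run_id])


def parse_app_id(app_id):
    """Split an appId back into its parts (assumes the ids never contain '--')."""
    parts = app_id.split(APP_ID_SEPARATOR)
    if len(parts) != 3:
        raise ValueError(f"unexpected appId format: {app_id!r}")
    return dict(zip(("dag_id", "task_id", "run_id"), parts))


def build_spark_submit(script, source, platform_env, app_id):
    """Return the spark-submit argument list with the listener and tracing configs."""
    conf = {
        "spark.extraListeners": "dap.LMWNSparkListener",
        "spark.jars": "dap-spark-listener-1.0.0.jar",
        "spark.dap.source": source,
        "spark.dap.platformEnv": platform_env,
        "spark.dap.appId": app_id,
    }
    args = ["spark-submit"]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args.append(script)
    return args


app_id = build_app_id(
    "wongnai_import",
    "transform_and_update_wn_restaurant",
    "scheduled__2023-04-13T02:00:00+00:00",
)
args = build_spark_submit("script.py", "dap-airflow", "production", app_id)
print(shlex.join(args))
print(parse_app_id(app_id))
```

Passing the arguments as a list avoids shell-quoting problems, and `parse_app_id` shows how the appId can later be split back into the DAG and task it came from.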

Let’s Get into Monitoring

We are almost at the final part. This section takes you on a tour of how we monitor our Spark pipelines. First of all, we set the following SLOs (Service Level Objectives):

  1. Runtime: How many pipelines have a runtime that exceeds the time limit? The time limit is based on the pipeline’s cron expression in Airflow. For example, the runtime of an @hourly pipeline should be less than 70% of 1 hour, or 42 minutes, and the runtime of a 30 */2 * * * pipeline should be less than 1 hour.
  2. Skew: We classify skewed pipelines by combining the answers from the 3 skewness detection methods (similar to the concept of ensemble techniques in data science). If any one of the answers says that a pipeline is skewed, this SLO is reduced.
  3. Spill: This SLO covers spill in both memory and disk. Even if a pipeline spills only 0.1 GB in memory, we still count it as a spill pipeline.
  4. Failed Apps: The last SLO is about failure, and it’s easy to understand. If a pipeline fails, the SLO is reduced. That’s it.
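The four SLOs above can be sketched as simple ratios of "clean" runs. In this Python sketch the `PipelineRun` fields and helper names are hypothetical, and the 60-minute cap in `runtime_limit_minutes` is an assumption chosen only so that the function reproduces the examples in this post (42 minutes for an hourly pipeline, 1 hour for a 2-hourly one); the actual rule may differ.

```python
from dataclasses import dataclass


@dataclass
class PipelineRun:
    """One Spark pipeline run with the fields needed for the SLOs (hypothetical schema)."""
    app_id: str
    interval_minutes: float   # schedule interval derived from the cron expression
    runtime_minutes: float
    skew_votes: tuple         # answers of the 3 skewness detection methods
    memory_spill_bytes: int
    disk_spill_bytes: int
    failed: bool


def runtime_limit_minutes(interval_minutes, percent=70, cap_minutes=60):
    """70% of the schedule interval, capped at cap_minutes (the cap is an assumption)."""
    return min(interval_minutes * percent / 100, cap_minutes)


def violations(run):
    """Which of the four SLOs this run violates."""
    return {
        "runtime": run.runtime_minutes > runtime_limit_minutes(run.interval_minutes),
        "skew": any(run.skew_votes),  # one positive answer is enough
        "spill": run.memory_spill_bytes > 0 or run.disk_spill_bytes > 0,
        "failed": run.failed,
    }


def slo_percentages(runs):
    """Percentage of runs without a violation, per SLO."""
    flagged = [violations(r) for r in runs]
    names = ("runtime", "skew", "spill", "failed")
    return {n: 100.0 * sum(not f[n] for f in flagged) / len(runs) for n in names}


no_skew = (False, False, False)
runs = [
    PipelineRun("dag_a--task_1--run_1", 60, 30, no_skew, 0, 0, False),
    PipelineRun("dag_b--task_1--run_1", 30, 28, no_skew, 0, 0, False),
    PipelineRun("dag_c--task_1--run_1", 60, 10, (False, True, False), 100_000_000, 0, False),
    PipelineRun("dag_d--task_1--run_1", 120, 50, no_skew, 0, 0, True),
]
slo = slo_percentages(runs)
print(slo)
```

Grouping the same calculation by DAG tier would give the per-priority view of each SLO.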

As shown in the picture below, these pie charts represent the 4 SLOs. In addition, we join the data from spark.dap.appId with the DAG’s metadata to get the DAG’s tier. The tier is the priority of a DAG as defined by our data engineers, and it also represents the priority of its data. Separating the SLOs by tier gives us a view by priority. For example, tier-1 needs to be fixed first, followed by tier-2, tier-3, and tier-4.

SLOs of Spark Batch Data Pipelines

In this snapshot of our SLOs, the runtime SLO is low, and many of our pipelines have spills. On the other hand, the skew SLO is close to 100%. The Failed Apps SLO should ideally be 100%, meaning we have no errors at all, but in reality it is not. So how can we improve these numbers? This brings us to the second part of the monitoring, which goes into more detail at the DAG level and the Task Instance level.

The first view is a pivot table that lists the DAGs whose runtime exceeds the time limit and counts them per day. You will see that one pipeline is shown in very deep red and appears almost every day. This pipeline runs every 30 minutes, and its runtime is close to the next DAG run, e.g. 28–29 minutes, which is why it exceeds the limit. Its runtime should be less than 70% of half an hour, or 21 minutes. However, the table shows that it is a tier-3 pipeline, so we can focus on improving the runtime of tier-1 pipelines first, since they have more impact on business stakeholders.

Pivot Table of Pipelines Exceeding the Time Limit per Day

The second view is also a pivot table. The table below lists the DAGs together with their Task Instances that have spill issues, whether in memory or on disk.

Pivot Table of Pipelines with Spill Issues

The third table summarizes the errors of each DAG and Task Instance. We can see how many times the same error happens in a specific pipeline. Moreover, we can see the full error and use the YARN Application Id to search the Spark History Server for further debugging.

Pivot Table of Failure Reason Summary per DAG and Task Instance
A Table of Failure Reason

For skewed pipelines, since we collect data at Spark Stage granularity, we can see which Spark Stage and Spark Job ids have skew. The YARN Application Id is also available for tracing the issue back in the Spark History Server.

A Table of Pipelines that have Spark Skew Stages

Besides the table visualizations, we also have a graph that helps us find pipelines that use lots of resources but produce little output data. In the figure below, the scatter plot shows several pipelines that use 100 Spark executors to write less than 5 GB of data. Some of them could use only 5 Spark executors and still fit their workload and runtime.

Scatter Plot between the number of executors and written bytes
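The same question the scatter plot answers visually can also be asked as a filter. The Python sketch below flags pipelines that request many executors but write little data; the tuple layout, the function name, and the pipeline names are hypothetical, and the thresholds simply reuse the 100-executor and 5 GB figures mentioned above.

```python
GIB = 1024 ** 3


def find_over_provisioned(pipelines, min_executors=100, max_written_bytes=5 * GIB):
    """Return (app, executors, written_bytes) rows with many executors but little output.

    Rows are sorted by written bytes per executor, lowest (most wasteful) first.
    """
    flagged = [
        p for p in pipelines
        if p[1] >= min_executors and p[2] < max_written_bytes
    ]
    return sorted(flagged, key=lambda p: p[2] / p[1])


# Hypothetical sample rows: (pipeline name, number of executors, written bytes)
pipelines = [
    ("dag_a--task_1", 100, 2 * GIB),
    ("dag_b--task_1", 100, 50 * GIB),
    ("dag_c--task_1", 5, 3 * GIB),
    ("dag_d--task_1", 120, 1 * GIB),
]
candidates = find_over_provisioned(pipelines)
print(candidates)
```

Written bytes per executor is only a rough proxy for efficiency, so flagged pipelines still need a manual look before their resources are reduced.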

Lastly, we can find pipelines that write non-optimal files to HDFS, that is, pipelines that write files that are too big. The file size should be close to the HDFS block size, which in our case is 256 MB. Writing files that are too large can lead to spill issues, Out-of-Memory issues, and slower runtimes, because we gather the data distributed across the Spark cluster into only a few workers, and a single worker may not be able to handle that much.

List of DAGs and Task Instances that produce non-optimal file sizes
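As a small worked example of the file-size rule, the Python sketch below estimates how many output files keep each file at or below one 256 MB HDFS block and flags files that are far above it. The function names and the `tolerance` factor are hypothetical choices for illustration.

```python
HDFS_BLOCK_BYTES = 256 * 1024 ** 2  # 256 MB block size, as in our cluster


def target_file_count(total_bytes, block_bytes=HDFS_BLOCK_BYTES):
    """Smallest number of files such that each file is at most one block in size."""
    return max(1, (total_bytes + block_bytes - 1) // block_bytes)


def oversized_files(file_sizes, block_bytes=HDFS_BLOCK_BYTES, tolerance=2):
    """File sizes larger than tolerance * block size (tolerance is a hypothetical knob)."""
    return [size for size in file_sizes if size > tolerance * block_bytes]


GIB = 1024 ** 3
MIB = 1024 ** 2

print(target_file_count(5 * GIB))        # a 5 GiB output split into block-sized files
print(target_file_count(100 * MIB))      # a small output still needs one file
print(oversized_files([200 * MIB, 3 * GIB, 600 * MIB]))
```

A count like this could then be used as the number of partitions when repartitioning the data before the write, which is the kind of adjustment described in the second case below.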

Example Results after We have Monitoring

Let’s look at real improvement cases after we put the monitoring in place.

1st Case: This is one of our pipelines that runs daily. The pipeline executes a query through SparkSQL to create a precomputed table, and it runs smoothly every day without any retries. The issue is that the pipeline has a skew problem and takes more than 2 hours to finish. We detected this from the pivot table above and changed one line of code in Spark’s join strategy to enable the Broadcast Hash Join. As a result, the overall runtime has been reduced to less than 30 minutes, approximately.

Case#1 — Before and After We Have Monitoring

2nd Case: This is a pipeline with hourly DAG runs. We found that the pipeline had many Airflow retries in some runs. The root cause was insufficient resources on the Spark workers, and the job itself did not write optimally sized files to HDFS. As a result, Airflow had to run the same job 2–3 times before it completed. Because the retry mechanism worked and the job never reached the retry limit, no error was ever sent to our Slack channel. After we adjusted the resources and applied repartitioning to adjust the number of written files, no second retry happens anymore.

Case#2 — Before and After We Have Monitoring

Final Thought

To wrap up: now that our team has the Spark Listener to export pipeline statistics and the dashboards and graphs to take us directly to the point, we can fix problems more easily and quickly. We have not only fixed known issues, but also uncovered unknown ones like the second case. As Data Platform Engineers, we now have a solution that supports every Spark user from different teams at the same time. Looking at the platform as a whole, we also have SLOs that give us overall scores for our platforms. As pipeline observability increases, the benefit reaches the end data consumers, our business stakeholders, who can be more confident in their data-driven decisions.

One last thing: we currently have 8,000 Spark pipeline runs per day, or 600 Spark pipeline runs per hour, which write 16 terabytes of data per day, which translates into 50 billion records. At this scale of data engineering work, we still need talent to join our team, and not only Data Platform Engineers to ensure the stability and reliability of the platforms. You could also be a Data Engineer who helps manage our ETL pipelines and improves data quality to support our company. Or, if you are proficient at turning complex business contexts into SQL, you can join the Business Intelligence team. If you are interested, please go to https://careers.lmwn.com/data-and-analytics

Thanks for reading :D
