Handling Data Skew in Apache Spark: Techniques, Tips and Tricks to Improve Performance

Suffyan Asad
12 min read · Jan 30, 2023

Introduction

This article is about detecting and handling data skew in Apache Spark. It covers the following topics:

  • What data skew is, and how it affects the performance of Spark jobs, focusing on skew in join operations.
  • Detecting Data Skew using the Spark UI.
  • Some examples of handling data skew, using both alterations to the data and the built-in optimization features available in newer versions of Spark.
Data Skew and straggling tasks

Data Skew — causes and consequences

Spark loads data into memory in the form of partitions. Ideally, the data should be uniformly distributed across partitions. Data skew is when one or a few partitions have significantly more data than the others. Data skew is usually the result of operations that require re-partitioning the data, mostly join and grouping (GroupBy) operations.

Join operations (except the Broadcast-Hash join) can cause data skew because they require moving rows with the same join keys from both datasets to the same executor. This re-partitioning of the data by the column(s) being joined can cause data skew if the data is unevenly distributed among the different values of the partition key column(s).

Grouping and aggregating on one or more columns requires all data for each value of those columns to be present on the same executor, which requires re-partitioning the data by the grouping columns. If the distribution of the data is uneven across the unique values of the grouping columns, it can result in data skew.

But why is it so bad? It depends on the extent of the skew, but if one or more partitions have significantly more data than the rest, the result can be performance bottlenecks and wasted resources. Data skew can cause the following problems:

  • Straggling tasks: The tasks that process these larger partitions can lag behind the others due to the amount of work they have to perform, and can prevent the whole job from moving on to the next stages until they complete. This results in delays as well as wasted resources, because most of the cluster sits idle waiting for these straggling tasks to finish.
  • Spills to disk and out-of-memory errors: If the data in those partitions does not fit in the memory of the executors processing them, garbage collection may become a problem, or Spark may have to spill the data to disk, further slowing down these tasks. While the resources on the cluster may be adequate for the data overall, the skew can make them inadequate for some partitions, resulting in decreased performance. In the worst case, this can result in out-of-memory exceptions and cause jobs to fail.

Source Code and Example Data

Source code is available at the GitHub repository:

The data used in the code examples is the TLC trip record data:

Data dictionary is available at the following link: https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf

The data has been modified to generate data skew in the code examples. The trips data is Yellow Taxi Trip Records for January 2022 — June 2022, and the locations data is Taxi Zone Lookup Table. The code that adds the skew in pickup locations is:

Adding skew by making one pickup location bigger

The code adds skew by replacing multiple pickup locations in the data with one pickup location — 237.
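A minimal PySpark sketch of that idea is shown below. The exact range of location IDs folded into 237 is an assumption here; the code in the repository may choose a different set of locations:

```python
import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def add_skew(trips_df: DataFrame) -> DataFrame:
    # Fold a (hypothetical) range of pickup locations into location 237,
    # so that one join key carries far more rows than the others.
    return trips_df.withColumn(
        "PULocationID",
        F.when(F.col("PULocationID").between(1, 200), F.lit(237))
        .otherwise(F.col("PULocationID")),
    )
```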

Detecting Data Skew — Putting the Spark UI to use

Data skew is difficult to detect from the query plan, because the plan describes the steps performed in a job; it does not give insight into the distribution of the data at each step.

To inspect the data distribution after each stage, the Spark UI (accessible at http://localhost:4040 by default) can be used. Let's look at an example calculation that computes the average distance traveled by pickup zone and by borough. This can be implemented using the following code:

Calculation of average trip distance by pickup zone and borough implementation
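As a rough sketch of that calculation, assuming the standard TLC column names (PULocationID and trip_distance in the trips data; LocationID, Borough and Zone in the zone lookup), the join and aggregation could look like this:

```python
import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def join_on_skewed_data(trips_df: DataFrame, zones_df: DataFrame) -> DataFrame:
    # Join every trip to its pickup zone, then average the trip distance
    # per borough and zone. The join key (PULocationID) is the skewed column.
    return (
        trips_df.join(zones_df, trips_df["PULocationID"] == zones_df["LocationID"])
        .groupBy("Borough", "Zone")
        .agg(F.avg("trip_distance").alias("avg_trip_distance"))
    )
```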

Also, to observe the data skew and its consequences, Adaptive Query Execution (AQE) needs to be disabled. This is a newer feature built into Spark that is useful in preventing skew, and it will be covered later in this article. Additionally, broadcast joins should be disabled as well: the location details data frame is small enough to be broadcast, whereas the scenario to be demonstrated occurs when both datasets are large enough to require a sort-merge join.

Following is the code to create the Spark Session, and the main function to execute join operations:

Spark session creation with AQE and broadcast joins disabled, and main function to run the example
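A sketch of such a session and driver code follows. The input paths, the local[8] master and the helper names are assumptions; the two properties spark.sql.adaptive.enabled and spark.sql.autoBroadcastJoinThreshold are the ones that disable AQE and broadcast joins:

```python
from pyspark.sql import SparkSession


def create_spark_session() -> SparkSession:
    return (
        SparkSession.builder
        .appName("data-skew-example")
        .master("local[8]")
        .config("spark.sql.adaptive.enabled", "false")          # disable AQE
        .config("spark.sql.autoBroadcastJoinThreshold", "-1")   # disable broadcast joins
        .config("spark.sql.shuffle.partitions", "201")          # shuffle partition count used in this article
        .getOrCreate()
    )


if __name__ == "__main__":
    spark = create_spark_session()
    # Hypothetical input locations; add_skew and join_on_skewed_data are the sketches from above.
    trips_df = add_skew(spark.read.parquet("data/yellow_tripdata_2022"))
    zones_df = spark.read.option("header", True).csv("data/taxi_zone_lookup.csv")
    join_on_skewed_data(trips_df, zones_df).write.mode("overwrite").parquet("output/avg_trip_distance")
```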

This job takes around 35–38 seconds on my computer. If we examine the jobs in the Spark UI, we see the following:

Jobs and their duration in Spark UI

The processing has been divided into 4 jobs, and the last two take the longest: 15 seconds and 10 seconds. Clicking on job 2 to examine its stages, we get the following:

Stages of Job 2 with total tasks and time taken by each stage

We can see that stage 4 takes the longest: 8 seconds, followed by stage 2 at 4 seconds. Let's examine stage 4 further. To do this, the Stages view can be accessed by clicking on stage 4, where the task breakdown, the duration of each task, and the summary statistics of all 201 tasks of this stage can be examined. Opening the Event Timeline, we see data skew:

Data skew — one task is taking significantly longer than all other tasks, delaying the completion of this stage

One task takes significantly more time than all the other tasks. It is in fact taking 7 seconds, whereas the median is 50 ms:

Summary statistics of all tasks of stage 4

Generally, if there is a significant difference between the Median statistic and the 75th Percentile or Max statistics, it is a strong indicator of data skew. Ideally, the partitions should be as balanced as possible.

The long running task

Also, the number of records processed by this task is significantly higher than in the other tasks. This is a clear example of a task that runs significantly longer than most others due to data skew, forcing the stage to wait for it to complete. This is a straggling task, and straggling tasks commonly cause delays as well as the other problems mentioned earlier.

Addressing data skew

There are several ways to handle data skew. There are generic techniques, such as using a broadcast join where possible or breaking up the skewed join column, as well as techniques that depend on the data, such as including another column in the join to better distribute the skewed data.

Dividing large partitions — using a derived column

One way to better distribute the data is to add a column on both sides of the join. In this example, I’ll do this in the following way:

  • In the zone lookup data, which is smaller, repeat each row n times, each copy with a different value of a new column that takes values 0 to n-1. This new column is called location_id_alt.
  • In the rides data, which is bigger and is the one with the data skew, add a column with values 0 to n-1 derived from another column. In this example, I choose the pickup timestamp (tpep_pickup_datetime) column, convert it to the day of the year using the dayofyear Spark SQL function, and take it modulo n. Now, both sides have an additional column with values from 0 to n-1.
  • Adding this new column to the join will spread every partition, including the one with more data, over more partitions. Additionally, computing the new column does not require a shuffle: there is no rank or row-number operation over a window.

The join involving the derived columns can be implemented as the following code:

Adding additional derived columns to the join to split partitions
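A sketch of this approach, continuing with the column names assumed earlier and an assumed split factor of n = 10, could look like the following; the actual implementation in the repository may differ in its details:

```python
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

SUBSPLIT_FACTOR = 10  # "n" — the number of sub-splits; an assumed value


def join_on_skewed_data_with_subsplit(trips_df: DataFrame, zones_df: DataFrame) -> DataFrame:
    # Small side: repeat each zone row once for every value of location_id_alt (0 .. n-1).
    zones_exploded_df = zones_df.withColumn(
        "location_id_alt",
        F.explode(F.array([F.lit(i) for i in range(SUBSPLIT_FACTOR)])),
    )

    # Large, skewed side: derive the matching column from the pickup timestamp.
    trips_with_alt_df = trips_df.withColumn(
        "location_id_alt",
        F.dayofyear(F.col("tpep_pickup_datetime")) % SUBSPLIT_FACTOR,
    )

    # Joining on both columns spreads the skewed pickup location over up to n shuffle partitions.
    return (
        trips_with_alt_df.join(
            zones_exploded_df,
            (trips_with_alt_df["PULocationID"] == zones_exploded_df["LocationID"])
            & (trips_with_alt_df["location_id_alt"] == zones_exploded_df["location_id_alt"]),
        )
        .groupBy("Borough", "Zone")
        .agg(F.avg("trip_distance").alias("avg_trip_distance"))
    )
```

Because the sub-split value is derived from the day of the year, the rows for the oversized pickup location are spread over up to n different join keys, at the cost of multiplying the small lookup table by n.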

And running it gives the following steps in the Spark UI:

Updated job execution time

We can instantly see that the time for job 2 has dropped by 3–4 seconds; similarly, the last job (job 3) takes 2–3 seconds less. Next, looking into stage 4 of job 2 gives the following task execution breakdown:

Execution times of the stages of the job are now reduced

Stage 4 now runs much faster. Opening it as before reveals that there is no single, significantly longer straggling task keeping the whole stage waiting for completion while all the other tasks have finished their work:

Better data distribution among tasks

The summary statistics also reveal that the longest task now takes 2 seconds, and the median is 0.1 seconds. This indicates a better distribution of data. In real-world scenarios, such a distribution will be more efficient than in the previous example, with no straggling tasks:

Summary statistics showing better distribution of work and shorter maximum task execution time

The overall execution time of this updated job is 32–35 seconds. This is a decrease of 2–3 seconds overall, which may not seem like much on this amount of data, but with bigger data and bigger clusters the benefit would increase significantly.

There is still a lack of uniformity, and this can be further improved in the following ways:

  • If it is possible to run summary statistics on the DataFrame, or if they are available to the system otherwise (for example, passed in from an upstream system), the breakdown can be based on the number of rows for each value of the join column instead of a single uniform number. This provides a more even distribution of the data; a sketch of this idea follows the list.
  • If using newer versions of Spark (3.0 or newer, ideally 3.2 or newer), or if it is possible to use newer versions of Spark, it is possible to utilize Adaptive Query Execution (AQE) to further improve data distribution and handle skew.
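As an illustration of the first point, the following sketch derives a per-key split count from row counts and salts both sides accordingly. The ROWS_PER_SPLIT target and the helper names are assumptions, not code from the repository; note that the explicit broadcast hint on the tiny per-key table still applies even with automatic broadcast joins disabled:

```python
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

ROWS_PER_SPLIT = 1_000_000  # assumed target number of rows per sub-partition


def salt_by_key_frequency(trips_df: DataFrame, zones_df: DataFrame):
    # Per-key row counts; these could also be supplied by an upstream system.
    splits_df = (
        trips_df.groupBy("PULocationID")
        .agg(F.count(F.lit(1)).alias("rows"))
        .withColumn(
            "num_splits",
            F.greatest(F.lit(1), F.ceil(F.col("rows") / ROWS_PER_SPLIT)).cast("int"),
        )
        .select("PULocationID", "num_splits")
    )

    # Large side: a random salt in [0, num_splits), so only the big keys get many sub-splits.
    salted_trips_df = (
        trips_df.join(F.broadcast(splits_df), "PULocationID")
        .withColumn("location_id_alt", (F.rand() * F.col("num_splits")).cast("int"))
    )

    # Small side: replicate each zone row once per possible salt value of its key.
    salted_zones_df = (
        zones_df.join(
            F.broadcast(splits_df),
            zones_df["LocationID"] == splits_df["PULocationID"],
            "left",
        )
        .withColumn("num_splits", F.coalesce(F.col("num_splits"), F.lit(1)))
        .withColumn(
            "location_id_alt",
            F.explode(F.sequence(F.lit(0), F.col("num_splits") - 1)),
        )
        .drop("PULocationID", "num_splits")
    )

    # The two DataFrames can then be joined on both the location ID and location_id_alt,
    # exactly like the fixed-factor version above.
    return salted_trips_df, salted_zones_df
```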

Using Adaptive Query Execution (AQE)

Adaptive Query Execution is available in Spark 3.0 and newer, and it is enabled by default from Spark 3.2.0 onwards. AQE uses “statistics to choose the more efficient query execution plan”, as described in the documentation linked below. There are many features in AQE; the ones useful in this case are covered here.

A great guide to using AQE for performance optimization can be found at:

Additionally, DataBricks has published an in-depth article on AQE and how it can be used for performance optimization:

AQE has a feature to handle skew by splitting skewed partitions (and thereby long tasks), called skewJoin. It can be enabled by setting the property spark.sql.adaptive.skewJoin.enabled to true. There are two additional parameters to tune skewJoin in AQE:

  • spark.sql.adaptive.skewJoin.skewedPartitionFactor (default value: 5). A partition is considered skewed if it is larger than this factor multiplied by the median partition size.
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default value: 256MB). This is the minimum size of a skewed partition: a partition is marked as skewed only if it is larger than this threshold and also exceeds the limit defined by the spark.sql.adaptive.skewJoin.skewedPartitionFactor parameter above.

Because this code example works with relatively small data, and the median partition size is less than 81KB, we'll set the size parameter to 256KB and the factor parameter to 3:

Enabling skewJoin to split large partitions
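These settings can be applied when building the Spark session. A sketch, keeping broadcast joins disabled and partition coalescing off for now (it is discussed next), could be:

```python
from pyspark.sql import SparkSession


def create_spark_session_with_skew_join() -> SparkSession:
    return (
        SparkSession.builder
        .appName("data-skew-example-skew-join")
        .master("local[8]")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "false")  # switched on later
        .config("spark.sql.adaptive.skewJoin.enabled", "true")
        .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3")
        .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256KB")
        .config("spark.sql.autoBroadcastJoinThreshold", "-1")
        .config("spark.sql.shuffle.partitions", "201")
        .getOrCreate()
    )
```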

Then, executing the first function (join_on_skewed_data), which does not add any additional column, we get an execution time between 33 and 35 seconds. Looking at the Spark SQL query plan (accessed by selecting the SQL/DataFrame tab in the top navigation bar, and then selecting Query ID 1), we can see that after the shuffle step an AQEShuffleRead step has been added; it has detected one skewed partition and split it into two partitions:

The AQE step that detects and splits a skewed partition

The processing has more jobs overall now, but examining stage 6 of job 4, we see that now the large partition has been broken into two partitions:

The split partition is still larger than other partitions and the two tasks take more time compared to the rest

This is still not ideal, but the longest two tasks took 5 seconds to complete.

Next, there is another feature in AQE called Coalesce Partitions (spark.sql.adaptive.coalescePartitions.enabled). It is also enabled by default, and we initially turned it off for demonstration. When enabled, Spark tunes the number of shuffle partitions based on statistics of the data and the processing resources, and it also merges smaller partitions into larger ones, reducing shuffle time and data transfer. Let's see it in action by enabling full AQE:

Enabling AQE coalescePartitions when creating the spark session
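A sketch of the session with partition coalescing re-enabled alongside the skew-join settings (other values as assumed before):

```python
from pyspark.sql import SparkSession


def create_spark_session_with_full_aqe() -> SparkSession:
    return (
        SparkSession.builder
        .appName("data-skew-example-full-aqe")
        .master("local[8]")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")  # now enabled
        .config("spark.sql.adaptive.skewJoin.enabled", "true")
        .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3")
        .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256KB")
        .config("spark.sql.autoBroadcastJoinThreshold", "-1")
        .config("spark.sql.shuffle.partitions", "201")
        .getOrCreate()
    )
```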

Running the same function with no derived column, processing now takes around 32–34 seconds. If we examine the jobs, we can instantly see a decrease in the number of tasks for all steps. There were around 400 before (the number of shuffle partitions is set to 201), and now there are 4. This indicates that AQE has merged the data into fewer partitions:

Fewer partitions and tasks when coalesce partitions is enabled

Looking at the SQL query plan, we can see that the AQE step now includes both split and merge steps:

AQE splitting and merging partitions

The AQE step splits the larger partition into two, as before, and merges the smaller partitions into 2 partitions. The total number of partitions is now 4.

Examining stage 6 of job 4 from the completed tasks, we get the following picture:

Stage completed in 4 tasks

There are two tasks that took 1 second, and two that took 6 seconds. We can see that AQE did a great job of merging the smaller partitions. Let's help AQE by splitting the larger partition ourselves.

Running the join_on_skewed_data_with_subsplit function with AQE enabled, the processing time improves to around 31–32 seconds. Examining the query plan, the partitions were merged into 9 partitions, and no partition was split. This is expected, because adding the derived column to the DataFrames and the join has made each shuffle partition smaller.

The AQE step

The biggest improvement is in the distribution of data among the tasks, i.e., the resulting partition sizes. The 9 partitions are of roughly equal size, and all except one take the same 3 seconds to process:

Balanced partitions and tasks execution time achieved using AQE and adding derived column to the join operation

This is close to an ideal distribution of data among partitions, and the best distribution so far, considering that Spark is using 8 cores and 4GB of RAM in total.

In conclusion, AQE in Spark can help significantly in managing data skew, but analyzing the performance of jobs, especially long-running jobs, may still reveal opportunities to identify and mitigate data skew for better utilization of resources and better overall performance.

Key takeaways

  • Data skew is when one or a few partitions have significantly more data than the rest of the partitions. It can result in straggling tasks leading to slow overall performance, spills to disk, and, if the skew is severe, out-of-memory exceptions.
  • Data skew can occur when data needs to be re-partitioned by one or more columns, especially in joins. It can sometimes happen in grouping operations as well.
  • Data skew is difficult to identify using query plans. It can be detected using the Spark UI by inspecting the statistics of tasks in each stage and identifying straggling tasks.
  • Data skew can be fixed by subdividing the larger partitions, and this can often be accomplished by adding other columns (existing or derived) to the join to make the larger partitions smaller. A good understanding of the dataset and its column values can lead to good insights for handling skewed data. Similarly, in grouping operations, multi-stage grouping with a gradually reduced number of columns can be used to obtain the final result.
  • Built-in Spark features can be handy in solving data skew issues. Using broadcast joins wherever possible, and enabling Adaptive Query Execution (AQE), is often helpful in solving data skew problems.


Suffyan Asad

Data Engineer | Passionate about data processing at scale | Fulbright and George Washington University alum | https://pk.linkedin.com/in/suffyan-asad-421711126