Revisiting the performance improvements in Apache Spark 3

Adaptive Query Execution and Dynamic Partition Pruning under the hood

Radoslav Vlaskovski
Axel Springer Tech
7 min readJul 8, 2022

--

Apache Spark logo reference: https://spark.apache.org

Motivation

Apache Spark is one of the most popular engines for data processing. It provides APIs for SQL operations, Machine Learning and stream processing. When it comes to data processing there is hardly anything you can’t do with Spark. The most recent major version brought some valuable improvements in the optimization of the execution plan for batch processing. This blog post will revisit how these new features work and how effective they are.

My team, Business Integration from Axel Springer National Media & Tech, uses the Spark SQL API to process large amounts of data daily. The data is processed on multiple clusters with varying computational power. We apply different data transformations such as map, filter, join and aggregation. We have no user defined functions, because UDFs need their own user-defined rules to work well with Catalyst (Sparks Execution Plan Optimizer). To transform the query language to a set of operations for execution, Spark creates the following set of execution plans:

  • Parsed logical plan — created when the code is correctly parsed.
  • Optimized logical plan — Catalyst further optimizes the logical plan.
  • Physical plan — Spark computes an execution plan for the cluster.

Improving the effectiveness of plan optimization was the focus for the latest major update of Spark. In this blog post I will focus on its two biggest features, namely Adaptive Query Execution and Dynamic Partition Pruning.

Adaptive Query Execution

The main highlight of Apache Spark 3.x is Adaptive Query Execution (AQE) — a technique to optimize the execution graph at runtime. During execution Spark collects statistics for the already executed steps. Then based on these statistics, it adjusts the execution plan. This happens in the following ways:

Dynamically switching join strategies

Spark uses the runtime statistics to determine the appropriate join strategy. This is done to avoid sort-merge join in favor of either broadcast or shuffled hash join. However, the Spark documentation recommends the developers to always specify the join type, when possible.

Dynamically optimizing skew joins

Data skew occurs when data is unevenly distributed among partitions in the cluster. AQE detects skewness and creates smaller tasks to prevent it. For example, if we have a join with 4 tasks, where one takes a long time because of skewness, then AQE will split it into 5 evenly-distributed tasks. This will make better use of the computational resources. Later on I provide an example for this feature of AQE.

Coalescing of shuffle partitions

During aggregation AQE recognizes small partitions and brings them together to be executed as one task. This removes the overhead of creating separate tasks for smaller partitions similar to the optimization for skew joins.

AQE in action

To understand and see the results of AQE, I designed a test script. For the purpose I randomly generated two data sets with identical structure. Each data set has 1 million data points saved in the Apache Parquet format. The data has a column with randomly generated UUIDs and a linearly increasing integers starting from 1. The data is unevenly partitioned, with two partitions consisting each of 5% of the data and the rest 90% in the third and last partition. This skewness of the data means, that if we join the two data sets based on their partition column, we will observe the problem solved with AQE. Here is a sample of the generated data as CSV:

The test script joins the two data sets and then writes them out in CSV. Through this example I plan to analyze what happens to the execution plan when AQE is enabled.

Running the code without AQE resulted in Spark creating over 200 tasks. Most of them were very fragmented as seen from the screenshot below. With so many tasks, the cluster spends a lot of time on serialization and deserialization, instead of executing calculations. This can be mitigated by merging smaller tasks together.

Apache Spark tasks for skewed join without AQE

This is exactly what happens when applying AQE to the same script. The result was that there were only 13 continuous tasks, which did not waste so much time on serialization and deserialization. They were also completely independent of each other and there was no time spent waiting between executions as shown in the image below.

Apache Spark tasks for skewed join with AQE

Dynamic Partition Pruning

Dynamic Partition Pruning (DPP) is an optimization to the execution of a join between a small and large data set. If a filter is applied to the smaller data set, then, thanks to DPP, Apache Spark will also apply it to the larger data set. The optimization happens on two of the planning stages:

Logical Plan — filtering operations are executed during scan, so that it is applied as early as possible.

Physical Plan — filter the larger table during scan based on filtering criteria applied to the smaller broadcasted table.

Applying DPP to optimize the execution of join operations is only possible if all of the following criteria is met:

  • A filter is applied to the smaller data frame
  • The join criteria is defined using equals
  • One of the data frames is broadcasted
  • The join column on the larger data frame is also a partitioning column
  • If the larger data frame is on the left then the join type is either “inner”, “left-semi” or “right outer”
  • DPP is predicted by Spark to be more beneficial

The following execution graph shows roughly how DPP works. The smaller table is broadcasted only once, but it is used for filtering the scan of the data for the larger table and later for the join. With this broadcast the larger table is filtered and therefore less rows are joined.

Simplified Spark execution plan for broadcast join with DPP

DPP optimizes a set of very special use cases as is clear from all the requirements, that need to be met for it to optimize the plan. The following snippet shows the simple Spark script, which I used to analyze the execution plans optimized with DPP. I used data similar to the data for the AQE test, but with a lot less data for one of the data sets.

Performance improvement in production

Now that it is clear how DPP and AQE work, let us observe out how they improve performance on an application cluster in production. Our team deploys four different Apache Spark clusters of varying input size and processing power. The clusters execute Spark scripts in a consecutive fashion. The execution of each script with certain input data is called a step. Some clusters have fewer execution steps, but with larger input data sets. Others have more steps, but with smaller input data sets. The following image shows the differences between the average execution time for a step in each cluster. The red bar represents the execution time for Spark 2 and the blue one for Spark 3 with AQE and DPP enabled. The following results were observed:

Cluster 1 runs on 5 machines and has fewer steps, but each with a relatively large input size. The scripts executed on Cluster 1 have a significant amount of joins and aggregation. Thus, we observed a significant performance increase of 39%.

Cluster 2 has almost the same setup except the input data is finer grained and thus there are far more steps. The improvement in the mean execution time is far smaller at around 22%, because the conducted joins and aggregations had far less data to work with. However, since it deploys more steps, the overall improvement was almost comparable to that of Cluster 1.

Cluster 3 has only 2 machines and compared to the others has fewer steps and fewer data. However, it also has a very large improvement of nearly 38%.

Cluster 4 runs scripts, which mainly have mapping and mathematical operations. We did not observe a significant improvement in the execution time, but even here Spark 3 performed better although by a small margin.

Average execution time per cluster and Apache Spark version

Conclusion

It is evident, that AQE is highly effective in optimizing the execution plan in Apache Spark clusters. It accounts for all the performance improvements, which we observed in our system. It is however complicated to understand, how exactly Spark merges the tasks using AQE. This might leave some teams thinking of AQE as a magical black box. Hopefully, this blog post helped you understand better how AQE operates.

DPP on the other hand improves only a very specific use case, but it applies a clever optimization for it. The lack of this use case in our production scripts prevented me from exploring its impact on the performance. Overall Spark 3 was a great update and the magical improvement of between 20% and 40% was very impressive.

References

[*] Adaptive Query Execution, Spark Documentation

[*] Dynamic Partition Pruning in Spark 3.0, Anjali Sharma

Kudos to my team for all the help and support:

Business Integration @ Axel Springer National Media & Tech

--

--

Radoslav Vlaskovski
Axel Springer Tech

Software Engineer, Data and Web, Axel Springer, MSc CS @ TU Berlin