A hitchhiker’s guide to Spark’s AQE — exploring dynamically coalescing shuffle partitions

João Pedro Santos
BiLD Journal
Published in
13 min readFeb 2, 2021

In this series of articles, I will walk you through a brief overview of the exciting new changes that the Apache Spark engine has gone through with its 3.0 release, more specifically the Adaptive Query Execution (AQE).

In this article, we will look into the motivations, challenges, and solutions that AQE seeks to solve. Also, we will address one topic in more detail — dynamically coalescing shuffle partitions.

So what is Apache Spark?

If you are reading this article, chances are that you already know what Apache Spark is (and if you do, please skip this intro). However, I wouldn’t be methodical enough if I didn’t include a small introduction for those who stumbled into this article by chance and might be interested to read (I know the chances are low but I’ll take them nonetheless!!).

Apache Spark is one of the most popular open-source distributed data processing engine designed specifically for large-scale data processing. It is an engine that is so flexible that you can work with it either on-premises or in the cloud through multiple vendors.

The main design features of Spark are:

  • Speed;
  • Simplicity;
  • Modularity;
  • Extensibility.

Speed is built by having an optimized framework that takes advantage of high-performing CPU’s and memory, efficient multithreading and parallel processing of UNIX-based systems, and a unique query engine that constructs its query computations as directed acyclic graph (DAG).

A DAG has the distinctiveness that it can be broken down into tasks and each task can, in turn, be parallelized across workers. Also, since version 2.0, Spark’s speed property is also augmented by having whole-stage code generation built-in the engine, which is a physical query optimization that aggregates multiple physical operators into a single optimized Java function and removes unnecessary virtual calls.

To achieve simplicity, Spark provides a higher-level rich set of transformations and operations built on top of distributed data structures (RDD’s) that are easy to use and provide a much friendlier programming model for big data application development.

Along with its apparent simplicity, Spark is flexible enough to support a number of different programming languages such as Scala, Java, Python, SQL, and R and modular enough to provide a unified library with core modules as components (Spark SQL, Structured Streaming, MLLib, and GraphX) that run on the same engine. An easy way to think about Spark’s modularity is to compare it with a swiss army knife: for different purposes, you have different tools in the same package.

Because Spark’s main focus is on the in-memory computation, where the data is stored is not really an issue. Spark’s DataFrame readers and writers can be modified to connect to a large number of external data sources, which means that the system is extensible and adaptive.

A brief history of AQE

The idea of adaptive execution/query planning has been an academic research topic for many years, but in the context of Spark, it was first introduced by Spark 1.6 albeit with limited functionality and extensibility.

If you have read the introduction, you already know that one of the key aspects behind Spark’s design is speed and high-performance. This further “need for speed” (pun intended) and high performance is the exact motivation behind AQE’s conception.

In order to be high performant, the Spark engine needed to be smart(er) about its query planning phase, which means that it needed to tackle the biggest architecture limitations at the time:

  • The first limitation was that the Spark engine required execution instructions for a job to set in advance, meaning that it would be difficult to predict the behavior of the data and UDF’s well enough so that better execution plans could be generated during runtime;
  • Another limitation was that since Spark leverages a cost-based optimization framework that collects a variety of data statistics, outdated statistics and imperfect cardinality estimates might occur which lead to sub-optimal query plans.

With the goal of tackling those limitations by re-optimizing and adjusting query plans based on runtime statistics, the Intel big data team along with Databricks prototyped and experimented with a more advanced version of AQE and shipped this version to Spark 3.0.

The bread and butter of the article

What is AQE and why is it needed?

To formally define AQE, I’d like to quote Maryann Xue at Spark+AI Summit 2020 in the “Adaptive Query Execution: Speeding Up Spark SQL at Runtime” talk:

AQE is a query re-optimization that occurs in query execution (…) [It] is a dynamic process of query optimization that happens in the middle of the query execution using runtime statistics as it’s optimization input.

In simpler words, it is an optimization of the query execution plan that Spark query planner uses to adapt for better execution plans at runtime, based on statistics collected during execution. These changes were designed to achieve better performance and parallelism and at the same time to decrease the effort involved in tuning configurations.

Apache Spark like many common data processing engines/frameworks has very little knowledge about the data before it starts processing it and as a result, a single processing job could face the following processing challenges:

  • Incorrect number of shuffle partitions;
  • Incorrect join strategies;
  • Highly skewed datasets;
  • Generate better physical plans based on runtime statistics.

AQE’s framework

OK, at this point we already know that AQE is some kind of optimization that happens somewhere in the query optimization phase, but we still don’t know when and where it happens. To answer the question of when to optimize, we first need to know the following details of Spark’s processing pipeline:

  1. Spark often combines multiple operators into a single whole-stage codegened task (often referred to as pipelining);
  2. Shuffle and exchange operations divide pipelines into query stages because there is insufficient data in the worker node to finalize the assignment given by the driver;
  3. Each query stage results in the materialization of an intermediate result;
  4. Whenever a query stage occurs, further stages can not proceed until all parallel processes running the materialization have completed.

Since there is a limitation for when further stages can proceed, there is also an optimization opportunity to leverage, in this case, the opportunity lies in the collection of statistics for all partitions before successive operations start:

A diagram representing AQE’s framework
AQE’s framework taken from Databricks

The inner details of this framework are better explained in the article “How to Speed up SQL Queries with Adaptive Query Execution”, but for reference, I will quote a brief part of it:

When the query starts, the Adaptive Query Execution framework first kicks off all the leaf stages — the stages that do not depend on any other stages. As soon as one or more of these stages finish materialization, the framework marks them complete in the physical query plan and updates the logical query plan accordingly, with the runtime statistics retrieved from completed stages. Based on these new statistics, the framework then runs the optimizer (with a selected list of logical optimization rules), the physical planner, as well as the physical optimization rules, which include the regular physical rules and the adaptive-execution-specific rules, such as coalescing partitions, skew join handling, etc. Now that we’ve got a newly optimized query plan with some completed stages, the adaptive execution framework will search for and execute new query stages whose child stages have all been materialized, and repeat the above execute-re-optimize-execute process until the entire query is done.

To answer the question of where the AQE is plugged in, we must look into the Spark SQL overall execution mechanism. As we formally defined before, AQE is an optimization of a query execution plan, hence its natural place is in the logical optimization step:

A graph represention the position of Adaptive execution in the overall processing scheme
Adaptive execution in the Spark processing framework

After the physical execution plan is selected the DAG of RDDs is generated, stages are created by the Spark Scheduler at shuffle boundaries, and later submitted for execution. Since the execution plan can’t be updated during the execution, the idea of adaptive execution was to split the logical plan into multiple stages at shuffle boundaries.

At this point, I expect you already know on a high level why there were changes applied to the execution plan and where those changes were applied, but you might not know what were these changes. Fear not young traveler, I have a high-level diagram for you:

Adaptive query execution QueryStage and QueryStageInput at shuffle boundaries
Adaptive execution physical plan adapted from Databricks

If you look at the diagram above, you can see how a physical plan looks like before and after AQE is applied and you can deduce that the execution plan is now divided into multiple QueryStagesat shuffle boundaries.

This is in fact one of the major changes AQE applied to the overall processing framework, but it does not answer the question: what is exactly anQueryStage and QueryStageInput?

  • A QueryStage represents a subtree that runs in a single stage and has leaf nodes called QueryStageInput . QueryStage materializes its output at the end of its execution;
  • AQueryStageInput is a child node for QueryStage and can have multiple child stages. Each QueryStageInputhides its child stages from the parent node and retrieves only the results of the child stages as input for QueryStage.

Query stages are created via the following steps:

  • First, the query tree is traversed bottom-up;
  • If an exchange node is reached and all its child query stages are materialized, then a new query stage is produced for this node;
  • If a query stage materializes its output, the remainder of the query is re-optimized and planned via the latest statistics produced by all materialized stages;
  • Repeat the steps above until all query stages have been materialized;
  • Execute what's left of the plan.

The shuffle partition challenge

Now that we have tackled the motivations behind AQE’s conception, let's take a detailed look into one of the challenges the framework seeks to solve: the incorrect number of shuffle partitions at runtime.

To better explain what is the challenge, we will represent it by means of an example. In the diagram below, you can see a typical shuffle mechanism in Spark, where two Mappers are dividing its data into four parts and four Reducers are fetching their own piece of data from them.

Graph representing a typical Spark shuffle
A typical Spark shuffle

In Spark SQL, the shuffle partition number is the number of partitions that are used when shuffling the data for wide transformations such as joins or aggregations. Wide transformations occur whenever information is needed from other partitions to achieve the desired output. So a good way to relate this information back to the example above is to think about the number of shuffle partitions as the number of reducers above, which is 4.

By default, this number is set at 200 and can be adjusted by changing the configuration parameter spark.sql.shuffle.partitions. This method of handling shuffle partitions has several problems:

  • if there is significant data skew, some tasks might take too long to run than others causing poor resource utilization and long stage execution times;
  • too many partitions on small data will result in scheduling overhead because there are many reduce tasks. Reducers will write small files which will cause network congestion when trying to read the small files later;
  • too much data on few partitions increases the load per executor, lowers parallelism, reduce tasks might have to process more data, data might spill to disk as memory might not be able to hold more data, OOMs and GC problems can arise;
  • shuffles will not be optimal because each shuffle corresponds to different sizes of data therefore the optimal number of partitions will not be optimal as well.

You got the picture, now let’s see the solution. Drum rolls, please…..

Solution — dynamically coalescing shuffle partitions

Dynamically coalescing shuffle partitions is a logical optimization that aims to minimize the number of Reducer tasks when performing the shuffle. To explain the mechanics of the algorithm, let’s re-do the graph from the previous section and add partition size into the Reducers to better showcase the solution:

A graph representation of the spark shuffle mechanism but with partition sizes
Partition sizes for a shuffle

In this example, we can see that the data size for some of the partitions is quite small compared to the others, namely, the partitions of 3 MB and 1 MB.

To uniformize the partition sizes, the dynamic shuffle partitions coalesce algorithm assumes the following:

  1. The map-side of the shuffle should produce an approximate number of output bytes for each partition. Each Mapper should output the volume of each partition, which happens in this case at the end of Stage 0;
  2. The minimal number of partitions that should be created must be computed. This number is based on the configuration spark.sql.adaptive.coalescePartitions.minPartitionNum.If this configuration is not set then to avoid performance issues, the minimal number of partitions will be the same as the default parallelism;
  3. Different partitions with the same index will be read in the same task and that all shuffles have the same number of partitions. To determine the number of coalesced partitions, a target size for a coalesced is set via the spark.sql.adaptive.advisoryPartitionSizeInBytes and all size statistics of shuffled partitions are collected;
  4. Same indexed partitions coalesce into a single partition until the act of adding new shuffle partitions causes the combined size of the coalesced partition to be equal or greater than the target size.

Now let’s take a look at the actual steps of the algorithm and code. It is generally divided into two sections:

  1. Control-flow verifications
  2. Algorithm execution steps

A side note here: please remember that the steps provided below are a high-level view of the execution, as such, there could be missing steps, so you should always check the source code for a complete picture.

  • Control-flow verification #1 → Checks whether the adaptive query execution is true;
  • Control-flow verification #2 → Decide if AQE should be applied to the query plan based on the return of the function shouldApplyAQE:

In other words, apply AQE whenever one condition is true:

  1. spark.sql.adaptive.forceApply configuration is enabled;
  2. the input query is a sub-query (if this happens, then AQE is already being applied on the main query);
  3. the query contains a sub-query;
  4. the query contains Exchanges;
  5. the query might need to add exchanges in the future (this is achieved by checking the SparkPlan.requiredChildDistribution).
  • Control-flow verification #3 → Decide if the query plan supports an adaptive execution, by calling supportAdaptive. The function returns true if the plan is not a streaming dataset or the subquery does not contain a dynamic partition pruning expression (DPP) waiting to be resolved. See more about DPP here.
  • Control-flow verification #4 → Check whether the configuration has been enabled for coalescing shuffle partitions and that all leaf nodes are query stages:
  • Control-flow verification #5 → Check if all the exchanges present in the plan are not introduced by a repartition statement, as shuffle exchanges introduced by it don’t allow for partition manipulation:
  • Control-flow verification #6 → Check if the input RDD has at least one partition and pre-shuffle partition numbers are the same (it is not a result of SortMergeJoin or a union of fully aggregated data):

Once all control-flow verifications are met, the coalesce algorithm is run. The strategy for the algorithm is as follows:

  • Algorithm step #1 → Determine the number of coalesced partitions. To determine this, a target size for a coalesced partition is set by the configurationspark.sql.adaptive.advisoryPartitionSizeInBytes;
  • Algorithm step #2 → Collect the statistics of all shuffle partitions;
  • Algorithm step #3 → Loop through partitions and their statistics by continuous indices. This basically means that partitions from different shuffles will be read at the same time because the loop is determined by a partition index. See assumptions of the algorithm at the beginning of this section;
  • Algorithm step #4 → Verify if the combined size of same-index partitions is bigger than the target size of the post-shuffle partition;
  • Algorithm step #5 → If that is the case, then produce a CoalescedPartitionSpec ;
  • Algorithm step #6 → If that is not the case, then continue looping partitions until the targetted size has been reached:

To better visualize the algorithm, I’ve prepared a representation of it in the diagram below. For this example, assume that we set the target to 70 MB:

A high level view of the algorithm’s mechanics
A visual representation of the algorithm

Enabling the optimization

Now that we understand how the logical optimization of dynamically coalescing shuffle partitions works at a high level, it is time to enable it. To enable this optimization you have to configure 2 mandatory configuration properties and 3 optional configuration properties:

  1. The first (and a rather obvious one too) is to set the spark.sql.adaptive.enabled property to trueto allow for Adaptive Query Executions optimizations, since this feature is turned off by default at Spark 3.0. This is mandatory for the optimization to be run;
  2. The second one (also obvious) is to enable the coalesce optimization itself, so set spark.sql.adaptive.coalescePartitions.enabled to true . This is mandatory for the optimization to be run as well.
  3. Now optionally you can setspark.sql.adaptive.coalescePartitions.initialPartitionNum to the desired number of shuffle partitions before coalescing starts. It is set by default to the same numberspark.sq.shuffle.partitions. This number is only enforced when both of the above conditions are satisfied;
  4. Optionally you can set spark.sql.adaptive.coalescePartitions.minPartitionNum to determine the minimal number of shuffle partitions to create after coalescing happens. If not set, the default value is the default parallelism of the Spark cluster. This number is only enforced when both condition 1 and 2 are satisfied;
  5. Optionally you can set spark.sql.adaptive.advisoryPartitionSizeInBytesas the advisory size in bytes of the shuffle partition during the optimization. This will be the target size of the coalesced partitions (see above in the explanation of the algorithm). Also, you guessed it, this number is only enforced when both conditions 1 and 2 are satisfied.

References and final remarks

If you managed to read this article until the end, thank you so much for your time and I really hope it helped to clarify the shuffle partition optimization and adaptive query execution in the overall Spark architecture. If you feel that something is incorrect or it can be improved, please leave a comment below. If you liked the article, please clap it away!!!

Finally, I’d like to mention a few really good articles that you should definitely give a look at if you want to understand more about the internals of this optimization and AQE in general.

The links are below:

--

--