Spark SQL: Adaptive Query Execution

Altering the physical execution plan at runtime.

Enrique Rebollo García
The Startup
9 min read · Jul 2, 2020


Level of parallelism and selection of the right join strategy have been shown to be key factors in the performance of complex queries on large clusters.

Even though Spark 2.x already exposed a few parameters to tune this behaviour, having to set them manually was not practical in many production scenarios. Besides, a static configuration may not be right for every stage of a job: stages closer to the final output usually process much less data than earlier ones, because of the filters and aggregations applied along the pipeline.

Adaptive Query Execution (SPARK-31412) is a new enhancement included in Spark 3 (announced by Databricks just a few days ago) that radically changes this mindset. The framework is now responsible for improving these decisions at runtime, altering the physical execution plan dynamically whenever that is beneficial.

Modern distributed systems are designed on the principle of decoupling the compute and storage layers, that is, storing persistent data on remote storage and using local storage only for transient data. Spark was built following this idea (and recently Hadoop 3 has adopted it). While this design brings benefits in terms of scalability and availability (as well as cost reductions), it also hinders the query optimization process, as unpredictable data arrivals make cost calculation less reliable.

Fig. 1 — Spark SQL engine.

Fig. 1 depicts the internals of the Spark SQL engine. In short, a logical plan of operations (obtained by parsing a SQL statement or by applying a lineage of transformations to a DataFrame or a Dataset) is (1) resolved against the catalog, (2) optimized by a set of rules (constant folding, predicate and projection pushdown…) and (3) translated into a physical plan of Spark operators (actually into multiple candidate plans, one of which is selected on a cost basis).

For the sake of the explanation, let’s take this sample query, which reads two tables, filters one of them, joins the results on a common key, and finally performs an aggregation on another field.

select a1, avg(b1)
from A join B on A.pk = B.pk
where b1 <= 10000
group by a1

The SQL parser will analyze this statement and generate a logical plan, as shown in Fig. 2.

Fig. 2 — SQL logical plan.

The physical plan for this query is shown in Fig. 3, where dotted grey lines depict Whole-Stage Code Generation (WSCG) operations. Since Spark 2.x moved away from the Volcano iterator model, WSCG collapses a chain of narrow transformations into a single piece of generated Java code, avoiding virtual function calls and other overheads that are out of the scope of this article. Observe that the SortMergeJoin operator is not fused with any of its children.
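As a rough illustration of why fusing operators helps, the sketch below contrasts a Volcano-style chain of iterators with a single fused loop. It is plain Python rather than generated Java, and the operator names and sample rows are hypothetical; Spark's real code generator is considerably more involved.

```python
# Toy rows: (pk, b1). Hypothetical data, for illustration only.
rows = [(1, 5000), (2, 20000), (3, 10000)]


# Volcano style: each operator is an iterator pulling rows from its child,
# so every row crosses one call boundary per operator.
def scan(data):
    for row in data:
        yield row


def filter_op(child, predicate):
    for row in child:
        if predicate(row):
            yield row


def project_op(child, fn):
    for row in child:
        yield fn(row)


def volcano_plan(data):
    filtered = filter_op(scan(data), lambda r: r[1] <= 10000)
    return list(project_op(filtered, lambda r: r[1]))


# Fused style: the same scan -> filter -> project chain collapsed into one
# loop, which is roughly the shape of the code a fused stage runs.
def fused_plan(data):
    out = []
    for _pk, b1 in data:
        if b1 <= 10000:
            out.append(b1)
    return out
```

Both functions return the same rows; the fused version simply does so without handing each row from one operator object to the next.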

Fig. 3 — Spark physical plan.

The downside of this implementation is that once the physical execution plan is determined it cannot be altered at runtime. The plan is translated into a Directed Acyclic Graph (where nodes are RDDs and edges are dependencies) and this representation is submitted to the DAGScheduler. The scheduler breaks the graph down at shuffle boundaries and passes a TaskSet to a TaskScheduler, which takes care of execution on physical resources.

When adaptive execution is enabled, this split happens earlier: the plan itself is broken down into independent subgraphs called query stages.

Query stages materialize their output before execution proceeds to downstream stages. This makes it possible to submit map stages individually, collect their MapOutputStatistics objects, and analyze them to tune subsequent steps.

Two types of query stages can be found in AQE plans:

  • Shuffle query stages, which materialize their output to shuffle files.
  • Broadcast query stages, which materialize their output to an array in Driver memory.

To support this new feature, DAGScheduler now allows the submission of a single map stage. The Spark SQL engine also includes modifications to the planning and execution phases.

At physical planning, two new operation nodes are introduced wherever a ShuffleExchange is found in the original execution plan.

  • QueryStage is the root node of a stage, responsible for runtime decisions.
  • QueryStageInput is the leaf node of a stage, whose main purpose is to provide the result of a child stage to its parent after the physical plan has been updated.

The resulting adaptive execution plan, with the added QueryStage and QueryStageInput nodes, is depicted in Fig. 4.

Fig. 4 — AQE physical plan including QueryStage and QueryStageInput nodes.

During the execution phase, any QueryStage on the tree has a reference to its child stages and executes them recursively. After all children of a QueryStage are completed, runtime shuffle-write statistics are collected and used for further refinement. Spark then re-launches logical optimization and physical planning phases, and dynamically updates the query plan according to this fresh information.
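The recursive execution described above can be pictured with a small toy model. This is only a sketch under stated assumptions: the `QueryStage` class here is a hypothetical stand-in (not Spark's own class), `output_bytes` plays the role of the shuffle-write statistics, and `reoptimize` is a placeholder callback for the re-planning step.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class QueryStage:
    """Hypothetical stand-in for a query stage in an adaptive plan."""
    name: str
    output_bytes: int  # pretend shuffle-write size produced by this stage
    children: List["QueryStage"] = field(default_factory=list)
    plan_note: str = "initial plan"


def execute(stage: QueryStage,
            reoptimize: Callable[[QueryStage, Dict[str, int]], str]) -> Dict[str, int]:
    """Run the child stages first, then re-plan this stage from their statistics."""
    child_stats: Dict[str, int] = {}
    for child in stage.children:
        child_stats.update(execute(child, reoptimize))
    if child_stats:
        # All children are materialized: refine this stage with fresh statistics.
        stage.plan_note = reoptimize(stage, child_stats)
    return {stage.name: stage.output_bytes}


scan_a = QueryStage("scan A", 4 * 1024**3)
scan_b = QueryStage("scan and filter B", 2 * 1024**2)
join = QueryStage("join", 1024**2, [scan_a, scan_b])

execute(join, lambda stage, stats: f"re-planned using stats of {len(stats)} child stage(s)")
print(join.plan_note)
```

Leaf stages keep their initial plan, while the join stage is re-planned only once both of its children have reported their sizes.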

Building on these new capabilities, it became possible to add new rules that further improve the execution plan at runtime. AQE is disabled by default in Spark 3.0.

spark.sql.adaptive.enabled = false
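To switch the feature on for a single application, the property can be passed at submit time. The helper below is a hypothetical convenience (not part of Spark) that turns a dictionary of settings into `--conf` arguments for `spark-submit`; `my_job.py` is a placeholder script name.

```python
# Settings to override; the property name is the one listed above.
aqe_settings = {"spark.sql.adaptive.enabled": "true"}


def to_submit_args(settings):
    """Turn {"key": "value"} pairs into ["--conf", "key=value", ...]."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


# my_job.py is a placeholder for the application to run.
command = ["spark-submit", *to_submit_args(aqe_settings), "my_job.py"]
print(" ".join(command))
```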

1. Parallelism on Reducers

The level of parallelism after shuffling data for join or aggregation operations has been shown to be critical for the performance of complex queries.

  • If the number of reduce tasks is too low, partitions may grow too large to fit in executor memory, forcing data to be spilled to disk and thus requiring more I/O operations.
  • On the other hand, more scheduling overhead is introduced as the number of reducers gets larger, which will start degrading performance at a certain point.

Although this setting is critical, up to Spark 2.x the user could only rely on a single (global) post-shuffle partitions parameter to tune this behaviour. Reduce operations also accept a parameter specifying the number of target partitions, but it has to be set when the job is written, with little knowledge about the size of the data (and even less for intermediate steps).

The CoalesceShufflePartitions rule enables Spark to dynamically lower the number of post-shuffle partitions by merging adjacent small ones when possible. By choosing this number according to runtime data sizes, load balancing improves and the number of I/O requests decreases.

Fig. 5 — Partition merging after shuffling.

Fig. 5 shows an example of this. Two map tasks output shuffle files that bucket the data into 6 partitions. Once all mappers have completed, and according to the configuration parameters, the number of reducers is lowered to 3. Observe that tasks 2 and 3 work on multiple adjacent partitions. BlockManager can now serve a block representing multiple partitions, so that they can be fetched with a single request.

Usually, the number of post-shuffle partitions is initially set to a relatively large number and AQE takes care of coalescing them at runtime when appropriate. The behaviour of this rule is driven by the following configuration parameters.

spark.sql.shuffle.partitions = 200  (Spark 2.x)
spark.sql.adaptive.coalescePartitions.enabled = true
spark.sql.adaptive.coalescePartitions.minPartitionNum
spark.sql.adaptive.coalescePartitions.initialPartitionNum = 200
spark.sql.adaptive.advisoryPartitionSizeInBytes = 64MB
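The merging of adjacent partitions towards an advisory size can be sketched as a greedy pass over the post-shuffle partition sizes. This is a simplified illustration, not Spark's actual implementation: the function name and the sample sizes are assumptions, and the real rule also takes the minimum partition number into account.

```python
MB = 1024 * 1024


def coalesce_partitions(sizes, target_size):
    """Greedily merge adjacent partitions while the merged size stays within target_size.

    Returns a list of groups, each group being the list of original
    partition indices handled by one reduce task.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Six post-shuffle partitions (hypothetical sizes) and a 64MB advisory size.
sizes = [60 * MB, 20 * MB, 30 * MB, 10 * MB, 25 * MB, 25 * MB]
groups = coalesce_partitions(sizes, 64 * MB)
print(groups)  # [[0], [1, 2, 3], [4, 5]]
```

With these sizes the six partitions end up on three reducers, two of which read several adjacent partitions, which is the situation depicted in Fig. 5.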

2. Join Strategy

Another key decision affecting the performance of complex queries is the selection of a physical join strategy. Currently, Spark implements three physical join operators:

  • Two operators first shuffle the data according to the join keys and then apply a join algorithm. ShuffledHashJoin performs a hash join, that is, it creates a hash table (build) using the content of one side and then scans the other one (probe). SortMergeJoin sorts both sides by the join attribute and merges the data by iterating over the elements and finding rows with the same key. The traffic exchanged by shuffle-join operations is shown in Fig. 6. Every reducer has to read one file from every mapper, which implies transferring a lot of data over the network when tasks happen to run on different workers.
Fig. 6— Shuffle exchange prior to a SortMergeJoin operator.
  • BroadcastHashJoin is preferable to the previous strategies when one of the sides is small enough to be broadcast to and stored in each executor. As shown in Fig. 7, this join implementation avoids exchanging data across nodes for co-partitioning. The number of reduce tasks equals that of the mappers, and each reducer only has to read the output of a single mapper. Usually, both tasks are scheduled on the same worker node to avoid unnecessary network transfers.
Fig. 7 — Shuffle exchange prior to a BroadcastHashJoin operator.
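The two join algorithms mentioned above (build/probe hash join and sort-merge join) can be sketched on in-memory lists of `(key, value)` pairs. This is a single-process illustration with hypothetical function names and sample data; it ignores shuffling, spilling and everything else that makes the distributed operators hard.

```python
from collections import defaultdict


def hash_join(build_side, probe_side):
    """Build a hash table from one side, then probe it with the other."""
    table = defaultdict(list)
    for key, value in build_side:
        table[key].append(value)
    return [(key, b, p) for key, p in probe_side for b in table.get(key, [])]


def sort_merge_join(left, right):
    """Sort both sides by key, then advance two cursors and emit matching rows."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of rows sharing this key on each side.
            i_end = i
            while i_end < len(left) and left[i_end][0] == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Emit the cross product of the two runs.
            out += [(lk, l[1], r[1]) for l in left[i:i_end] for r in right[j:j_end]]
            i, j = i_end, j_end
    return out


# Hypothetical rows keyed by pk.
table_a = [(1, "a1"), (2, "a2"), (2, "a2b")]
table_b = [(2, 10), (3, 30), (2, 20)]

assert sorted(hash_join(table_a, table_b)) == sorted(sort_merge_join(table_a, table_b))
```

Both functions produce the same set of joined rows; they differ in how much memory the build side needs versus how much sorting work is done up front.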

But again, in Spark 2.x the join implementation is selected before execution, when the data size of intermediate operators is unknown. Being based on estimates, this decision might not be optimal in every case.

This optimization enables Spark to avoid shuffling by dynamically replacing a sort-merge join with a broadcast-hash join when size conditions are met on either of its sides. Depending on configuration, the OptimizeLocalShuffleReader rule may further optimize the plan by replacing regular shuffle reads with local ones in order to reduce network usage (see Figs. 6 and 7).

Fig. 8 — Physical execution plan with BroadcastHashJoin operation.

The final execution plan after applying this rule is shown in Fig. 8. As the smaller side has to be sent to every executor in advance, the query has been restructured into separate jobs. Observe that the ShuffleExchange has been replaced by a BroadcastExchange and the BroadcastHashJoin operator is now fused with the probe side into a single QueryStage.

Even though this runtime change may not be as efficient as planning a broadcast-hash join from the start, its cost is lower than that of sorting both sides. The behaviour of this rule is driven by the following configuration parameters.

spark.sql.autoBroadcastJoinThreshold = 10MB  (Spark 2.x)
spark.sql.adaptive.localShuffleReader.enabled
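The size condition behind this switch can be sketched as a comparison between the runtime size of each join side and the broadcast threshold. The function below is a hypothetical simplification: the real planner also considers the join type and which side is allowed to be the build side.

```python
MB = 1024 * 1024


def choose_join(left_bytes, right_bytes, broadcast_threshold=10 * MB):
    """Pick a join operator from runtime sizes.

    Returns (operator name, side to broadcast or None).
    """
    if min(left_bytes, right_bytes) <= broadcast_threshold:
        side = "left" if left_bytes <= right_bytes else "right"
        return ("BroadcastHashJoin", side)
    return ("SortMergeJoin", None)


# Table B shrinks to 2MB after its filter, so it can be broadcast.
print(choose_join(4096 * MB, 2 * MB))    # ('BroadcastHashJoin', 'right')
# Both sides stay large: keep the sort-merge join.
print(choose_join(500 * MB, 300 * MB))   # ('SortMergeJoin', None)
```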

3. Skewed Joins

Another critical aspect of query performance is data distribution across partitions, which defines the size of the scheduled tasks. When shards are unevenly distributed, tasks operating on larger amounts of data will slow down the whole stage and block downstream stages.

This problem was usually tackled by increasing the level of parallelism or by re-engineering the join keys to increase their cardinality.

The OptimizeSkewedJoin rule enables Spark to achieve better load balancing by applying a simple test. Let F be the skew factor, S the skew size threshold and R the skew row-count threshold. A partition is considered skewed if and only if:

  • Its size is larger than S and larger than the median partition size multiplied by F.
  • Its row count is larger than R and larger than the median partition row count multiplied by F.
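The test above can be sketched in a few lines. The example applies it to partition sizes only (the same comparison would apply to row counts), with hypothetical sizes and with F = 10 and S = 256MB taken as assumed defaults.

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes, factor=10, threshold=256 * MB):
    """Return the indices of partitions larger than both the absolute
    threshold and factor times the median partition size."""
    mid = median(sizes)
    return [index for index, size in enumerate(sizes)
            if size > threshold and size > factor * mid]


# Hypothetical post-shuffle sizes: one partition dwarfs the others.
sizes = [100 * MB, 120 * MB, 90 * MB, 2000 * MB, 110 * MB]
print(skewed_partitions(sizes))  # [3]
```

Using the median rather than the mean keeps the reference size from being dragged upwards by the very partition being tested.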

When a partition is deemed skewed based on map-side statistics, multiple reducer tasks are scheduled to operate on it. This way, each of these reducers pulls the output written by a single mapper and performs the join operation on this partition chunk (instead of just one single reducer fetching files from every mapper).

This scheme is illustrated in Fig. 9, where partition 0 of table A is processed by two reducers whose outputs are later unioned. The OptimizeLocalShuffleReader rule may further optimize the physical plan.
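How reduce tasks could be laid out once a partition is flagged as skewed can be sketched as follows. This is a toy model of the description above, with a hypothetical function name; it only covers one side of the join and splits a skewed partition into exactly one chunk per mapper.

```python
def plan_reduce_tasks(num_partitions, num_mappers, skewed):
    """Return one (partition, mapper_ids) pair per reduce task."""
    tasks = []
    for partition in range(num_partitions):
        if partition in skewed:
            # Skewed: one reducer per mapper output chunk of this partition.
            tasks += [(partition, [mapper]) for mapper in range(num_mappers)]
        else:
            # Regular case: a single reducer fetches from every mapper.
            tasks.append((partition, list(range(num_mappers))))
    return tasks


# Two mappers, three partitions, partition 0 flagged as skewed.
tasks = plan_reduce_tasks(num_partitions=3, num_mappers=2, skewed={0})
print(tasks)  # [(0, [0]), (0, [1]), (1, [0, 1]), (2, [0, 1])]
```

Partition 0 is handled by two reducers, each pulling the output of a single mapper, while the remaining partitions keep the usual one-reducer layout.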

Fig. 9 — Handling of a skewed join.

The behaviour of this rule is driven by the following configuration parameters.

spark.sql.adaptive.skewJoin.enabled = true
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 10
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB
spark.sql.adaptive.localShuffleReader.enabled

Conclusion

Adaptive Query Execution is an enhancement that enables Spark 3 (officially released just a few days ago) to alter physical execution plans at runtime, improving the physical implementation based on statistics collected after shuffle exchange operations.

Level of parallelism and join implementation strategy have been shown to be key factors for query performance on large clusters. According to this premise, new optimization rules have been built on top of the AQE feature:

  1. Dynamically coalescing shuffle partitions by merging small adjacent ones when possible.
  2. Dynamically replacing a sort-merge join with a broadcast-hash join when size conditions are met on either side.
  3. Dynamically optimizing skewed joins by scheduling multiple reduce tasks to work on a single huge partition when needed.

It is expected that future Spark releases will ship with additional optimization rules built on Adaptive Query Execution.
