Spark 3.0: Enhancements and Optimizations

Santosh Kumar
Xebia Engineering Blog
5 min read · Dec 11, 2020

Spark 3.0 brings many interesting changes and enhancements across various areas, and some of the most significant are on the performance side.

This blog talks about some of those performance changes in detail.

1) Adaptive Query Execution (AQE): Speeding up Spark SQL at Runtime

History of Optimizers:

Spark 1.x: a rule-based optimizer. A fixed set of rules was applied to optimize the query.

Spark 2.x: along with the rule set, cost was also taken into consideration. The optimizer looked at file sizes and data statistics stored in the Hive metastore (pre-collected stats) to optimize the query.

Spark 3.x: in addition to the Spark 2.x optimizer, runtime statistics are also taken into account. The optimizer looks at runtime stats of the data as it is being processed, and the query is rewritten based on those stats.

Adaptive query execution means optimizing and adjusting the query plan based on runtime statistics collected during query execution; it involves adaptive planning. The idea was introduced in Spark 1.6 and has been continuously enhanced through Spark 3.0.

How AQE works:

As we know, shuffle and broadcast exchanges break the query down into query stages, and intermediate results are materialized at those boundaries. These query stage boundaries are natural points for runtime optimization.

  1. Run the leaf stages, i.e. stages that do not depend on input from other stages.
  2. Re-optimize whenever a stage completes, as new statistics become available.
  3. Run any further stages whose dependencies are now satisfied.
  4. Repeat steps 2 and 3 until there are no more stages to run.
AQE process flow diagram
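The loop above can be sketched in plain Python (a toy model with made-up stage names and costs standing in for runtime stats; this is not Spark's internal implementation):

```python
# Toy sketch of the AQE stage-execution loop: run stages whose dependencies
# are satisfied, collect "stats" as each completes, and repeat. A real
# optimizer would revise the remaining plan after each pass.

def run_adaptively(stages, deps):
    """stages: dict of stage name -> made-up cost; deps: name -> set of prerequisites."""
    completed, stats, order = set(), {}, []
    while len(completed) < len(stages):
        # 1. find runnable (leaf) stages: all dependencies already completed
        runnable = [s for s in stages
                    if s not in completed and deps.get(s, set()) <= completed]
        for s in runnable:
            stats[s] = stages[s]      # 2. collect runtime stats on completion
            completed.add(s)
            order.append(s)
        # 3. re-optimization of the remaining stages would happen here
    return order, stats

# two independent scans (leaf stages) run first, then the join
order, stats = run_adaptively(
    {"scan_a": 10, "scan_b": 2, "join": 12},
    {"join": {"scan_a", "scan_b"}},
)
```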

While the query is still executing, data statistics are sent back so that the logical plan can be revised and the remaining parts of the query can be changed. The optimizations happen at different levels:

  • Switching join strategies
  • Coalescing shuffle partitions
  • Optimizing skew joins

Let’s look at each of these optimizations in detail:

Join type change:

It has always been tough to decide between a sort-merge join and a broadcast join. We generally follow this golden rule: sort-merge join is good when both datasets are large, and broadcast join is good when one of the datasets is smaller than the broadcast threshold we have set.

In Spark 2.x, to convert a sort-merge join into a broadcast join we had to provide a broadcast hint and set spark.sql.autoBroadcastJoinThreshold based on our estimate of the data size. This was very difficult to tune, as workloads change over time, and it could also lead to Out Of Memory errors.

In Spark 3.0, thanks to adaptive query execution, Spark can alter the plan to use a broadcast join based on the data statistics collected at runtime.

For example: SELECT * FROM a JOIN b ON a.key = b.key WHERE b.value LIKE '%ab%'

Based on table size estimates, Spark will initially plan a sort-merge join. It will run the two leaf stages in parallel, with a sort operation in each. With AQE, the runtime stats may show that the filtered table is below the broadcast join threshold; after re-optimizing, Spark will use a broadcast join instead: the sort operations are removed and a broadcast exchange is introduced for the remaining stages.
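As a sketch, this behavior can be observed with AQE enabled (requires a running Spark 3.x session; the tables a and b are assumed to be registered):

```python
# Hypothetical sketch: assumes Spark 3.x and registered tables `a` and `b`.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("aqe-demo")
         .config("spark.sql.adaptive.enabled", "true")  # turn AQE on
         .getOrCreate())

df = spark.sql(
    "SELECT * FROM a JOIN b ON a.key = b.key WHERE b.value LIKE '%ab%'"
)
# The initial plan may show SortMergeJoin; after execution, the adaptive
# plan can switch to BroadcastHashJoin if the filtered side of b is small.
df.explain()
```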

Shuffle Partitions:

The default value for shuffle partitions is 200. This is a very tricky number to set, as it impacts performance to a great extent. If the number is too small, partitions become large, leading to disk spills and garbage-collection overhead. If the number is too large, it leads to increased I/O. In 3.0, if Spark realizes at runtime that there are too many small partitions, it coalesces them into fewer, appropriately sized partitions, and performance improves as a result.
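A toy model of the coalescing idea (illustrative only, not Spark's implementation; in Spark 3.0 the behavior is controlled by spark.sql.adaptive.coalescePartitions.enabled, with the target size set by spark.sql.adaptive.advisoryPartitionSizeInBytes, default 64MB):

```python
# Greedily merge adjacent small shuffle partitions until each group
# reaches the advisory target size, reducing the number of partitions.

def coalesce_partitions(sizes, target):
    """sizes: list of partition sizes; returns groups of merged partitions."""
    groups, current = [], []
    for size in sizes:
        current.append(size)
        if sum(current) >= target:  # group is big enough; close it
            groups.append(current)
            current = []
    if current:                     # leftover small tail becomes one group
        groups.append(current)
    return groups

# seven partitions collapse into two appropriately sized ones
groups = coalesce_partitions([10, 10, 10, 70, 10, 10, 10], target=64)
```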

Data Skewness:

Data skewness is one of the most widely known issues impacting performance. It is caused by uneven partitioning of the data, which leads to Out Of Memory errors, low resource utilization on some nodes, long-running jobs, etc. Spark 3.0 analyzes partition sizes during the shuffle phase and splits oversized partitions into smaller sub-partitions when needed.
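A toy model of the skew-splitting idea (illustrative only, not Spark's implementation): a partition counts as skewed when it is more than a given factor times the median size and also above an absolute threshold, mirroring spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256MB):

```python
import statistics

def split_skewed(sizes, factor=5, threshold=256, target=64):
    """Split partitions larger than factor*median AND larger than the
    threshold into roughly target-sized sub-partitions."""
    median = statistics.median(sizes)
    result = []
    for size in sizes:
        if size > factor * median and size > threshold:
            while size > target:        # carve off target-sized chunks
                result.append(target)
                size -= target
            if size:
                result.append(size)     # remainder
        else:
            result.append(size)
    return result

# the 512-unit partition is skewed and gets split into 64-unit chunks
sizes = split_skewed([10, 10, 512, 10], factor=5, threshold=256, target=64)
```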

How do we enable AQE:

By default, AQE is disabled; to enable it we need to set spark.sql.adaptive.enabled to true. AQE applies if the query meets the criteria below:

a) It is not a streaming query.

b) It contains at least one exchange (usually present when there is a join, aggregate, or window operator) or one subquery.

2) Dynamic Partition Pruning: Avoid scanning irrelevant data

In Spark 2.x we had static partition pruning, which pushed filters down for better performance.

E.g.: SELECT * FROM customer WHERE state_id = 10

Flow: data -> scan -> filter

Filter pushdown will result in: Flow: data -> filter -> scan

This works fine for simple queries and gives a huge performance benefit. But in joins where we operate on two datasets, one of which is very large and does not have the filter column (e.g. a sales fact table), static pruning cannot be applied. In Spark 2.x this could only be worked around by pre-joining or de-normalizing the datasets.

SELECT * FROM sales JOIN Date WHERE Date.day_of_week = 'Mon'

With Spark 3.0's dynamic pruning, filter pushdown is used and the Date table is filtered by day of week. The filtered result (if it is smaller than the broadcast join threshold) is broadcast to the sales side, and the sales table is pruned based on it.

Flow: filter dimension -> broadcast -> filter fact -> scan fact -> broadcast join
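A hedged sketch of how this looks in Spark 3.0 (requires a Spark session with the sales and Date tables registered; the join key date_id is an assumption made for illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Dynamic partition pruning is enabled by default in Spark 3.0:
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

# `date_id` is an assumed join/partition key for illustration.
df = spark.sql("""
    SELECT * FROM sales JOIN Date ON sales.date_id = Date.date_id
    WHERE Date.day_of_week = 'Mon'
""")
# The scan of `sales` should show a dynamicpruningexpression in its
# partition filters when pruning kicks in.
df.explain()
```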

3) Join Hints: More hints can be included

Spark 3.0 extends the existing broadcast join hint by adding hints for the following join strategies:

Sort merge

- Join Hints for shuffle sort merge join

SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

Shuffle hash

- Join Hints for shuffle hash join

SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

- When different join strategy hints are specified on both sides of a join, Spark prioritizes the BROADCAST hint over MERGE, MERGE over SHUFFLE_HASH, and SHUFFLE_HASH over SHUFFLE_REPLICATE_NL. In the following example, Spark will issue a warning: org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge) is overridden by another hint and will not take effect.

SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
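The same hints can also be applied through the DataFrame API (a sketch; the range-based t1 and t2 below are illustrative stand-ins, and a running Spark 3.x session is assumed):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
t1 = spark.range(100).withColumnRenamed("id", "key")
t2 = spark.range(100).withColumnRenamed("id", "key")

merged = t1.join(t2.hint("merge"), "key")          # shuffle sort-merge join
hashed = t1.join(t2.hint("shuffle_hash"), "key")   # shuffle hash join
bcast  = t1.join(t2.hint("broadcast"), "key")      # broadcast hash join
```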
