Spark 2.x to Spark 3.0: Adaptive Query Execution, Part 1

Wassim Almaaoui
Data Engineering For All
Jun 30, 2020

Switching Join Strategy, by Radhwane Chebaane and Wassim Almaaoui

The Apache Spark community recently released Spark 3.0, which brings many useful new features and significant performance improvements. A wide range of enterprises and developers already use Spark extensively for their data processing needs, and they will probably all face the same question: is it worth upgrading from Spark 2 to Spark 3?

The Spark community claims that “Spark 3.0 is roughly two times faster than Spark 2.4” on the TPC-DS 30TB benchmark.

from a Databricks blog

But what does this mean for my use case?

In this series of articles, we will try to explain some of the main changes from Spark 2 to Spark 3. We will focus on performance improvements, using simple use cases that are easy to reproduce and measuring their impact on processing time.

One of the major new features, initially developed by Intel and integrated into Spark 3.0, is Adaptive Query Execution (AQE). It covers three important optimisations:

  • Switching the join strategy (the subject of this article)
  • Handling data skew (to be discussed in upcoming articles)
  • Dynamically reducing the number of post-shuffle partitions (to be discussed in upcoming articles)

We will also cover some other great new features, apart from AQE, in upcoming articles.

Adaptive Query Execution

AQE (Adaptive Query Execution) is activated by setting the Spark config `spark.sql.adaptive.enabled` to true. It lets Spark change its initial execution plan (usually based on the requested workflow and the datasets' metadata) in the middle of job execution to optimize it further.
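As a minimal sketch of switching AQE on from PySpark: the helper name `enable_aqe` is ours, for illustration only, and it assumes you already have a `SparkSession` running on Spark 3.0 or later.

```python
def enable_aqe(spark):
    """Turn on Adaptive Query Execution for an existing SparkSession.

    `spark` is assumed to be a pyspark.sql.SparkSession (Spark 3.0+).
    """
    # AQE is off by default in Spark 3.0, so it has to be switched on explicitly.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    # Read the value back so the caller can check that the setting was applied.
    return spark.conf.get("spark.sql.adaptive.enabled")
```

The same key can also be passed when submitting the job, for example with `--conf spark.sql.adaptive.enabled=true`.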

These changes can occur only in multi-stage jobs. Spark can use information learned from finished stages to optimize the upcoming ones. This is not possible in Spark 2 since the execution plan is frozen once the job has started.

Spark Join Strategies

Based on the metadata and some other characteristics of the joined datasets, Spark chooses one of three main join algorithms.

  • Broadcast Join: the simplest strategy to run on a cluster. Spark “broadcasts” the smallest DataFrame by sending all its data to every worker in the cluster. Once the DataFrame is broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame (shuffling means moving the rows with the same key to the same executor). However, the small dataset must fit in memory: if you force Spark to broadcast a big dataset, the executors may crash. Here is a more detailed explanation.
  • Sort Merge Join: the default mode since Spark 2.3, used when Spark thinks that a broadcast is not possible. This is the most scalable algorithm. It consists of shuffling the two datasets, then sorting each partition and performing the join. Here is a more detailed explanation.
  • Shuffle Hash Join: used when Sort Merge Join is disabled or when the join columns do not meet some specific conditions. Since it builds a hash table from the smaller dataset, that dataset should not be too big. In a few scenarios Shuffle Hash Join is faster than Sort Merge Join, but most of the time the latter is the better choice thanks to its performance and scalability. Here is a more detailed explanation.
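To make the difference between the first two strategies concrete, here is a toy, single-machine sketch in plain Python. It is not Spark code: the function names are ours, the datasets are small lists of `(key, value)` pairs, and the shuffle is only stood in for by a local sort.

```python
from collections import defaultdict

def broadcast_hash_join(large, small):
    """Join two lists of (key, value) pairs by hashing the small side."""
    # Build a hash table from the small dataset; in Spark this table would be
    # copied ("broadcast") to every executor.
    table = defaultdict(list)
    for key, value in small:
        table[key].append(value)
    # Stream the large side and probe the table: the large side never moves.
    return [(key, lv, sv) for key, lv in large for sv in table.get(key, [])]

def sort_merge_join(left, right):
    """Join two lists of (key, value) pairs by sorting both sides and merging."""
    left, right = sorted(left), sorted(right)  # stands in for shuffle + sort
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right rows sharing this key...
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # ...and pair it with every left row sharing the same key.
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], right[r][1]) for r in range(j, j_end))
                i += 1
            j = j_end
    return out

sales = [(1, 9.99), (2, 5.00), (1, 20.00), (3, 7.50)]
clients = [(1, "2020-06-20"), (2, "2020-01-03")]
# Both strategies return the same rows; only the amount of data movement differs.
assert sorted(broadcast_hash_join(sales, clients)) == sorted(sort_merge_join(sales, clients))
```

The hash version touches each row of the large side once and never reorders it, which is why avoiding the shuffle and sort pays off when the small side fits in memory.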

Let’s check this on a concrete example of a simple join in Spark 2 and Spark 3, and see what the new AQE brings.

Concrete Example

Input Datasets

  • Clients[id, inscription_date]: a relatively small dataset of 17MB, stored in CSV format.
  • Sales[client_id, price]: to get fast feedback I generated it with a size of 170MB, but in reality it could be much bigger. It is stored in CSV format as well.

Both datasets are available here.

This is a common data model known as a star schema, with fact tables (Sales) and dimension tables (Clients). A fact table is usually joined with many dimension tables to enrich it, hence the importance of this new feature in Spark 3.0.

Spark 2, Joining the two datasets

Note that since Spark evaluates lazily, we write the data (an action) at the end to trigger the execution of the workflow. Be careful with the choice of action for testing or benchmarking: for some of them (count, show, take …) Spark applies optimisations and does not read all the data.
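The job described here can be sketched in PySpark roughly as follows. This is an illustration under assumptions, not the original notebook code: the function name, the paths, the `header` option and the Parquet output format are all ours, and `spark` is expected to be an existing `SparkSession`.

```python
def join_sales_with_clients(spark, sales_path, clients_path, output_path):
    """Read both CSV files, inner-join them on the client id and write the result.

    `spark` is assumed to be a pyspark.sql.SparkSession; the paths, the header
    option and the Parquet output format are illustrative assumptions.
    """
    reader = spark.read.option("header", "true")
    sales = reader.csv(sales_path)      # columns: client_id, price
    clients = reader.csv(clients_path)  # columns: id, inscription_date
    joined = sales.join(clients, sales["client_id"] == clients["id"], "inner")
    # Writing is the action that triggers execution and reads both inputs fully.
    joined.write.mode("overwrite").parquet(output_path)
    return joined
```

Calling `joined.explain()` on the returned DataFrame prints the physical plan, which is one way to check which join strategy was selected.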

The previous job gives the following execution plan

simple join between sales and clients spark 2
  • The first two steps just read the two datasets.
  • Spark adds an isNotNull filter on the inner join keys to optimize the execution.
  • The Project step keeps only the columns selected in the query (here we keep them all).
  • The Exchange step represents the shuffle required by the chosen join strategy, SortMergeJoin, which is performed right after it.

As expected, Spark chooses the SortMergeJoin. By default, Spark uses a Broadcast Join only if one of the datasets is smaller than 10 MB. This default behaviour is configurable via the parameter `spark.sql.autoBroadcastJoinThreshold`.
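For reference, a small sketch of changing that threshold from PySpark. The helper name `set_broadcast_threshold` is hypothetical and `spark` is assumed to be an existing `SparkSession`; the config key is the one named above, and its value is expressed in bytes.

```python
def set_broadcast_threshold(spark, megabytes):
    """Set spark.sql.autoBroadcastJoinThreshold from a size given in megabytes.

    `spark` is assumed to be a pyspark.sql.SparkSession. A negative value
    disables automatic broadcast joins.
    """
    # The config expects bytes: the 10 MB default corresponds to 10485760.
    threshold_bytes = -1 if megabytes < 0 else int(megabytes * 1024 * 1024)
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(threshold_bytes))
    return threshold_bytes
```

Raising the threshold makes Spark broadcast more often, but it only helps when the size estimate itself is reasonable, which is the problem explored next.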

Spark 2 uses an estimate of the dataset size to decide on the join mode. Let’s see what its estimate was for the clients dataset:

What happens if I join on a subset of the initial dataset?

I will filter on recently subscribed clients only (inscription_date > “2020-06-15”); the resulting dataset should be smaller than 10 MB. Let’s see what the Spark 2 estimator thinks about this:

The estimated size didn’t change after the filter. This doesn’t look reassuring, but let’s run the job anyway:

Execution Plan

join with recent clients only, spark 2

Since Spark doesn’t know the distribution of values in my filtering column, it couldn’t adapt its estimate, which was imprecise (17 MB estimated vs 3.8 MB in reality), and it used Sort Merge Join instead of Broadcast Join. So it shuffled both datasets and we missed an opportunity to go much faster.

=> the whole job took 12 seconds.

This is where the power of Spark 3’s AQE comes in: unlike Spark 2, it uses real runtime statistics to re-adapt the initially planned execution.
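The difference between planning on an estimate and re-planning on measured sizes can be sketched with a toy decision rule in plain Python. This is a simplified model, not Spark's planner: `pick_join` is a hypothetical helper, and the sizes are the 17 MB estimate and the 3.8 MB real size from this example.

```python
MB = 1024 * 1024
BROADCAST_THRESHOLD = 10 * MB  # default of spark.sql.autoBroadcastJoinThreshold

def pick_join(size_bytes, threshold=BROADCAST_THRESHOLD):
    """Return the join strategy a size-based rule would pick for the small side."""
    return "BroadcastHashJoin" if size_bytes <= threshold else "SortMergeJoin"

# Planning time: only the estimate is known, and it ignores the date filter.
initial_plan = pick_join(17 * MB)
# Run time: the first stages have finished, so the real size is known.
final_plan = pick_join(3.8 * MB)

print(initial_plan, "->", final_plan)
```

With the numbers above, the rule picks SortMergeJoin from the estimate and BroadcastHashJoin from the measured size, which mirrors what Spark 2 and Spark 3 with AQE each end up doing in this example.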

Join on a filtered dataset in spark 3.0

I ran the same code on Spark 3.0 after enabling AQE, which is disabled by default.

The initial execution plan (before running the job), based on the dataset size estimate, shows that Spark 3 also planned to do a SortMergeJoin. Note that when AQE is enabled, Spark indicates that the plan is not final.

initial execution plan

After running the job, we can see that thanks to AQE the Broadcast Hash Join was finally chosen and Spark re-adapted the execution plan:

join with recent clients only, spark 3

=> the whole job took 7 seconds.

This run was 42% faster than the one on Spark 2.4 using Sort Merge Join (7s vs 12s). We saw this significant performance gain with a dataset of just 170MB; avoiding the shuffle of a bigger dataset should make the gain much greater.
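For readers who want to redo the arithmetic on their own timings, the percentage is simply the reduction in run time relative to the baseline. A tiny sketch (the helper name `percent_faster` is ours):

```python
def percent_faster(baseline_seconds, new_seconds):
    """Relative reduction in run time, as a percentage of the baseline."""
    return (baseline_seconds - new_seconds) / baseline_seconds * 100

# Spark 2.4 with sort-merge join (12 s) vs Spark 3.0 with AQE (7 s).
print(round(percent_faster(12, 7)))  # prints 42
```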

However, since the join switch happens once the two datasets have been written to local disk to be shuffled (the first step of a shuffle), it is not as efficient as planning a broadcast from the beginning, which would skip this first shuffle step entirely. From the official Spark documentation:

AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it’s better than keep doing the sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network traffic(if spark.sql.adaptive.localShuffleReader.enabled is true — default value in Spark 3.0)

I tried the join with a broadcast hint
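A hinted join can be sketched in PySpark as below. This is an illustrative version rather than the original notebook code: the function name is ours, and both arguments are assumed to be DataFrames with the columns described earlier (`client_id` on sales, `id` on clients).

```python
def join_with_broadcast_hint(sales, recent_clients):
    """Inner-join sales with recent clients, asking Spark to broadcast the latter.

    Both arguments are assumed to be PySpark DataFrames with the columns
    described in the article (client_id on sales, id on clients).
    """
    # hint("broadcast") marks the small side so that the planner chooses a
    # broadcast join up front instead of relying on its size estimate.
    small = recent_clients.hint("broadcast")
    return sales.join(small, sales["client_id"] == small["id"], "inner")
```

Because the broadcast is planned from the start, the shuffle write that AQE still has to go through before switching strategies is avoided.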

=> the whole job took 4 seconds. So it’s even faster!

If you want to try all this yourself and go further, here are the notebooks in a ready to use sandbox:

https://github.com/radcheb/spark-hands-on/tree/master

Conclusion

Using Broadcast Join whenever possible can strongly improve your job performance.

Unfortunately, it is not easy for Spark to get a good estimate of the sizes of the joined datasets, especially when you apply many transformations to them before the join (filtering, aggregation, enrichment, other prior joins, etc).

Even though Spark lets users specify the join strategy, this is heavy to maintain: you have to estimate beforehand the size of each dataset used in every join of every job, and update your estimates for each change in the job or the data. Yes, this quickly stops being fun.

Fortunately, when performing a join with AQE enabled, Spark 3.0 uses real dataset sizes (not just estimated ones), so it will, hopefully, choose the most suitable strategy each time. The code stays exactly the same; you only need to change a single config.

In the next articles, we will speak about other advantages of AQE. So stay tuned!

by Radhwane Chebaane and Wassim Almaaoui
