How AQE Helps Optimize Shuffle Partition Number

Songkunjump
Agoda Engineering & Design
May 23, 2022

Among big data tools, Apache Spark is swiftly gaining traction as a versatile data processing system. It is especially useful for companies that need to stream and analyze large volumes of data in real time.

At Agoda, we use Apache Spark to process many things, from feature stores for machine learning to financial reconciliation and reporting. Its distributed and scalable nature makes these processes easier.

However, it can be challenging to optimize Spark’s performance without knowing our data or distributing it properly. We must therefore ensure that our data is evenly distributed and split into an appropriate number of partitions.

More partitions mean that the data is spread across more, smaller pieces, and fewer partitions mean that each partition holds more data. To tune our Spark jobs, we need to find the right balance in partition size: many small partitions increase GC time and cause overhead, while a few large partitions decrease parallelism.

This article discusses how we manually tune the number of shuffle partitions to speed up our join and how AQE (Adaptive Query Execution) helps in this process.

Data Set-Up

Figure 1: Data Set-Up

In the data set-up above, we have 900 million rows of data in both table_a and table_b. The tables are stored as two folders, each holding around 4.4 GB of compressed data in the form of 100 equally sized Parquet files.

Cluster Setup

Many sources recommend that the partition size be around 1 MB to 200 MB. Since we are working with compressed data, we will use 30 MB as our ballpark partition size.

With our ballpark partition size, a reasonable number of partitions would be around 290 (8,800 MB / 30 MB ≈ 293). Since we want the number of partitions to be around 2 to 4 times the number of cores, we will use about 100 cores to run this job.
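The sizing arithmetic above can be sketched in a few lines of Python. This is only an illustration: the function names are hypothetical, and the 2x to 4x factors are simply the rule of thumb quoted in the text.

```python
import math


def estimate_partitions(total_size_mb, target_partition_mb):
    """Number of partitions needed so that each holds roughly the target size."""
    return math.ceil(total_size_mb / target_partition_mb)


def core_range(num_partitions, low_factor=2, high_factor=4):
    """Core counts for which num_partitions is between 2x and 4x the cores."""
    fewest_cores = math.ceil(num_partitions / high_factor)
    most_cores = math.floor(num_partitions / low_factor)
    return fewest_cores, most_cores


# 8.8 GB of compressed input at about 30 MB per partition.
partitions = estimate_partitions(8800, 30)
fewest, most = core_range(partitions)
print(partitions, fewest, most)
```

Rounding up gives 294 partitions, which the text rounds to about 290, and the chosen 100 cores falls inside the resulting core range.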

Manually Find The Perfect Number of Partitions

Figure 2: Join Experiment
Figure 3: Experiment Result

In the experiment above, we set different numbers of shuffle partitions and measure which one makes the join fastest for 8.8 GB of data. We found that using 200 shuffle partitions results in the most performant join. In addition, changing the number of shuffle partitions within the range of 50 to 10000 does not affect the performance of the join that much.

However, once we go below or above that range, we see a significant decrease in performance. The experiment data makes it clear that if a job before this join requires us to set the shuffle partition number to 100000, we will need to set it back down afterwards to prevent a performance bottleneck. Alternatively, we can check whether the AQE framework relieves us of the burden of manually bringing the shuffle partition number back down.
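A manual sweep like the one described above could be driven by a small timing harness. The sketch below is an assumption about how such an experiment might be organized, not the code behind the figures; `sweep_shuffle_partitions` and the `run_join` callback are hypothetical names.

```python
import time


def sweep_shuffle_partitions(candidates, run_join):
    """Time run_join(n) for each candidate n and return (best_n, timings)."""
    timings = {}
    for n in candidates:
        start = time.perf_counter()
        # Hypothetical callback: configure n shuffle partitions, then run the join.
        run_join(n)
        timings[n] = time.perf_counter() - start
    # The candidate with the shortest wall-clock time wins.
    best_n = min(timings, key=timings.get)
    return best_n, timings
```

In a real Spark session, `run_join` would typically call `spark.conf.set("spark.sql.shuffle.partitions", str(n))` and then trigger the join with an action such as a write or a count. Repeating each run a few times would make the comparison less sensitive to cluster noise.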

Adaptive Query Execution

Figure 4: AQE Partition Coalescing

Adaptive Query Execution (AQE) is a framework that helps optimize query plans at runtime by using statistics from previously completed stages. Enabling AQE helps Spark choose the join strategy, coalesce shuffle partitions, and optimize skew joins.

In our case, we are interested in the ability to coalesce shuffle partitions, which allows Spark to determine a reasonable number of partitions on the reducer side based on map-side partition size statistics.

For example, suppose we have three partitions on the map side, and two of the partitions are quite small. Spark can combine the two small partitions into one partition on the reducer side, resulting in only two partitions on the reducer side.
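The three-partition example can be mimicked with a simplified greedy merge. This is a sketch of the general idea only, assuming adjacent partitions are merged until a target size would be exceeded; it is not Spark's actual implementation, and the sizes and the 64 MB target are made-up values.

```python
def coalesce_partitions(sizes, target_size):
    """Greedily group adjacent partitions so each group stays near target_size.

    Returns a list of groups, where each group is a list of the original
    partition indices that end up in one reducer-side partition.
    """
    groups = []
    current = []
    current_size = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if current and current_size + size > target_size:
            groups.append(current)
            current = []
            current_size = 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# One large partition and two small ones (sizes in MB, illustrative only).
print(coalesce_partitions([70, 10, 15], target_size=64))
```

With these inputs the large partition stays on its own and the two small ones are merged, so three map-side partitions become two reducer-side partitions.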

Testing Adaptive Query Execution

Figure 5: AQE Coalescing Recommended Set-Up
Figure 6: AQE Coalescing Set-Up
Figure 7: Query Plan
Figure 8: AQE Experiment Result

We follow the set-up recommended in the Spark documentation for AQE. In addition, we choose 100000 as initialPartitionNum because, within a Spark application, one job might require 100000 shuffle partitions while another might need only 100.

With this set-up, Spark can use map-side partition size statistics to determine the number of partitions on the reducer side. In this experiment, AQE found a reasonable number of partitions (195) for our job at runtime and gave it good performance without us manually changing the number of shuffle partitions between jobs.
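As a rough illustration, the AQE settings discussed here could be collected in a dictionary and turned into `spark-submit` arguments. Only the initialPartitionNum value of 100000 comes from the text above; the advisory partition size is an assumed value, and `to_spark_submit_args` is a hypothetical helper.

```python
# AQE settings for partition coalescing, keyed by Spark configuration name.
aqe_conf = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Start high so that jobs needing many shuffle partitions are covered.
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum": "100000",
    # Assumed target size for coalesced partitions, not taken from the article.
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "64m",
}


def to_spark_submit_args(conf):
    """Render a config dict as a flat list of --conf key=value arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_spark_submit_args(aqe_conf)))
```

The same key and value pairs could also be applied one by one through the session builder's `config` method when the session is created in code.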

Conclusion

The key takeaway is that setting a reasonable number of shuffle partitions matters, and different jobs require different numbers of shuffle partitions. To tune the number of shuffle partitions manually, you need to understand your data. Alternatively, you can rely on AQE to do that job for you at runtime.

In this article, we only explored how AQE helps optimize the number of shuffle partitions at runtime when Spark transforms data without caching between transformations. However, AQE can also be used when data is cached between transformations. The only drawback is that Spark might need to do extra shuffles if the outputPartitioning of the two sides of a join would have matched without AQE but no longer matches with AQE.
