Simple queries in Spark Catalyst optimisation (2): join and aggregation

Back in August I wrote my first Spark blog about simple queries in Spark Catalyst, so it’s time to write part II now. \o/ This one is about join and aggregation queries. The goal is to see how Spark implements these kinds of operations in the Catalyst engine.

I put “join” and “aggregation” together because they both need to shuffle the data, but Spark generates their Spark plans with different strategies behind the scenes. All the executed strategies (JoinSelection, Aggregation …) can be found in the SparkPlanner class. (If you don’t know what a SparkPlan is, you can check part I: https://medium.com/@wx.london.cun/simple-queries-in-spark-catalyst-optimisation-1-5797bb1945bc#.fbocfza09)

Join operation

Suppose we have this simple join query and the physical plan generated below (using Spark 2.0):

df.join(df2, Seq("key"), "inner")
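
Printed with explain(), the plan looks roughly like this (a sketch only: the expression IDs and the partition count depend on your session, here spark.sql.shuffle.partitions is set to 5):

*SortMergeJoin [key#0], [key#21], Inner
:- *Sort [key#0 ASC], false, 0
:  +- Exchange hashpartitioning(key#0, 5)
:     +- Scan ExistingRDD[key#0,value#1]
+- *Sort [key#21 ASC], false, 0
   +- Exchange hashpartitioning(key#21, 5)
      +- Scan ExistingRDD[key#21,value#22]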

This query is an inner join between df and df2 on the column “key”. Let’s look at the Spark plans generated. SortMergeJoin is the standard plan for a join by keys. If one of the joined datasets is small enough (below spark.sql.autoBroadcastJoinThreshold, default value 10 MB), Spark will choose BroadcastHashJoin instead, and much less often ShuffledHashJoin. This selection is implemented in the JoinSelection object.
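
For example (a quick sketch, assuming a SparkSession named spark), you can steer this choice yourself: lowering the threshold forces the SortMergeJoin, while the broadcast() hint forces a BroadcastHashJoin:

import org.apache.spark.sql.functions.broadcast

// disable automatic broadcast joins, so the plan falls back to SortMergeJoin
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
df.join(df2, Seq("key"), "inner").explain()

// explicitly broadcast df2, so Spark picks BroadcastHashJoin regardless of its size
df.join(broadcast(df2), Seq("key"), "inner").explain()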

SortMergeJoin has two sub-branches here, corresponding to the two datasets df and df2; both branches need to go through Sort, Exchange and Scan plans before the final merge. The Exchange plan shuffles the data, and the Scan plan projects your dataset onto the required columns as unsafe rows.

The implementation of SortMergeJoin (the SortMergeJoinExec class) is quite long (about 1k lines), because it needs to deal with all the different types of join. The principle is to create a new RowIterator by manipulating the row iterators of the two underlying data branches. Each time, it first moves forward one row in the left branch iterator, then tries to find the rows with the same key in the right branch; all the rows found for this key on the right side are saved into a buffer. As the data in the two branches is sorted, we only need to move the iterators forward to explore the whole dataset. All the data structures here are Iterators, which means all the reading steps are lazy and, apart from the small buffer of matched rows, nothing is kept in memory, so this plan doesn’t consume any Spark execution memory.
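
To make the idea concrete, here is a tiny self-contained sketch of that merge logic on plain Scala collections (nothing from Spark, all names are mine): advance the left side row by row, buffer the right rows that share the current key, and emit the joined rows.

// Toy sort-merge inner join over two sequences already sorted by key (not Spark code)
def sortMergeJoin[K: Ordering, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, A, B)] = {
  val ord = implicitly[Ordering[K]]
  val result = scala.collection.mutable.ArrayBuffer.empty[(K, A, B)]
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val cmp = ord.compare(left(i)._1, right(j)._1)
    if (cmp < 0) i += 1        // left key is smaller: advance the left side
    else if (cmp > 0) j += 1   // right key is smaller: advance the right side
    else {
      val key = left(i)._1
      // buffer all right rows with the current key (the only thing kept in memory)
      val buffered = right.drop(j).takeWhile(r => ord.equiv(r._1, key))
      while (i < left.length && ord.equiv(left(i)._1, key)) {
        buffered.foreach { case (_, b) => result += ((key, left(i)._2, b)) }
        i += 1
      }
      j += buffered.length
    }
  }
  result.toSeq
}

// sortMergeJoin(Seq(1 -> "a", 2 -> "b"), Seq(1 -> "x", 1 -> "y"))
// returns Seq((1,"a","x"), (1,"a","y"))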

The Sort plan (SortExec class) performs a sort on the data. The execution of this plan is quite simple: sort all the data with UnsafeExternalRowSorter and return a sorted data iterator. UnsafeExternalRowSorter is a wrapper around UnsafeExternalSorter, which in turn uses UnsafeInMemorySorter to do the sorting in memory. The bad news is that all these classes are heavy memory consumers and can trigger a spill.

The Exchange plan (ShuffleExchange class) actually uses a HashPartitioning here with 5 partitions. Executing this plan creates a new ShuffledRowRDD, which is a specialised version of the original ShuffledRDD, optimised for shuffling rows instead of Java key-value pairs (copied from the Javadoc).
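
The 5 here is just the shuffle partitions setting of my session, which you can change like this (the default is 200, assuming a SparkSession named spark):

// use 5 shuffle partitions instead of the default 200 for subsequent shuffles
spark.conf.set("spark.sql.shuffle.partitions", "5")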

The last one is the leaf-level Spark plan, RDDScanExec. This plan takes the RDD as an input parameter; its execution just calls mapPartitionsInternal() on the RDD and applies a projection with the given columns (called a sequence of expressions in SparkSQL) to each row, which returns an UnsafeRow. An UnsafeRow is a record pointing to the raw memory where your row data is stored. All the data in one row is written into a single byte array and stored directly in memory using the sun.misc.Unsafe library. The advantage is avoiding the heavy GC of all the Java objects that would otherwise be created for your rows. I plan to write a separate blog about the details of this part.

Finally, the most interesting part is to see how Spark Catalyst adds the shuffle steps to the SQL query. The code is in the EnsureRequirements class (the ensureDistributionAndOrdering method). It’s a Rule applied to any SparkPlan. Before this ER rule is applied, Spark only generates the SortMergeJoin with a Scan plan for each branch dataset. The ER rule is what actually adds the Exchange and Sort plans for each branch. How does this happen?

This process is very complicated, and the Spark Catalyst engine added a lot of optimisations here. However, with our simplest join query it’s much easier to understand. Each SparkPlan class has to implement a requiredChildDistribution method to declare what kind of distribution its child datasets need to have. For example, SortMergeJoin needs both child datasets to have a ClusteredDistribution, which means each child dataset needs to be clustered by some keys (the join keys). To make sure the two children satisfy the required distribution, the ER rule first wraps a new Exchange plan around each child SparkPlan:

child = ShuffleExchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)

The “child” above is the SparkPlan for one branch dataset. Based on the ClusteredDistribution requirement, createPartitioning() always creates a HashPartitioning. The defaultNumPreShufflePartitions comes from the configuration (spark.sql.shuffle.partitions), with a default value of 200.
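
For reference, the requiredChildDistribution declaration of SortMergeJoinExec mentioned above looks roughly like this (paraphrased from the Spark 2.0 source, not copied verbatim):

override def requiredChildDistribution: Seq[Distribution] =
  ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil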

Then the ER rule checks whether the two created partitionings (HashPartitioning on different keys) are compatible (in our case yes, because the two partitionings will be joined later) and whether they guarantee each other (in our case yes, since the two partitionings have the same number of partitions). In this case, we don’t need any additional shuffle for the two underlying branch datasets. The concepts of Distribution, Partitioning and their relations are well documented in org.apache.spark.sql.catalyst.plans.physical (partitioning.scala).

The last part is to add a Sort plan for the two underlying branches. Each SparkPlan can override requiredChildOrdering() if it needs a specific ordering for its children. SortMergeJoin requires the two child Spark plans to be sorted by their keys. As the outputOrdering of HashPartitioning is Nil (an empty Seq[SortOrder]), the ER rule wraps another Sort plan around each Exchange. That gives the final shape of the Spark plan shown above.
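
In simplified form (my own paraphrase of ensureDistributionAndOrdering, not the exact source), this ordering pass looks something like this:

// paraphrased: wrap a Sort around a child whose output ordering doesn't satisfy the requirement
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
  if (requiredOrdering.nonEmpty && !child.outputOrdering.startsWith(requiredOrdering)) {
    SortExec(requiredOrdering, global = false, child = child)
  } else {
    child
  }
}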

I have skipped the ExchangeCoordinator part of the ER rule, because it is disabled by default (spark.sql.adaptive.enabled) and it’s complicated to explain … ExchangeCoordinator is about optimising your post-shuffle partitions based on your pre-shuffle data size. I would actually like to try this feature with some production data.
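
To experiment with it, these are the relevant settings (a sketch; I haven’t measured the effect myself):

// turn on the ExchangeCoordinator path (off by default)
spark.conf.set("spark.sql.adaptive.enabled", "true")
// target size (in bytes) of a post-shuffle partition, used to merge small partitions (default 64 MB)
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")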

Following the whole process, I found that the interpretation of a join query into a Spark plan is quite straightforward. 90% of the time it is interpreted the same way, and the optimisation across multiple stages has been treated carefully. But if I cold-start a Spark application, and my data is already clustered by keys and sorted, how can I let Spark know not to repeat the unnecessary Spark plans? It looks like the same Spark plan is generated for a join query under all conditions. Anyway, at least now we have a better understanding of how Spark manipulates our data behind a single join query…

Aggregation

Take a simple aggregation as an example:

df.groupBy("a").agg(count($"b"))

The Spark plan is like this:
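
Roughly (a sketch of the explain() output; the expression IDs and the partition count of 5 depend on your session):

*HashAggregate(keys=[a#0], functions=[count(b#1)])
+- Exchange hashpartitioning(a#0, 5)
   +- *HashAggregate(keys=[a#0], functions=[partial_count(b#1)])
      +- Scan ExistingRDD[a#0,b#1]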

Let’s explore this Spark plan tree from the bottom. The Scan plan projects the needed columns from the original RDD and loads them into UnsafeRows. HashAggregate essentially builds a TungstenAggregationIterator to process the aggregation for a given partition. TAI operates directly on UnsafeRows, so it should be faster and more efficient. Inside TAI, one instance of BytesToBytesMap stores the aggregation buffers; it consumes Spark execution memory and can trigger a spill to disk. After the per-partition aggregation is done, the Exchange plan shuffles the data by keys. In the end, another aggregation is performed after the data has been shuffled.

Where does this HashAggregate plan come from? The createAggregate() method of the AggUtils class can create either a HashAggregateExec or a SortAggregateExec Spark plan. HashAggregate is used if the aggregation in the query allows the “hash way”, meaning the types of all attributes in the aggregation buffer are mutable in UnsafeRow, because the HashAggregate implementation needs to update the values in the buffer (which is an UnsafeRow as well) in place. The mutable types for UnsafeRow are defined in the static variable UnsafeRow.mutableFieldTypes. In most cases Spark will use HashAggregate, which is much faster than SortAggregate.
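
For example (a sketch, assuming df also has a string column s and the functions are imported), aggregating that string column should fall back to SortAggregate, because StringType is not one of the mutable field types:

import org.apache.spark.sql.functions.{count, max}

// count keeps a LongType buffer (mutable in UnsafeRow) -> HashAggregate
df.groupBy("a").agg(count($"b")).explain()

// max over a string column keeps a StringType buffer, which is not mutable
// in UnsafeRow, so this should fall back to SortAggregate
df.groupBy("a").agg(max($"s")).explain()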

Beyond the huge optimisation work in HashAggregateExec (CodegenSupport) and TungstenAggregationIterator, at a higher level Spark first aggregates the rows within each partition, so it can potentially shuffle much less data. After the shuffle by keys, the final aggregation is computed on the aggregated buffer rows from each partition. This is why the UserDefinedAggregateFunction interface has two methods to implement, update() and merge(). In the old RDD fashion we were told not to use groupBy(); this problem doesn’t exist any more in the SparkSQL world.
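
As a sketch of that interface, here is a minimal UDAF (a hypothetical LongSumUDAF, just for illustration): update() aggregates input rows into the buffer within a partition, and merge() combines the partial buffers after the shuffle.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Minimal example UDAF: sum over a long column (hypothetical, for illustration only)
class LongSumUDAF extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  // called for every input row inside a partition (partial aggregation)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
  // called to combine the partial buffers after the shuffle (final aggregation)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  def evaluate(buffer: Row): Any = buffer.getLong(0)
}

// usage: df.groupBy("a").agg(new LongSumUDAF()($"b"))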

Summary

We have seen how Spark Catalyst interprets a simple query into SparkPlan steps that can be executed directly by the Spark computation engine. It’s clear that, compared to the old RDD fashion, the performance improvement with Catalyst is huge. Together with all the effort put into the shuffle process, this makes Spark one of the best big data processing frameworks.