Spark Join Strategies

Mani Shankar S
5 min read · Sep 29, 2021

One of the most frequent transformations in Spark SQL is joining two DataFrames.

Spark 3.0 provides a flexible way to choose a specific join algorithm using strategy hints:

dfA.join(dfB.hint(algorithm), join_condition)

The algorithm argument can be one of the following: broadcast, shuffle_hash, or merge (also accepted as shuffle_merge). Spark 3.0 additionally accepts shuffle_replicate_nl, which requests a cartesian product.
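As a minimal sketch (the two spark.range DataFrames are just illustrative), you can attach a hint and verify the choice in the physical plan:

dfA = spark.range(100).withColumnRenamed("id", "key")
dfB = spark.range(100).withColumnRenamed("id", "key")

# The hint overrides Spark's own choice; the plan should show ShuffledHashJoin
dfA.join(dfB.hint("shuffle_hash"), "key").explain()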

JoinSelection strategy

Spark decides which algorithm will be used to join the data in the physical planning phase, where each node in the logical plan has to be converted to one or more operators in the physical plan using so-called strategies. The strategy responsible for planning the join is called JoinSelection. The most important variables that drive this choice are:

• the hint

• the joining condition (whether or not it is equi-join)

• the join type (inner, left, full outer, …)

• the estimated size of the data at the moment of the join
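The join type alone can already rule out an algorithm. As a minimal sketch (the spark.range tables are illustrative): a broadcast hash join cannot produce a full outer join, so even a tiny table falls back to SortMergeJoin here:

small = spark.range(10)
large = spark.range(10**6)

# Full outer joins cannot be built by BroadcastHashJoin,
# so the plan shows SortMergeJoin despite the small side
large.join(small, "id", "full_outer").explain()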

BroadcastHashJoin

BroadcastHashJoin (BHJ) is the preferred algorithm if one side of the join is small enough (in terms of bytes). In that case, the small dataset can be broadcast (sent over) to each executor. The advantage is that the other side of the join doesn't require any shuffle, which is especially beneficial if that side is very large: avoiding the shuffle brings a notable speed-up compared to the other algorithms, which would have to shuffle it.

Spark will choose this algorithm if one side of the join is smaller than spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB.

The threshold can be increased by changing this configuration. For example, to raise it to 100 MB, you can call

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)
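The same setting can also turn broadcast joins off altogether: setting it to -1 disables the automatic broadcasting.

# -1 disables automatic broadcast joins entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)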

The optimal value depends on the resources of your cluster. Broadcasting a large dataset can lead to an out-of-memory (OOM) error or to a broadcast timeout. The timeout is controlled by another configuration that defines a time limit by which the data must be broadcast; if it takes longer, the query fails with an error. The default value of this setting is 5 minutes and it can be changed as follows

spark.conf.set("spark.sql.broadcastTimeout", time_in_sec)

The data being large is not the only reason why a broadcast may take too long. Imagine a situation like this:

dfA = spark.table(...)

dfB = (
    data
    .withColumn("x", udf_call())
    .groupBy("id").sum("x")
)

dfA.join(dfB.hint("broadcast"), "id")

In this query we join two DataFrames, where the second one, dfB, is the result of some expensive transformations: a user-defined function (UDF) is called and the data is then aggregated.

If the UDF (or any other transformation before the actual aggregation) takes too long to compute, the query will fail with a broadcast timeout. One way to work around this is to cache dfB and materialize it with an action before the join:

dfA = spark.table(...)

dfB = (
    data
    .withColumn("x", udf_call())
    .groupBy("id").sum("x")
).cache()

dfB.count()

dfA.join(dfB.hint("broadcast"), "id")

The query will now be executed in three jobs. The first job is triggered by the count action; it computes the aggregation and stores the result in memory (in the caching layer). The second job broadcasts this result to each executor, and this time it will not fail on the timeout because the data is already computed and read from memory, so it runs fast. Finally, the last job does the actual join.
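Note that instead of the string hint you can use the broadcast() function from pyspark.sql.functions, which is equivalent:

from pyspark.sql.functions import broadcast

# Equivalent to dfB.hint("broadcast")
dfA.join(broadcast(dfB), "id")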

SortMergeJoin (SMJ)

If neither of the DataFrames can be broadcast, Spark will plan the join with SMJ if there is an equi-condition and the joining keys are sortable (which is the case in most standard situations).

SMJ requires both sides of the join to have the correct partitioning and ordering. In the general case this is ensured by a shuffle and sort in both branches of the join: the physical plan contains an Exchange and a Sort operator in each branch, and together they make sure that the data is partitioned and sorted correctly before the final merge.
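A minimal sketch to reproduce this plan yourself (the merge hint and the generated data are illustrative):

from pyspark.sql import functions as F

dfA = spark.range(10**6).withColumn("x", F.rand())
dfB = spark.range(10**6).withColumn("y", F.rand())

# Each branch of the plan should show Exchange hashpartitioning(id, ...)
# followed by Sort, then the final SortMergeJoin
dfA.join(dfB.hint("merge"), "id").explain()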

ShuffledHashJoin (SHJ)

If you don't ask for it with a hint, you will not see SHJ in the query plan very often. The reason is the internal configuration setting spark.sql.join.preferSortMergeJoin, which is set to true by default.

SMJ is preferred by default because it is more robust with respect to OOM errors. In the case of SHJ, if one partition doesn't fit in memory, the job will fail; in the case of SMJ, Spark will just spill the data to disk, which slows down the execution but keeps the job running.

If you switch the preferSortMergeJoin setting to false, Spark will choose SHJ only if one side of the join is at least three times smaller than the other side and if the average size of each partition is smaller than the autoBroadcastJoinThreshold (the same setting used for BHJ). This is meant to avoid the OOM error, which can nevertheless still occur because only the average partition size is checked: if the data is highly skewed and one partition is so large that it doesn't fit in memory, the job can still fail.
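A minimal sketch of both ways to get SHJ (note that spark.sql.join.preferSortMergeJoin is an internal setting, so changing it at runtime is an assumption worth validating on your Spark version):

# Option 1: ask for it explicitly, hinting the smaller side
dfA.join(dfB.hint("shuffle_hash"), "id").explain()

# Option 2: flip the internal preference (use with care)
spark.conf.set("spark.sql.join.preferSortMergeJoin", False)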

Similarly to SMJ, SHJ also requires the data to be partitioned correctly, so in general it will introduce a shuffle in both branches of the join. However, as opposed to SMJ, it doesn't require the data to be sorted, which is itself quite an expensive operation; because of that, SHJ has the potential to be faster than SMJ.

BroadcastNestedLoopJoin (BNLJ)

All three previous algorithms require an equi-condition in the join. If there is no equi-condition, Spark has to use BroadcastNestedLoopJoin (BNLJ) or a cartesian product join (CPJ). BNLJ will be chosen if one side can be broadcast, similarly to the BHJ case. Both BNLJ and CPJ are rather slow algorithms and should be avoided by providing an equi-condition whenever possible.
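A minimal sketch of a non-equi join that ends up as BNLJ (the range-matching condition and the generated tables are illustrative):

events = spark.range(1000).withColumnRenamed("id", "ts")
windows = spark.range(10).selectExpr("id * 100 as lo", "id * 100 + 99 as hi")

# No equi-condition, but the small side fits the broadcast threshold,
# so the plan should show BroadcastNestedLoopJoin
cond = (events["ts"] >= windows["lo"]) & (events["ts"] <= windows["hi"])
events.join(windows, cond).explain()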
