How to avoid BroadcastNestedLoopJoin in Spark

Vikas Singh · Published in Analytics Vidhya · 3 min read · Mar 31, 2021

Joins are among the most crucial operations in Spark jobs. Spark supports the following types of joins:

  1. Inner Join
  2. Left Outer
  3. Right Outer
  4. Full Outer
  5. Left Anti
  6. Left Semi

In this article, we will explore a performance issue with Broadcast Nested Loop Join and how to avoid it.

Each one has its use cases. Spark's Cost-Based Optimizer selects the best join strategy based on various parameters, such as:

  • Type of Join
  • Join conditions (equi-join, between, etc.)
  • Size of Data
  • Hint
  • Spark configurations such as spark.sql.autoBroadcastJoinThreshold
  • And many more…

Based on the aforementioned parameters, Spark selects one of the join strategies listed below:

  • Broadcast Hash Join
  • Shuffle Hash Join
  • Shuffle Sort-Merge Join
  • Broadcast Nested Loop Join

For more information on these strategies, please refer to the link below.

In most scenarios, Spark selects the correct join strategy, but there are cases where the selected strategy is not the best. We will discuss one of them in this article, along with ways to fix it.

For demonstration purposes, I will use the dataset below.

Employee and Bonus tables
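
The original table images are not reproduced here, so below is a minimal sketch of how such data could be constructed. The column names (emp_id, name, salary for the employee table; min_salary, max_salary, bonus for the bonus table) are assumptions; the later snippets build on these DataFrames and imports.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("BroadcastNestedLoopJoinDemo")
    .master("local[*]")  // local mode, for demonstration only
    .getOrCreate()

  import spark.implicits._

  // Hypothetical employee table: id, name, salary
  val employeeDF = Seq(
    (1, "Alice", 45000),
    (2, "Bob", 75000),
    (3, "Carol", 120000)
  ).toDF("emp_id", "name", "salary")

  // Hypothetical bonus table: salary band boundaries and bonus amount
  val bonusDF = Seq(
    (0, 50000, 1000),
    (50001, 100000, 2000),
    (100001, 200000, 3000)
  ).toDF("min_salary", "max_salary", "bonus")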

Purpose: Calculate bonus for employees.

How: Join the employee and bonus tables based on min_salary ≤ salary ≤ max_salary

Expected Outcome: Calculate the bonus in optimal time.

For better performance, since the bonus table is small, it should be eligible for Broadcast Hash Join.

First we will load the data into DataFrames and then join them using the between function. Below is the code snippet for it.

Load employee and bonus table data
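
The original snippet is an image; here is a sketch of the equivalent code, using the DataFrames defined above:

  // Join on a pure range condition: min_salary <= salary <= max_salary
  val joinedDF = employeeDF.join(
    bonusDF,
    $"salary".between($"min_salary", $"max_salary")
  )

  // Print the physical plan chosen by Spark
  joinedDF.explain()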

Below is the physical plan prepared by Spark. In it, Spark has used the 'BroadcastNestedLoopJoin' strategy, which is slow. We will understand why with the example below.

Example: in a real-world scenario, let's consider that one partition has 100K rows and the broadcasted table has 20K records.

Ballpark estimate: the optimal partition size in Spark is approximately 100 MB. (For 12 columns and 100K records, the file size would be around 104 MB.)

With the above numbers, for each partition, BroadcastNestedLoopJoin will iterate 100K × 20K = 2,000,000,000 times, which could lead to performance issues. Here we have assumed that all partitions are evenly distributed (before this step, we may have performed other join, group-by, or window operations), which is not always the case.

The same join strategy will be used if we write the join condition in an equivalent form, like the one below.
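
For instance, writing the range check with explicit comparisons (a sketch, same DataFrames as above) is still a non-equi join, so Spark again falls back to BroadcastNestedLoopJoin:

  // Explicit comparisons instead of between: still no equality condition
  val joinedDF2 = employeeDF.join(
    bonusDF,
    $"salary" >= $"min_salary" && $"salary" <= $"max_salary"
  )
  joinedDF2.explain()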

Spark will use Broadcast Hash Join if we add one equality condition to the join, as we will see below.

But how will we get this new column, which we can use in the join condition without impacting the join outcome?

Let us try to add a default (constant) column to both DataFrames and use it in the join condition.
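
A sketch of this attempt, adding a hypothetical constant column temp_clm via lit to both sides:

  import org.apache.spark.sql.functions.lit

  val empWithClm = employeeDF.withColumn("temp_clm", lit(1))
  val bonusWithClm = bonusDF.withColumn("temp_clm", lit(1))

  // Equality on the constant column plus the original range condition
  val joinedDF3 = empWithClm.join(
    bonusWithClm,
    empWithClm("temp_clm") === bonusWithClm("temp_clm") &&
      $"salary".between($"min_salary", $"max_salary")
  )
  joinedDF3.explain()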

From the physical plan, it can be seen that Spark has ignored this new column in the query plan. This is likely due to constant folding: the optimizer evaluates a condition like 1 = 1 to true at planning time and drops it, so no equi-condition remains.

Let us try to construct a constant column with the help of existing columns.
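
A sketch of this idea, deriving the constant from each side's own numeric column (column names as assumed above):

  import org.apache.spark.sql.functions.col

  // salary * 0 is always 0, but because it references a real column,
  // the optimizer keeps it as an equi-join key instead of folding it away
  val empZero = employeeDF.withColumn("temp_clm", col("salary") * 0)
  val bonusZero = bonusDF.withColumn("temp_clm", col("min_salary") * 0)

  val joinedDF4 = empZero.join(
    bonusZero,
    empZero("temp_clm") === bonusZero("temp_clm") &&
      $"salary".between($"min_salary", $"max_salary")
  )
  joinedDF4.explain()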

We have multiplied the salary column by 0 and used the resultant column in the join. Now Spark uses BroadcastHashJoin.

What if we don't have a numerical column in the data? We could create a dummy DataFrame with one record and cross join it with both DataFrames, as sketched below.
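
A sketch of the cross-join approach (the temp_clm name and one-row dummy DataFrames are illustrative):

  // Cross joining with a single record leaves row counts unchanged;
  // a fresh dummy DataFrame per side avoids self-join ambiguity
  val empCross = employeeDF.crossJoin(Seq(1).toDF("temp_clm"))
  val bonusCross = bonusDF.crossJoin(Seq(1).toDF("temp_clm"))

  val joinedDF5 = empCross.join(
    bonusCross,
    empCross("temp_clm") === bonusCross("temp_clm") &&
      $"salary".between($"min_salary", $"max_salary")
  )
  joinedDF5.explain()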

In the above code, Spark has considered temp_clm in the join, which converted BroadcastNestedLoopJoin to BroadcastHashJoin.
As the cross join is performed with a single record, its impact is negligible.

We can use whichever approach is convenient to force Spark to use BroadcastHashJoin, which is faster than BroadcastNestedLoopJoin.

Please share your thoughts on this. Happy learning!

“Once you stop learning, you start dying”
— Albert Einstein
