Spark Joins Tuning Part-1(Sort-Merge vs Broadcast)

Sivaprasad Mandapati
The Startup
Published in
4 min readFeb 7, 2021

Parallelization is Spark’s bread and butter. The back bone of Spark architecture is Data should be split into pieces(Partitions) and allocate each piece to an executor in cluster, So multiple executors can work on different pieces of data in parallel .

A single row level operations like Mapping, Filtering makes Spark’s job easy , but when it comes to multi-row level operation like joining, grouping , data must be shuffled first before doing actual operation . Shuffling is very costly operation . It hits all your resources in the cluster

Shuffling : All the nodes and executors should exchange the data across the network and re-arrange partitions in such a way that each node/executor should receive a specific key data.

In this article, I would like to discuss most common spark join types and their use cases with an example.

  1. Sort Merge
  2. Broadcast

Let’s first understand on high-level how spark performs above join methods in the backend and then explore with an example.

Sort-Merge :

By default , Spark uses this method while joining data frames. It’s two step process. First all executors should exchange data across network to sort and re-allocate sorted partitions. At the end of this stage , Each executor should have same key valued data on both data frame partitions so that executor can do merge operation. Marge is very quick thing.

Let’s examine this sort merge join with an example . Two data frames A and B have four key columns (1,2,3,4) and let’s say we have 2 node cluster

Sort Phase : As you can see, both A and B are sorted by Join key i.e. Key column and sorted data is split into 2 partitions . Each partition should have specific key data. There should not be any overlapping of Keys between partitions which is hole idea of shuffling

Assign sorted partitions to executors

Merge Phase : Merging sorted data by keys is very simple and quickest operation.

Broad Cast Join :

We have observed , Sort-Merge join requires full shuffling of both data sets via network which is heavy task for Spark . What if we can eliminate shuffling . How can we do it ? replicate the whole small table to all executors .

Sort-Merge vs Broadcast :

I have an example data set Sales Fact table and Products Dimension table . Sales Fact Table is very big in size and Products is quite simple .

Verdict : broadcast join is 4 times faster if one of the table is small and enough to fit in memory .

I love any law or theory with examples and proofs .Please find below code snippets and results

Sort-Merge Join(58 seconds)

Sort Merge Join took 58 seconds

Broad Cast Join (13 seconds) :

Enabled autoBroadcastJoinThreshold parameter to 10 MB (default) and added hint in SQL query explicitly . Hint is not required . Spark Catalyst Optimizer automatically does broadcasting a small table if it’s less than 10mb size. But I intentionally added hint to demonstrate

Is broadcasting always a good solution ?

Absolutely no. If you are joining two data sets both are very large broad casting any table would kill your spark cluster and fails your job.

Why ?

Under the hood the driver node should start replicating broadcasted table into one of the executors, Once it’s finished the executor writes it to another executor and it continues until all the executors gets the copy of broadcast table. As you already get an idea, This is hitting the process and memory of all nodes in cluster including driver node. Be cautious while doing broadcast join. Thumb rule one small table to fit into memory

Hope you enjoy !! In the next article I shall demonstrate how to handle data skewness which bit tricky and interesting one

P.s : Sort Merge join and Broadcast join diagrams shown in the this article are DATABRICKS courtesy

--

--

Sivaprasad Mandapati
The Startup

Azure ||Google Cloud Certified||AWS|| Big Data,Spark,ETL Frameworks,Informatica|| Database migration Specialist||Data Architect||Google Cloud Authorized Trainer