Optimising different Apache Spark SQL Joins

Rupesh Malkar
Published in Analytics Vidhya
Mar 17, 2020

There are different types of joins in Spark SQL:

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Cartesian Join
  • Theta Join
  • One to Many Join

Shuffle Hash Join

A shuffle hash join is the most basic type of join, and it is built on MapReduce fundamentals:

  • Map through the two data frames/tables
  • Use the field in the join condition as the output key
  • Shuffle both datasets by the output key
  • In the reduce phase, join the two datasets. Rows with the same key will be on the same machine and sorted
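The steps above can be sketched in plain Python. This is only an illustration of the map, shuffle and reduce idea, not Spark's actual implementation: the function names, the sample rows and the use of crc32 as the partitioning hash are all assumptions made for the example, and the sorting step is left out.

```python
from collections import defaultdict
from zlib import crc32


def partition_for(key, num_partitions):
    # Deterministic hash so the same key always lands on the same partition.
    return crc32(str(key).encode("utf-8")) % num_partitions


def shuffle(rows, key_index, num_partitions):
    # Map phase: take the join key of each row, then route the row by its hash.
    partitions = [defaultdict(list) for _ in range(num_partitions)]
    for row in rows:
        key = row[key_index]
        partitions[partition_for(key, num_partitions)][key].append(row)
    return partitions


def shuffle_hash_join(left, right, left_key, right_key, num_partitions=4):
    left_parts = shuffle(left, left_key, num_partitions)
    right_parts = shuffle(right, right_key, num_partitions)
    joined = []
    # Reduce phase: each partition joins only the keys it received.
    for left_part, right_part in zip(left_parts, right_parts):
        for key, left_rows in left_part.items():
            for left_row in left_rows:
                for right_row in right_part.get(key, []):
                    joined.append(left_row + right_row)
    return joined


# Hypothetical sample rows: (job, state) and (state, capital).
jobs = [("dev", "Karnataka"), ("qa", "Kerala"), ("ops", "Karnataka")]
states = [("Karnataka", "Bengaluru"), ("Kerala", "Thiruvananthapuram")]
result = shuffle_hash_join(jobs, states, left_key=1, right_key=0)
```

Because both tables are routed with the same hash, matching keys always meet in the same partition, which is why the reduce phase never needs to look at another machine's data.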

Shuffle Hash Join Performance

Works best when:

  • Keys are evenly distributed
  • Adequate number of keys for parallelism

This essentially means that the data should not be skewed. If there are millions of records but very few unique keys, a shuffle hash join cannot leverage parallelism and its performance will decrease.

Let's look at a few examples:

Case 1:

rdd = sqlContext.sql("select * FROM software_jobs JOIN indian_states ON software_jobs.state = indian_states.name")

The number of unique keys in table 2 (the number of states) is very small, and most of the records map to only a few of those keys, so this query suffers from uneven sharing and limited parallelism.

Table 2 has only 35 keys (number of states + UTs), and most of the rows would belong to the Karnataka or Andhra Pradesh key.

Increasing the number of nodes beyond 35 will not solve the problem either, as there are only 35 keys to distribute.
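A small simulation makes the limit visible. The sketch below is plain Python, not Spark: the key names, the row counts and the crc32 based routing are hypothetical, chosen only to mimic 35 keys with one dominant key.

```python
from collections import Counter
from zlib import crc32


def rows_per_partition(keys, num_partitions):
    # Count rows per partition when rows are routed by a hash of the join key.
    return Counter(crc32(k.encode("utf-8")) % num_partitions for k in keys)


# Hypothetical data: 35 distinct keys, with one key holding most of the rows.
distinct_keys = ["state_%d" % i for i in range(35)]
rows = ["state_0"] * 9000 + distinct_keys * 10

load_35 = rows_per_partition(rows, 35)
load_200 = rows_per_partition(rows, 200)

# Number of partitions that received any rows, and the size of the largest one.
busy_with_200 = len(load_200)
largest_with_200 = max(load_200.values())
```

Even with 200 partitions, at most 35 of them receive any rows, and the partition holding the dominant key still carries almost the whole table.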

A Broadcast Hash Join can solve this problem if table 2 is small enough to fit in memory. (We will discuss this soon.)

Case 2:

rdd = sqlContext.sql("""select * from people_in_tamil_nadu
LEFT JOIN people_in_india
ON people_in_tamil_nadu.id = people_in_india.id""")

Looking at this query, we might assume that the size of the output will be equal to the size of people_in_tamil_nadu and that the join is therefore cheap. However, this is not how a shuffle hash join works!

All the rows from table 1 and table 2 are shuffled across the network for the join. Only then does it become clear that the matching records come from a small part of people_in_india and the rest get dropped.

This can be solved by analysing the data and dropping the unused rows before performing the join. Doing so greatly increases the query speed and reduces the amount of unnecessary data shuffled across the network.
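As a rough illustration of the saving, the sketch below counts how many rows would enter the shuffle with and without pruning. It is plain Python with made-up rows, and it assumes people_in_india carries a state column to filter on, which the query above does not show.

```python
# Hypothetical rows: (id, state). The state column is an assumption for this sketch.
people_in_india = [
    (1, "Tamil Nadu"), (2, "Kerala"), (3, "Tamil Nadu"), (4, "Goa"), (5, "Bihar"),
]
people_in_tamil_nadu = [(1, "Tamil Nadu"), (3, "Tamil Nadu")]

# Without pruning, every row of both tables is sent through the shuffle.
shuffled_without_filter = len(people_in_tamil_nadu) + len(people_in_india)

# Drop rows that cannot match before the join, so they never reach the shuffle.
india_pruned = [row for row in people_in_india if row[1] == "Tamil Nadu"]
shuffled_with_filter = len(people_in_tamil_nadu) + len(india_pruned)

# Left join on id using the pruned table; unmatched left rows would pair with None.
by_id = {row[0]: row for row in india_pruned}
joined = [(left, by_id.get(left[0])) for left in people_in_tamil_nadu]
```

The join result is the same in both cases; only the number of rows that have to travel across the network changes.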

Detecting Shuffle Problems:

The best place to check for shuffle problems is the Spark UI.

In the Spark UI, under Jobs → Tasks, we can check for:

  • Tasks that take much more time than the others
  • Speculative tasks being launched
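The first check can also be done on numbers copied out of the UI. The helper below is a hypothetical sketch, not part of Spark: the task names and durations are invented, and the rule "more than twice the median" is an arbitrary threshold assumed for the example.

```python
from statistics import median


def find_stragglers(task_seconds, factor=2.0):
    # Flag tasks that ran longer than `factor` times the median task duration.
    typical = median(task_seconds.values())
    return sorted(name for name, secs in task_seconds.items() if secs > factor * typical)


# Hypothetical task durations (seconds) copied by hand from one stage in the Spark UI.
durations = {"task_0": 12, "task_1": 11, "task_2": 95, "task_3": 13, "task_4": 12}
stragglers = find_stragglers(durations)
```

A single task that runs many times longer than its siblings is usually the one that received the skewed key.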

Broadcast Hash Join

A Broadcast Hash Join can be used when one of the tables in the join is small enough to fit in memory. Essentially, Spark takes the small table and copies it into the memory of each machine. This ensures that the large table is not shuffled across the network, and the parallelism of the large dataset is maintained.
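The idea can be sketched in plain Python as follows. This is a simplified model rather than Spark's implementation: the function name, the list-of-partitions layout and the sample rows are assumptions made for the example.

```python
def broadcast_hash_join(large_partitions, small_table, large_key, small_key):
    # Build the hash table once from the small table; every worker gets a copy.
    lookup = {}
    for row in small_table:
        lookup.setdefault(row[small_key], []).append(row)

    joined_partitions = []
    # Each partition of the large table is joined in place; no row changes partition.
    for partition in large_partitions:
        joined = [row + match
                  for row in partition
                  for match in lookup.get(row[large_key], [])]
        joined_partitions.append(joined)
    return joined_partitions


# Hypothetical data: the large table is already split into two partitions.
software_jobs = [
    [("dev", "Karnataka"), ("qa", "Kerala")],   # partition 0
    [("ops", "Karnataka")],                     # partition 1
]
indian_states = [("Karnataka", "Bengaluru"), ("Kerala", "Thiruvananthapuram")]
result = broadcast_hash_join(software_jobs, indian_states, large_key=1, small_key=0)
```

Note that the output keeps the partitioning of the large table, so a skewed join key no longer decides how the work is divided.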

If we use the Parquet file format, Spark's Catalyst optimiser can make this decision automatically and broadcast the small table, because it knows the table's size. With other file formats such as text files, the optimiser may not be able to calculate the size of the table, and we need to give an explicit hint to broadcast it.
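One way to give such a hint is the BROADCAST hint in Spark SQL. The sketch below reuses the tables from Case 1 and only builds the query string, since running it needs a live Spark session; the choice of indian_states as the table to broadcast is an assumption. Whether the automatic decision kicks in is governed by the spark.sql.autoBroadcastJoinThreshold setting.

```python
# Build the hinted query as a plain string; running it needs a live Spark session.
query = (
    "select /*+ BROADCAST(indian_states) */ * "
    "FROM software_jobs JOIN indian_states "
    "ON software_jobs.state = indian_states.name"
)

# With the sqlContext used in the earlier examples, this would be submitted as:
# rdd = sqlContext.sql(query)
```

The DataFrame API offers the same hint through the broadcast function in pyspark.sql.functions.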

Cartesian Join

A cartesian join can easily explode the number of output rows:

100,000 * 100,000 = 10 Billion

To optimise such a join, we would need to increase the size of the cluster. A good way to decide the cluster size is to measure the time on a sample set and then scale it according to the size of the original dataset.
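The arithmetic can be sketched as below. The estimate assumes that runtime grows in proportion to the number of output pairs, which is a simplification, and the sample size and the 2 second timing are hypothetical numbers.

```python
def cartesian_rows(left_rows, right_rows):
    # A cartesian join emits one output row per pair of input rows.
    return left_rows * right_rows


def estimate_full_runtime(sample_seconds, sample_left, sample_right, full_left, full_right):
    # Assumption: runtime is proportional to the number of output pairs.
    scale = cartesian_rows(full_left, full_right) / cartesian_rows(sample_left, sample_right)
    return sample_seconds * scale


full_output = cartesian_rows(100_000, 100_000)

# Hypothetical measurement: a 1,000 x 1,000 sample took 2 seconds.
estimate = estimate_full_runtime(2.0, 1_000, 1_000, 100_000, 100_000)
```

Sampling 1% of each table shrinks the output by a factor of 10,000, not 100, so the sample timing has to be scaled by the product of both ratios.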

One to Many Join

A single row in one table can map to many rows in the other table. This can also explode the number of output rows.

This type of join can be optimised by using the Parquet file format. Parquet encodes repeated values compactly, so the duplicated data produced by the join takes far less space in the output file than the number of output rows would suggest.
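To see why repeated values are cheap to store, here is a minimal run-length encoding sketch in plain Python. Parquet's real encodings are more involved (it combines dictionary and run-length techniques), so this only illustrates the principle; the column contents are hypothetical.

```python
from itertools import groupby


def run_length_encode(values):
    # Collapse each run of identical consecutive values into (value, run length).
    return [(value, sum(1 for _ in group)) for value, group in groupby(values)]


# Hypothetical one to many output: one customer row repeated for each of its orders.
customer_column = ["alice"] * 5 + ["bob"] * 3
encoded = run_length_encode(customer_column)
```

Eight stored values shrink to two pairs here, and the saving grows with the number of rows each key fans out to.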

Theta Join

rdd = sqlContext.sql("select * from table1 JOIN table2 ON (key1 < key2 + 10)")

A theta join is a join whose condition is not an equality; instead, it joins on some range of keys.

In this case Spark internally performs a full cartesian join and loops through every pair of records to evaluate the condition. Even if most of the records are dropped from the output, the query will run very slowly.

To optimise a theta join, we can use bucketing. We can create the buckets in such a way that the tables can be joined with an equality condition on the bucket, over a much smaller dataset.
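The bucketing idea can be sketched in plain Python. Note the assumption: the sketch uses a bounded variant of the condition, key2 <= key1 < key2 + 10, because a one-sided condition cannot be narrowed to a few buckets. The bucket width, function names and sample keys are all hypothetical, and this is not how Spark's own bucketing feature is invoked.

```python
from collections import defaultdict

WIDTH = 10  # bucket width, matching the "+ 10" in the join condition


def brute_force(table1, table2):
    # Full cartesian scan: every pair of keys is tested.
    return sorted((k1, k2) for k1 in table1 for k2 in table2 if k2 <= k1 < k2 + WIDTH)


def bucketed_join(table1, table2):
    # A key2 can only match key1 values in its own bucket or the next one,
    # so each table2 key is copied into those two buckets.
    buckets = defaultdict(list)
    for k2 in table2:
        b = k2 // WIDTH
        buckets[b].append(k2)
        buckets[b + 1].append(k2)
    # Equi-join on the bucket id, then apply the exact condition within the bucket.
    return sorted((k1, k2)
                  for k1 in table1
                  for k2 in buckets.get(k1 // WIDTH, [])
                  if k2 <= k1 < k2 + WIDTH)


table1 = [3, 14, 27, 58, 99]
table2 = [1, 10, 20, 55, 200]
matches = bucketed_join(table1, table2)
```

Each key1 is compared only with the table2 keys that share its bucket, so the range condition is evaluated on a small candidate set instead of on every pair.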
