Spark Joins for Dummies

Anant Mishra
6 min read · Jul 25, 2021


Everyone, including me, would say Spark is easy to get started with but difficult to master.

It’s hard to get Spark to work properly, but when it works — it works great!

This guide is all about how to use the PySpark join function while working with DataFrames in Spark.

My starting point has been a GitHub repo, Spark: The Definitive Guide (forked here). I believe it’s a good introduction to the tool: it is authored by Bill Chambers (Product Manager at Databricks) and Matei Zaharia (Chief Technologist at Databricks and creator of Spark).

Before we begin let’s recap SQL joins for better understanding:

(Diagram of SQL join types, source: Google Images)

Let’s create some DataFrames to work with:

Sample Data
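The original sample-data gist isn’t reproduced here, so below is a minimal sketch modeled on the book’s example. The DataFrame names (person, graduateProgram), columns, and values are my stand-ins, chosen so that the results described later line up:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("spark-joins").getOrCreate()

    # People, each referencing a graduate program by id
    person = spark.createDataFrame(
        [(0, "Bill Chambers", 0),
         (1, "Matei Zaharia", 1),
         (2, "Michael Armbrust", 2)],
        ["id", "name", "graduate_program"])

    # Graduate programs; id=3 is not referenced by anyone in person
    graduateProgram = spark.createDataFrame(
        [(0, "Masters", "School of Information"),
         (1, "Ph.D.", "EECS"),
         (2, "Masters", "EECS"),
         (3, "Masters", "Computer Science")],
        ["id", "degree", "department"])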

How to do joins?

Doing joins in PySpark is easy: the join method takes three parameters, namely the right DataFrame, the join expression, and the join type.
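Assuming the sample DataFrames sketched above, the join expression and the general call shape look like this:

    # Relates the left DataFrame's foreign key to the right DataFrame's key
    joinExpression = person["graduate_program"] == graduateProgram["id"]

    # General form: left.join(right, on=joinExpression, how=joinType)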

Inner Join

Inner joins evaluate the keys in both of the DataFrames or tables and include (and join together) only the rows that evaluate to true.

Inner join is the default join type, so you just need to specify the left DataFrame and join the right one using the join expression.
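A sketch of the call (both forms below are equivalent, since inner is the default):

    joinType = "inner"
    person.join(graduateProgram, joinExpression, joinType).show()

    # Equivalent, because inner is the default join type
    person.join(graduateProgram, joinExpression).show()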

Inner join results

Outer Join

  • Outer joins evaluate the keys in both of the DataFrames or tables and include (and join together) the rows that evaluate to true or false.
  • If there is no equivalent row in either the left or right DataFrame, Spark will insert null.
The joinExpression is the same as the one used in the inner join.
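A sketch of the call:

    joinType = "outer"
    person.join(graduateProgram, joinExpression, joinType).show()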
Outer join results (an extra record with null values)

Left Outer Join

  • Left outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the left DataFrame as well as any rows in the right DataFrame that have a match in the left DataFrame.
  • If there is no equivalent row in the right DataFrame, Spark will insert null.
The joinExpression is the same as the one used in the inner join.
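A sketch of the call:

    joinType = "left_outer"
    person.join(graduateProgram, joinExpression, joinType).show()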
If any keys had been present in the left DataFrame but not in the right one, those cells would have contained null.

Right Outer Join

  • Right outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the right DataFrame as well as any rows in the left DataFrame that have a match in the right DataFrame.
  • If there is no equivalent row in the left DataFrame, Spark will insert null.
The joinExpression is the same as the one used in the inner join.
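A sketch of the call:

    joinType = "right_outer"
    person.join(graduateProgram, joinExpression, joinType).show()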
All rows from the right DataFrame were included, with null values for the rows that had no match in the left DataFrame.

Left Semi Join

  • This join actually doesn’t include any values from the right DataFrame.
  • It only compares keys to see whether a value exists in the second DataFrame.
  • If the value does exist, those rows will be kept in the result, even if there are duplicate keys in the left DataFrame.
The joinExpression is the same as the one used in the inner join.
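A sketch of the call; note that here the graduate-program DataFrame sits on the left, so only its matching rows can come back:

    joinType = "left_semi"
    graduateProgram.join(person, joinExpression, joinType).show()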
Even though we have 4 records, because no key for id=3 exists in the person table, you don’t see that key after the join.

Left Anti Join

  • This join is the exact opposite of the left semi join.
  • It also compares values to see whether they exist in the second DataFrame. However, rather than keeping the values that exist in the second DataFrame, it keeps only the values that do not have a corresponding key in the second DataFrame.
  • Think of anti joins as a NOT IN SQL-style filter.
The joinExpression is the same as the one used in the inner join.
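A sketch of the call, again with the graduate-program DataFrame on the left:

    joinType = "left_anti"
    graduateProgram.join(person, joinExpression, joinType).show()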
This result is the exact opposite of what we got from the left semi join.

Cross (Cartesian) Join

  • Cross joins will join every single row in the left DataFrame to every single row in the right DataFrame.
  • This will cause an absolute explosion in the number of rows contained in the resulting DataFrame.

You should use cross joins only if you are absolutely, 100 percent sure that this is the join you need, which is why Spark requires us to be explicit when defining a cross join.

#1 When I ran this, it didn’t work; my guess is that Spark by default selected a better join type to use instead of doing a cross join.

Both #2 and #3 will do a cross join.

#3 Here PySpark gives us a crossJoin function out of the box.
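The three numbered snippets aren’t reproduced above; here is my guess at what they looked like, using the same DataFrames as before:

    # 1 -- asking for "cross" together with an equality join expression:
    #     Spark may plan this as a regular join instead of a true Cartesian product
    person.join(graduateProgram, joinExpression, "cross").show()

    # 2 -- an explicit CROSS JOIN in SQL (assumes the temp views registered below)
    person.createOrReplaceTempView("person")
    graduateProgram.createOrReplaceTempView("graduateProgram")
    spark.sql("SELECT * FROM person CROSS JOIN graduateProgram").show()

    # 3 -- the crossJoin method PySpark provides out of the box
    person.crossJoin(graduateProgram).show()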

So many unnecessary records!

Advanced users can set the session-level configuration spark.sql.crossJoin.enabled to true in order to allow cross joins without warnings or without Spark trying to perform another join for you.
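One way to set it for the current session:

    # Allow implicit cross joins without Spark rewriting the join
    spark.conf.set("spark.sql.crossJoin.enabled", "true")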

Now that we have understood how to do joins in Spark using PySpark, it’s time to take a look at how Spark performs these joins internally.

How Does Spark Perform Joins?

To understand how Spark performs joins, we need to understand the two core resources at play:

  • the node-to-node communication strategy
  • the per-node computation strategy

Spark approaches cluster communication in two different ways during joins. It either incurs a

  • shuffle join, which results in an all-to-all communication (when you join a big table to another big table, you end up with a shuffle join), or
  • a broadcast join.

Big table–to–big table

  • When you join a big table to a big table, you end up with a shuffle join.
  • In a shuffle join, every node talks to every other node, and they share data according to which node has a certain key or set of keys (the ones you are joining on).
  • These joins are expensive because the network can become congested with traffic, especially when the data is not partitioned well.

Big table–to–small table

  • This uses Broadcast Join.
  • When the table is small enough to fit into the memory of a single worker node, with some breathing room of course, we can optimize our join.
  • We replicate our small DataFrame onto every worker node in the cluster (be it located on one machine or many).
  • Even though this looks expensive at first, it prevents us from performing the all-to-all communication during the entire join process.
  • We perform it only once at the beginning and then let each individual worker node perform the work without having to wait or communicate with any other worker node.
  • This means that joins will be performed on every single node individually, making CPU the biggest bottleneck.

For our current set of data, we can see which join type Spark picks by looking at the explain plan:
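A sketch, reusing the DataFrames and joinExpression from above:

    # Prints the physical plan; for this data it shows a SortMergeJoin
    person.join(graduateProgram, joinExpression).explain()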

By default, Spark uses a SortMergeJoin

With the DataFrame API, we can also explicitly give the optimizer a hint that we would like to use a broadcast join by wrapping the small DataFrame in question in the broadcast function.
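A sketch of the hinted join:

    from pyspark.sql.functions import broadcast

    # The hint tells the optimizer that graduateProgram is small enough to broadcast
    person.join(broadcast(graduateProgram), joinExpression).explain()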

Because of the hint, Spark was forced to use a BroadcastHashJoin

If you try to broadcast something too large, you can crash your driver node (because that collect is expensive).

Little table–to–little table

The rule of thumb here is when performing joins with small tables, it’s usually best to let Spark decide how to join them. You can always force a broadcast join if you’re noticing strange behaviour.

So, that’s about it, folks. Hope you enjoyed reading it.

Feel free to dm me if you have any questions!

Peace✌🏽

Anant Mishra

LinkedIn | Instagram | GitHub
