PySpark Joins — Explained

Diogo Veloso · BiLD Journal · Nov 23, 2020

I wrote this article to briefly explain joins in PySpark. Hopefully it can serve as a cheat sheet where you can quickly go through the examples and find the correct syntax. I’ve also included some cases of using aliases on joins.

Types of joins

First of all, let’s review the types of joins available in Spark (the strings in quotation marks are the arguments accepted in PySpark):

Inner Join — “inner”

Left Outer Join or Left Join — “left” or “leftouter” or “left_outer”

Right Outer Join or Right Join — “right” or “rightouter” or “right_outer”

Outer Join or Full Join — “full” or “outer” or “fullouter” or “full_outer”

Left Semi Join — “semi” or “leftsemi” or “left_semi”

Left Anti Join — “anti” or “leftanti” or “left_anti”

Cross Join — “cross”

Syntax

Let’s take two dataframes, one called df_1 and the other df_2, each with columns called key1 and key2.

Syntax

join(other, on = … , how = …)

Example

df_1.join(df_2, on = df_1.key1 == df_2.key1, how = "anti")

or

df_1.join(df_2, df_1.key1 == df_2.key1, "anti")

Parameters

other — the right dataframe that we want to join, in this example df_2.

on — a string with the join column name (the key), a list of column names, a join expression, or a list of join expressions. Let’s give some examples (sketched in code right after this list):

  • String :
"key1"
  • List of columns :
["key1","key2"]
  • Join Expression :
df_1.key1 == df_2.key1
  • Multiple Join Expressions:
(df_1.key1 == df_2.key1) & (df_1.key2 == df_2.key2)

(& means AND, | means OR)

  • List of Join Expressions
[ df_1.key1 == df_2.key1,
df_1.key2 == df_2.key2 ]
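
As a quick sketch of how each form behaves (assuming two hypothetical dataframes df_1 and df_2 that both contain key1 and key2):

# String or list of names: Spark matches the columns by name and
# keeps a single copy of each join column in the result.
df_1.join(df_2, "key1")
df_1.join(df_2, ["key1", "key2"])

# Join expression: each side keeps its own column, so the result
# has two key1 columns (see the Alias section below).
df_1.join(df_2, df_1.key1 == df_2.key1)

# List of join expressions: the conditions are combined with AND.
df_1.join(df_2, [df_1.key1 == df_2.key1, df_1.key2 == df_2.key2])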

how — the type of join. It must be a string; the default value (if it isn’t explicitly written) is an inner join. All the possible strings are listed in the Types of Joins section.

#Inner join
"inner"
#Left Outer Join or Left Join
"left" or "leftouter" or "left_outer"
#Right Outer Join or Right Join
"right" or "rightouter" or "right_outer"
#Outer Join or Full Join
"full" or "outer" or "fullouter" or "full_outer"
#Left Semi Join
"semi" or "leftsemi" or "left_semi"
#Left Anti Join
"anti" or "leftanti" or "left_anti"
#Cross Join
"cross"

Data

Let’s create one dataframe with the names of football players and another dataframe with countries. Each dataframe has an ID column, and these IDs relate each player to a nationality; the two dataframes may or may not have IDs in common.


from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession
spark = SparkSession.builder.getOrCreate()

football_players = [("Cristiano Ronaldo", 1),
                    ("Kylian Mbappé", 2),
                    ("Lionel Messi", 3)]
df_football_players = spark.createDataFrame(football_players, ["NAME", "ID"])

countries = [("Portugal", 1),
             ("France", 2),
             ("Brazil", 4)]
df_countries = spark.createDataFrame(countries, ["COUNTRY", "ID_2"])
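
For reference, here’s a sketch of what show() prints for each dataframe (row order may vary):

df_football_players.show()
# +-----------------+---+
# |             NAME| ID|
# +-----------------+---+
# |Cristiano Ronaldo|  1|
# |    Kylian Mbappé|  2|
# |     Lionel Messi|  3|
# +-----------------+---+

df_countries.show()
# +--------+----+
# | COUNTRY|ID_2|
# +--------+----+
# |Portugal|   1|
# |  France|   2|
# |  Brazil|   4|
# +--------+----+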

Examples

Inner Join

With the inner join, Spark evaluates both dataframes’ keys and only includes the rows whose keys exist in both dataframes:

condition = df_football_players.ID == df_countries.ID_2
inner_join = df_football_players.join(df_countries, condition, "inner")
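
With our example data, only Cristiano Ronaldo (ID 1) and Kylian Mbappé (ID 2) have a matching country, so the output should look roughly like this:

inner_join.show()
# +-----------------+---+--------+----+
# |             NAME| ID| COUNTRY|ID_2|
# +-----------------+---+--------+----+
# |Cristiano Ronaldo|  1|Portugal|   1|
# |    Kylian Mbappé|  2|  France|   2|
# +-----------------+---+--------+----+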

Full Outer Join / Outer Join

It evaluates the keys in both dataframes and includes the rows with keys in common as well as the rows whose keys aren’t in common; for the latter, it inserts null into the columns coming from the side that has no match.

full_join = df_football_players.join(df_countries, condition, "full")
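
Here Lionel Messi (ID 3) has no country and Brazil (ID_2 4) has no player, so both appear padded with nulls; a sketch of the expected output:

full_join.show()
# +-----------------+----+--------+----+
# |             NAME|  ID| COUNTRY|ID_2|
# +-----------------+----+--------+----+
# |Cristiano Ronaldo|   1|Portugal|   1|
# |    Kylian Mbappé|   2|  France|   2|
# |     Lionel Messi|   3|    null|null|
# |             null|null|  Brazil|   4|
# +-----------------+----+--------+----+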

Left Join / Left Outer Join

With the left outer join, or left join, Spark evaluates both dataframes’ keys and includes all the left rows (even those that don’t match the right dataframe) together with the rows whose keys exist in both dataframes. Basically, it’s the inner join plus the rows that didn’t match on the left side.

left_join = df_football_players.join(df_countries, condition, "left")
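
The expected result keeps Lionel Messi, with nulls on the right side; sketched output:

left_join.show()
# +-----------------+---+--------+----+
# |             NAME| ID| COUNTRY|ID_2|
# +-----------------+---+--------+----+
# |Cristiano Ronaldo|  1|Portugal|   1|
# |    Kylian Mbappé|  2|  France|   2|
# |     Lionel Messi|  3|    null|null|
# +-----------------+---+--------+----+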

Right Join / Right Outer Join

Same logic as the left join, but keeping the right dataframe. It evaluates both dataframes’ keys and includes all the right rows (even those that don’t match the left dataframe) together with the rows whose keys exist in both dataframes.

right_join = df_football_players.join(df_countries, condition, "right")
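
This time Brazil is kept, with nulls on the left side; sketched output:

right_join.show()
# +-----------------+----+--------+----+
# |             NAME|  ID| COUNTRY|ID_2|
# +-----------------+----+--------+----+
# |Cristiano Ronaldo|   1|Portugal|   1|
# |    Kylian Mbappé|   2|  France|   2|
# |             null|null|  Brazil|   4|
# +-----------------+----+--------+----+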

Left Semi Join

Semi joins are a bit different from the other joins: the result doesn’t include any columns from the right dataframe. Spark only checks whether each key exists in the right dataframe and, if it does, keeps that record from the left dataframe; it acts as a filter.

semi_join = df_football_players.join(df_countries, condition, "semi")
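
Only the left columns survive, filtered down to the players whose ID exists in df_countries; sketched output:

semi_join.show()
# +-----------------+---+
# |             NAME| ID|
# +-----------------+---+
# |Cristiano Ronaldo|  1|
# |    Kylian Mbappé|  2|
# +-----------------+---+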

Left Anti Join

The left anti join is the opposite of the left semi join. Basically, it filters out the rows whose keys exist in the right dataframe and only gives us the left dataframe’s columns.

anti_join = df_football_players.join(df_countries, condition, "anti")
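
Only Lionel Messi has no matching country, so he’s the only row left; sketched output:

anti_join.show()
# +------------+---+
# |        NAME| ID|
# +------------+---+
# |Lionel Messi|  3|
# +------------+---+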

Cross Join (Cartesian)

A cross join joins every single row in the left dataframe to every single row in the right dataframe. If you have a lot of rows, the result explodes (with 1,000 rows in each dataframe you get 1,000 × 1,000 = 1,000,000 rows). You need to explicitly call the function crossJoin:

cross_join = df_football_players.crossJoin(df_countries)
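
With our small dataframes the result has 3 × 3 = 9 rows; a sketch of the first few:

cross_join.count()  # 9
cross_join.show(3)
# +-----------------+---+--------+----+
# |             NAME| ID| COUNTRY|ID_2|
# +-----------------+---+--------+----+
# |Cristiano Ronaldo|  1|Portugal|   1|
# |Cristiano Ronaldo|  1|  France|   2|
# |Cristiano Ronaldo|  1|  Brazil|   4|
# +-----------------+---+--------+----+
# only showing top 3 rows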

If you use the join function with the argument on, it will act as an inner join:

cross_join2 = df_football_players.join(df_countries, on = condition, how= "cross" )

Caution

Whenever you forget to add the argument on to the join function, the join will act as a cross join, even if you specify a different join type:

cross_join3 = df_football_players.join(df_countries, how = "left")

Depending on your settings, this will throw an AnalysisException. To allow it, you can enable cross joins in your Spark session configuration:

spark.conf.set("spark.sql.crossJoin.enabled", True)

I would advise against it, because if you forget to add the argument on and the dataframes have millions of records, you’ll get an explosion of records. My advice is to use the method crossJoin; this way you’ll only use a cross join when you actually want one, and not by mistake 😉. To turn the cross join behaviour of the join function back off, simply:

spark.conf.set("spark.sql.crossJoin.enabled", False)

Alias

Let’s rename df_countries’s column ID_2 to ID. Now we’re going to join these dataframes, but they both have a column with the same name; let’s see what happens:

df_countries_2 = df_countries.withColumnRenamed("ID_2", "ID")
keys = df_football_players.ID == df_countries_2.ID
df_inner = df_football_players.join(df_countries_2, keys, "inner")
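
Printing the result shows the duplicated column; a sketch of the expected output:

df_inner.show()
# +-----------------+---+--------+---+
# |             NAME| ID| COUNTRY| ID|
# +-----------------+---+--------+---+
# |Cristiano Ronaldo|  1|Portugal|  1|
# |    Kylian Mbappé|  2|  France|  2|
# +-----------------+---+--------+---+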

It looks like we have two columns with the same name. Let’s try to select that column:

df_inner.select("ID").show()

It throws an AnalysisException. To avoid the ambiguity, let’s use aliases:

alias_football = df_football_players.alias("A")
alias_countries = df_countries_2.alias("B")
condition = alias_football.ID == alias_countries.ID
alias_football.join(alias_countries, condition, "inner")\
    .select("B.ID").show()
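
Now the select resolves unambiguously to df_countries_2’s ID; sketched output:

# +---+
# | ID|
# +---+
# |  1|
# |  2|
# +---+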

Now it’s not so ambiguous for Spark 😉. Aliases can also be used for dropping columns, which is really useful for dropping columns you’re not going to use, such as one of the join keys:

alias_football.join(alias_countries, condition, how="inner")\
    .drop(alias_football.ID)\
    .drop("COUNTRY")\
    .show()
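
The result should keep only the player names and the ID column coming from the countries side; sketched output:

# +-----------------+---+
# |             NAME| ID|
# +-----------------+---+
# |Cristiano Ronaldo|  1|
# |    Kylian Mbappé|  2|
# +-----------------+---+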

As well as selecting columns:

alias_football.join(alias_countries, condition, how="inner").select("A.ID", "B.COUNTRY").show()
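
Sketched output:

# +---+--------+
# | ID| COUNTRY|
# +---+--------+
# |  1|Portugal|
# |  2|  France|
# +---+--------+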

Final Cautions

I didn’t cover natural joins because they can be dangerous: a natural join guesses which columns you want to join on by finding columns with the same name. The join function doesn’t support natural joins; you can do them with SQL, but I would advise joining with explicit keys.

Sources

Spark: The Definitive Guide: Big Data Processing Made Simple, by Bill Chambers and Matei Zaharia

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join


Diogo Veloso is a Big Data Engineer, currently working with Azure and Databricks.