Joining Spark Datasets

Gergely Soti · Published in datamindedbe · Nov 16, 2018

Ever wanted to do better than joins on Apache Spark DataFrames? Now you can!

The Dataset API brings a new approach to joins. As opposed to a DataFrame join, which flattens both sides into a single row, joinWith returns a tuple of the two classes from the left and right Dataset. The function is defined as

joinWith[U](other: Dataset[U],
            condition: Column,
            joinType: String): Dataset[(T, U)]

Assuming that the left Dataset’s element type is T, the join returns a Dataset of tuples of the matching objects. There is a minor catch, though: when a side has no match, its object in the tuple is null.

In the following example we will walk through the different scenarios and explore the different use cases. Let’s prepare some data to be joined — we pretend that we work for a publishing house, and want to track which articles were viewed:

case class Article(author: String, title: String, id: Int)
case class ArticleView(articleId: Int, viewCount: Int)
case class AuthorViews(author: String, viewCount: Int)

val articles = Seq(
  Article("Gergely", "Spark joins", 0),
  Article("Kris", "Athena with DataGrip", 1),
  Article("Kris", "Something Else", 2),
  Article("Gergely", "My article in preparation", 3)
).toDS

val views = Seq(
  ArticleView(0, 1),   // my article is not very popular :(
  ArticleView(1, 123), // Kris' article has been viewed 123 times
  ArticleView(2, 24),  // Kris' not so popular article
  ArticleView(10, 104) // views of a deleted article
).toDS
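(The snippets assume a SparkSession whose implicits are in scope; that is what makes .toDS compile. In spark-shell this is set up for you; a minimal standalone sketch, with a placeholder app name and master, would be:)

import org.apache.spark.sql.SparkSession

// Minimal session setup assumed by the snippets above; app name and master are placeholders.
val spark = SparkSession.builder()
  .appName("dataset-joins")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._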

Let’s start with an inner join, because it works out of the box as expected. Usually that is also what you’ll want. For example: who are the popular authors?

articles
  .joinWith(views,
    articles("id") === views("articleId"),
    "inner")
  .show(false)

+-----------------------------+-------+
|_1                           |_2     |
+-----------------------------+-------+
|[Gergely,Spark joins,0]      |[0,1]  |
|[Kris,Athena with DataGrip,1]|[1,123]|
|[Kris,Something Else,2]      |[2,24] |
+-----------------------------+-------+

One would map the results of the join to a new class (we have defined AuthorViews for this):

articles
  .joinWith(views,
    articles("id") === views("articleId"),
    "inner")
  .map { case (a, v) => AuthorViews(a.author, v.viewCount) }
  .show()

+-------+---------+
| author|viewCount|
+-------+---------+
|Gergely|        1|
|   Kris|      123|
|   Kris|       24|
+-------+---------+
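To actually answer the question of who the popular authors are, one can take this one step further and sum the counts per author. A sketch using the typed groupByKey API (this aggregation is not part of the original example):

articles
  .joinWith(views,
    articles("id") === views("articleId"),
    "inner")
  .map { case (a, v) => AuthorViews(a.author, v.viewCount) }
  .groupByKey(_.author)          // group the typed results by author
  .mapGroups { (author, rows) => // sum the view counts within each group
    AuthorViews(author, rows.map(_.viewCount).sum)
  }
  .show()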

So far everything works as expected. However, the moment we switch to a left join (maybe there are some articles which have no views), we get an informative java.lang.NullPointerException! This is expected: with a left join, the right side of the tuple can be null, so accessing viewCount in the map is not safe. One needs to handle that case explicitly.

articles
  .joinWith(views,
    articles("id") === views("articleId"),
    "left")
  .map {
    case (a, null) => AuthorViews(a.author, 0)
    case (a, v)    => AuthorViews(a.author, v.viewCount)
  }
  .show()

+-------+---------+
| author|viewCount|
+-------+---------+
|Gergely|        1|
|   Kris|      123|
|   Kris|       24|
|Gergely|        0| // this row is handled by case (a, null)
+-------+---------+

Keep in mind that the case (a, null) pattern needs to come first, otherwise case (a, v) will “swallow” the (a, null) case: a variable pattern such as v matches anything, including null.
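A quick plain-Scala illustration of why the order matters (not Spark-specific):

// A variable pattern matches anything, including null:
val pair: (Article, ArticleView) = (Article("Gergely", "Spark joins", 0), null)

pair match {
  case (a, v)    => v.viewCount // matches first and throws a NullPointerException
  case (a, null) => 0           // never reached
}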

Naturally, an outer join needs to handle both cases. The second case below happens when you delete an article but fail to clean up the corresponding viewCounts:

articles
  .joinWith(views,
    articles("id") === views("articleId"),
    "outer")
  .map {
    case (a, null) => AuthorViews(a.author, 0)
    case (null, v) => AuthorViews("Unknown article", v.viewCount)
    case (a, v)    => AuthorViews(a.author, v.viewCount)
  }
  .show()
+---------------+---------+
|         author|viewCount|
+---------------+---------+
|           Kris|      123|
|        Gergely|        0|
|Unknown article|      104|
|           Kris|       24|
|        Gergely|        1|
+---------------+---------+
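If the goal is only to find such orphaned views rather than to keep everything, a left anti join is a better fit: it returns the rows from views that have no matching article. A sketch using the plain DataFrame-style join:

// Sketch: view records whose article no longer exists.
views
  .join(articles, views("articleId") === articles("id"), "left_anti")
  .show()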

One can take a slightly different approach as well: define a variant of the result class in which viewCount is an Option, so that missing matches are encoded in the type rather than handled with null patterns.

case class AuthorView(author: String, viewCount: Option[Int])

With this class the left join becomes simpler (note that one must wrap v in an Option):

articles
  .joinWith(views,
    articles("id") === views("articleId"),
    "left")
  .map { case (a, v) => AuthorView(a.author, Option(v).map(_.viewCount)) }
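The same idea extends to the outer join. With a hypothetical variant (not part of the original example) that makes both sides optional, no null patterns are needed at all:

// Hypothetical class covering both sides of an outer join.
case class AuthorViewFull(author: Option[String], viewCount: Option[Int])

articles
  .joinWith(views,
    articles("id") === views("articleId"),
    "outer")
  .map { case (a, v) =>
    AuthorViewFull(Option(a).map(_.author), Option(v).map(_.viewCount))
  }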

Please do keep in mind, however, that even though the Dataset API was introduced in Spark 1.6, in the current version (2.4, at the time of writing) it is still marked as “Experimental”.
