Quick Take: Scala Chops for Apache Spark

More functional Spark using Scala idioms

Note

This is the first in a series of stories on the same topic: Up your Scala game to write better Apache Spark code. We see a lot of good Data Scientists writing rather procedural, imperative code where better alternatives exist that are not well documented, at least not for Data Scientists.

Background

The Rue Gilt Groupe Data Science Team writes Apache Spark mostly in Scala, but sometimes in Python, on the Databricks platform. We’re talking here about Scala, specifically some of Scala’s powerful functional features. Without care, Databricks notebooks can become procedural and messy. A fluency in these Scala features will improve your Spark code in the following ways:

  • Readability: Make it easier for you to write and others to read.
  • Robustness: Remove fragile constructs.
  • Performance: Make it faster.

We’re using an example that came up recently where we wanted to union some datasets and it took longer than expected. It’s a simple thing, but the principles scale well. Below are our investigation and some results we hope you’ll find interesting.

The issue

We had four datasets with the same schema that we wanted to merge, or union, in Apache Spark as follows:

The DataFrames we wanted to merge
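In code, loading the four DataFrames might look something like this sketch (the S3 bucket and paths are hypothetical stand-ins; in a Databricks notebook, spark is already in scope):

```scala
// Hypothetical location: the real bucket and prefix are environment-specific.
val base = "s3://our-bucket/ratings"

val df1 = spark.read.parquet(s"$base/group-1")
val df2 = spark.read.parquet(s"$base/group-2")
val df3 = spark.read.parquet(s"$base/group-3")
val df4 = spark.read.parquet(s"$base/group-4")
```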

Our union code was simply a sequence of .unions like you’d expect:

The simple approach totally works but leaves us wondering
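A sketch of the chained version, assuming four DataFrames df1 through df4 and spark.implicits._ in scope for the $ column syntax (and without the deliberate bugs discussed below):

```scala
// Chain the unions pairwise, then force full evaluation with .count.
df1.union(df2)
   .union(df3)
   .union(df4)
   .filter($"rating" > 0.1)
   .count
```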

which took about 20s. We include .filter($"rating" > 0.1) so Spark can’t easily short-circuit the count. Note that we use .count rather than display() because Databricks can apply an implicit .limit(1000) to short-circuit things for display, which skews results. (You can .cache or .persist DataFrames to avoid reading the files again; this slows the initial load dramatically while speeding up subsequent operations.)

Our concerns with this approach were:

  1. The list of files was fragile because the path for each is long and must be the same.
  2. You have to remember to add the DataFrame for any new files to the .union.
  3. Performance was not predictable.
If you didn’t notice the deliberate bugs in each of the code snippets above, we think we’ve made our point about readability and robustness.

Let’s replace a couple of things in the code to make it easier to spot these errors.

Making it cleaner

Two things stand out:

  1. The repetitive file paths.
  2. The cumbersome union.

To fix this, let’s collect the files in a List, map them to DataFrames, then union them in a function:

Cleaned up a bit
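A sketch of the cleaned-up version; the names files, fromFile, and unionAll are our own, not Spark’s, and the path is a hypothetical stand-in:

```scala
import org.apache.spark.sql.DataFrame

val files = List("group-1", "group-2", "group-3", "group-4")

// Map a short file name to its DataFrame (hypothetical bucket).
def fromFile(name: String): DataFrame =
  spark.read.parquet(s"s3://our-bucket/ratings/$name")

// Union any non-empty sequence of DataFrames.
def unionAll(dfs: Seq[DataFrame]): DataFrame =
  dfs.reduceLeft(_ union _)

unionAll(files.map(fromFile))
  .filter($"rating" > 0.1)
  .count
```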

Now our code is DRY: the file path and the union only appear once in the code. We could easily have put the union function inline with:

.reduceLeft(_ union _)

which many could argue is a bit idiosyncratic until you know Scala’s trick with infix operators and how _ serves as shorthand for parameters. Curiously, each _ above refers to a different parameter. But passing a function with a good name lets us write sentences in code that are clearer and therefore easier to verify correct by inspection.

reduceLeft?

Skip ahead if this is old hat.

reduceLeft takes the first element of a list. If there’s a second element, it applies the function, which is union in our case. Then it applies the function to this result and the third element, and so on. reduceLeft works fine with one element (it just doesn’t apply the function), but throws UnsupportedOperationException if the list is empty.
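For example, on a plain List of numbers:

```scala
val ratings = List(1, 7, 5, 2)

// Applied pairwise from the left: ((1 + 7) + 5) + 2 = 15
val total = ratings.reduceLeft(_ + _)

// A single-element list simply returns its element.
val single = List(42).reduceLeft(_ + _)
```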

There are two ways to handle potentially empty lists:

  1. Use reduceLeftOption, which returns an Option[T]: None for the empty list and Some(t) if the list has values.
  2. Use foldLeft.
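Both options, side by side on a plain List:

```scala
val full  = List(1, 7, 5, 2)
val empty = List.empty[Int]

// reduceLeftOption wraps the result in an Option instead of throwing.
val someSum = full.reduceLeftOption(_ + _)   // Some(15)
val noSum   = empty.reduceLeftOption(_ + _)  // None

// foldLeft takes an initial value, so the empty list is no problem.
val zeroSum = empty.foldLeft(0)(_ + _)       // 0
```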

reduce/fold? left/right? This is confusing!

In general, left and right variants are interchangeable if the operators are commutative — that is, the order of parameters doesn’t matter. Both + and * are commutative (and so is union).

You can make reduceLeft/Right work on empty lists
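A sketch of the comparison on plain numbers, with reduceLeftOption handling the empty list:

```scala
val xs = List(1, 7, 5, 2)

// Commutative operators: left and right variants agree.
val sumL = xs.reduceLeft(_ + _)   // 15
val sumR = xs.reduceRight(_ + _)  // 15

// Non-commutative operators: the grouping differs, and so do the results.
val subL = xs.reduceLeft(_ - _)   // ((1 - 7) - 5) - 2 = -13
val subR = xs.reduceRight(_ - _)  // 1 - (7 - (5 - 2)) = -3

val ys = List(1.0, 7.0, 5.0, 2.0)
val divL = ys.reduceLeft(_ / _)   // ((1.0 / 7) / 5) / 2
val divR = ys.reduceRight(_ / _)  // 1.0 / (7 / (5 / 2))

// The *Option variants make the left/right reduces safe on empty lists.
val emptyOk = List.empty[Int].reduceLeftOption(_ + _)  // None
```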

As you see above, things get interesting for - and / because they are not commutative, and the order matters.

So what’s fold? Sometimes you want to start with an initial value or work with empty lists without Option tomfoolery. fold is just the ticket in this case, as follows:
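For instance:

```scala
val xs = List(1, 7, 5, 2)

// The initial value seeds the accumulator: 0 for +, 1 for *.
val sum     = xs.foldLeft(0)(_ + _)  // 15
val product = xs.foldLeft(1)(_ * _)  // 70

// On an empty list you simply get the seed back; no Option required.
val emptySum = List.empty[Int].foldLeft(0)(_ + _)  // 0
```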

Because fold always has an initial value, it works on empty lists just fine.

Yes, but what about performance?

We made our code more readable and robust, as promised. But does it go faster? No, not at all.

We can see this by doing a .explain. The plan is identical to the procedural code we started with.

Our fake example is only counting the number of rows in the union of all the DataFrames. So perhaps we can filter before the union like this:

Fold works this way for commutative operators like +
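A sketch of filtering before the union, reusing the files list and fromFile helper:

```scala
// Filter each DataFrame first, then union the filtered results and count.
files
  .map(fromFile)
  .map(_.filter($"rating" > 0.1))
  .reduceLeft(_ union _)
  .count
```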

And try it out:

Can’t seem to short-circuit the union this way

Nope.

Incidentally, if we really want to go for brevity, we can write the filtered count as:

files.map(fromFile).map(_.filter($"rating" > 0.1).count).sum

Let’s look at the API reference for read.parquet. We can pass multiple files! So let’s try it:

The Scala idiom “_*” expands a list to separate arguments
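Since .parquet(paths: String*) is variadic, we can expand a list of paths into separate arguments with : _*. A sketch with hypothetical paths:

```scala
val paths = files.map(name => s"s3://our-bucket/ratings/$name")

// ": _*" expands the List into the varargs that .parquet expects.
spark.read.parquet(paths: _*)
  .filter($"rating" > 0.1)
  .count
```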

About the same; it makes no practical difference. Oh wait, .parquet also takes a file pattern, .../group-*:

Try passing a pattern to .parquet()
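A sketch of the pattern version, again with a hypothetical bucket:

```scala
// One glob pattern covers all four groups.
spark.read.parquet("s3://our-bucket/ratings/group-*")
  .filter($"rating" > 0.1)
  .count
```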

If anything, that’s taking a hair longer and would get messy if you didn’t want to read everything in a location.

The upshot

If having your cake means robust, readable code, then eating it too (making it faster) is not on the menu. In this example, anyway. Sorry about that. Apache Spark is good at optimizing equivalent expressions regardless of how they’re written. And it simply can’t cheat the bandwidth required to read from AWS S3.

If we missed something, or you have a way to make this faster, please let us know in the comments.


An aside on writing reduce* and fold* in terms of each other

You might think fold and reduce are interchangeable. Well, they are, but maybe not as you might think at first. Take a look at this:

Writing fold and reduce in terms of each other
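A sketch of the two translations, using hypothetical names myReduceLeft and myFoldLeft:

```scala
val list = List(1, 7, 5, 2)

// reduce in terms of fold: seed the fold with the head of the list.
def myReduceLeft[A](xs: List[A])(op: (A, A) => A): A =
  xs.tail.foldLeft(xs.head)(op)

// fold in terms of reduce: prepend the seed to the list.
def myFoldLeft[A](xs: List[A])(z: A)(op: (A, A) => A): A =
  (z +: xs).reduceLeft(op)

val reduceViaFold = myReduceLeft(list)(_ + _)  // 15, same as list.reduceLeft(_ + _)
val foldViaReduce = myFoldLeft(list)(0)(_ + _) // 15, same as list.foldLeft(0)(_ + _)
```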

With +, a commutative operator, this works out perfectly. We pass 0 as the initial value for foldLeft, which is a good start if you’re adding things up. (You’d use 1 for *.)

But with subtraction, -, things do not go as planned: reduceLeft and foldLeft are not equivalent. What’s going on? Decompose each into its arguments and operators: on List(1, 7, 5, 2), list.reduceLeft(_ - _) is the same as the expression:

((1 - 7) - 5) - 2 = -13

Whereas list.foldLeft(0)(_ - _) is equivalent to:

(((0 - 1) - 7) - 5) - 2 = -15

You can see how we construct the equivalent, correct sequence of operations by using list.head as the starting value and folding over the remainder of the list, list.tail. (Converting the list to 0 +: list does not work: that just reproduces the foldLeft result.)

You can see why this works by converting it back to reduceLeft with the components we used in the fold. list.head +: list.tail is by definition the identity transform on list. You can see exceptions as expected when we apply this to the empty list, Seq().
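A quick check of the identity and the empty-list behaviour:

```scala
import scala.util.Try

val list = List(1, 7, 5, 2)

// head +: tail reconstructs the original list.
val sameList = (list.head +: list.tail) == list  // true

// On the empty list, .head throws, just as reduceLeft does.
val empty       = List.empty[Int]
val headFails   = Try(empty.head).isFailure               // true
val reduceFails = Try(empty.reduceLeft(_ - _)).isFailure  // true
```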