Quick Take: Scala Chops for Apache Spark
More functional Spark using Scala idioms
This is the first in a series of stories on one topic: upping your Scala game to write better Apache Spark code. We see a lot of good Data Scientists writing rather procedural, imperative code where there are better alternatives, alternatives that are not well documented, at least not for Data Scientists.
The Rue Gilt Groupe Data Science Team writes Apache Spark mostly in Scala, but sometimes in Python, on the Databricks platform. We’re talking here about Scala, specifically some of Scala’s powerful functional features. Without care, Databricks notebooks can become procedural and messy. A fluency in these Scala features will improve your Spark code in the following ways:
- Readability: Make it easier for you to write and others to read.
- Robustness: Remove fragile constructs.
- Performance: Make it faster.
We’re using an example that came up recently where we wanted to union some datasets and it took longer than expected. It’s a simple thing, but the principles scale well. Below are our investigation and some results we hope you’ll find interesting.
We had four datasets with the same schema that we wanted to merge, or union, in Apache Spark as follows:
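Something like the following, with paths invented for illustration (and, in keeping with a point we make below, a deliberate copy-paste bug left in):

```scala
// Load the four datasets (paths are invented for illustration).
val df1 = spark.read.parquet("/mnt/data/ratings/2020-01/ratings.parquet")
val df2 = spark.read.parquet("/mnt/data/ratings/2020-02/ratings.parquet")
val df3 = spark.read.parquet("/mnt/data/ratings/2020-03/ratings.parquet")
val df4 = spark.read.parquet("/mnt/data/ratings/2020-03/ratings.parquet") // deliberate bug: wrong month
```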
Our union code was simply a sequence of .unions, like you’d expect:
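A sketch of that chain, assuming the four DataFrames above (again with a deliberate bug, the kind that’s easy to miss in all the repetition):

```scala
// Union the four DataFrames, filter, and count.
df1
  .union(df2)
  .union(df3)
  .union(df3) // deliberate bug: df3 repeated, df4 never included
  .filter($"rating" > 0.1)
  .count
```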
which took about 20s. We include .filter($"rating" > 0.1) so Spark can’t easily shortcut the count. Note that we use .count rather than display(), because Databricks can apply an implicit .limit(1000) to short-circuit things for display, which skews results. (You can .persist DataFrames to avoid reading the files again; that slows the initial load dramatically while speeding up subsequent operations.)
Our concerns with this approach were:
- The list of files was fragile: each path is long and nearly identical to the others, so typos are easy to miss.
- You have to remember to add the DataFrame for any new files to the union.
- Performance was not predictable.
If you didn’t notice the deliberate bugs in each of the code snippets above, I think we’ve made our point about readability and robustness.
Let’s replace a couple of things in the code to make it easier to spot these errors.
Making it cleaner
Two things stand out:
- The repetitive file paths.
- The cumbersome union.
To fix this, let’s collect the files in a List, map them to DataFrames, then union them in a function:
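A sketch of the cleaner version; the fromFile helper and the files list are as we use them later, and the named union function is the point of the exercise:

```scala
import org.apache.spark.sql.DataFrame

// The file paths appear exactly once (paths are invented for illustration).
val files = List(
  "/mnt/data/ratings/2020-01/ratings.parquet",
  "/mnt/data/ratings/2020-02/ratings.parquet",
  "/mnt/data/ratings/2020-03/ratings.parquet",
  "/mnt/data/ratings/2020-04/ratings.parquet"
)

def fromFile(path: String): DataFrame = spark.read.parquet(path)

// A named function reads like a sentence at the call site.
def union(a: DataFrame, b: DataFrame): DataFrame = a.union(b)

files.map(fromFile).reduceLeft(union).filter($"rating" > 0.1).count
```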
Now our code is DRY: the file path and the union each appear only once in the code. We could easily have put the union function inline with:
.reduceLeft(_ union _)
which, many could argue, is a bit idiosyncratic until you know about Scala’s trick with infix operators and how to use _ as a shorthand for parameters. Curiously, each _ above refers to a different parameter. But passing a function with a good name lets us write sentences in code that are clearer, and therefore easier to verify as correct by inspection.
Skip ahead if this is old hat.
reduceLeft takes the first element of a list. If there’s a second element, it applies the function, which is union in our case. Then it applies the function to this result and the third element, and so on. reduceLeft works fine with one element (it just doesn’t apply the function), but throws an UnsupportedOperationException if the list is empty.
There are two ways to handle potentially empty lists. The first is reduceLeftOption, which returns an Option: None for the empty list and Some(t) if the list has values. The second is fold, which we cover below.
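In plain Scala, for instance:

```scala
List(7).reduceLeft(_ + _)                // 7: one element, the function is never applied
// List.empty[Int].reduceLeft(_ + _)     // throws UnsupportedOperationException

List.empty[Int].reduceLeftOption(_ + _)  // None
List(1, 7).reduceLeftOption(_ + _)       // Some(8)
```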
reduce/fold? left/right? This is confusing!
The left and right variants are interchangeable if the operator is commutative — that is, if the order of the parameters doesn’t matter. Both + and * are commutative (and so, for our purposes, is union). Things get interesting for - and / because they are not commutative, and the order matters.
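A quick illustration with plain integers:

```scala
val list = List(1, 7, 5, 2)
list.reduceLeft(_ + _)   // 15
list.reduceRight(_ + _)  // 15: + is commutative, same answer either way
list.reduceLeft(_ - _)   // 1 - 7 - 5 - 2 = -13
list.reduceRight(_ - _)  // 1 - (7 - (5 - 2)) = -3: order matters for -
```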
What about fold? Sometimes you want to start with an initial value, or to work with empty lists without resorting to Option. fold is just the ticket in this case, as follows:
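For example, with plain integers (for DataFrames, the initial value would be an empty DataFrame with a matching schema):

```scala
List.empty[Int].foldLeft(0)(_ + _)   // 0: no exception on the empty list
List(1, 7, 5, 2).foldLeft(0)(_ + _)  // 15
```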
Because fold always has an initial value, it works on empty lists just fine.
Yes, but what about performance?
We made our code more readable and robust, as promised. But does it go faster? No, not at all.
We can see this by doing a .explain. The plan is identical to the procedural code we started with.
Our fake example is only counting the number of rows in the union of all the DataFrames. So perhaps we can filter before the union like this:
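Sketched with the same files list and fromFile helper as before:

```scala
// Filter each DataFrame first, then union and count.
files
  .map(fromFile)
  .map(_.filter($"rating" > 0.1))
  .reduceLeft(_ union _)
  .count
```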
And try it out:
Incidentally, if we really want to go for brevity, we can write the filtered count as:
files.map(fromFile).map(_.filter($"rating" > 0.1).count).sum
Let’s look at the API reference for read.parquet. We can pass multiple files! So let’s try it:
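Something like:

```scala
// DataFrameReader.parquet accepts multiple paths in one call.
spark.read.parquet(files: _*).filter($"rating" > 0.1).count
```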
About the same; it makes no practical difference. Oh wait, .parquet also takes a file pattern, so let’s try that too:
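For example (the glob pattern is invented for illustration):

```scala
// A glob pattern reads everything that matches in a single call.
spark.read.parquet("/mnt/data/ratings/*/ratings.parquet")
  .filter($"rating" > 0.1)
  .count
```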
If anything, that’s taking a hair longer and would get messy if you didn’t want to read everything in a location.
If having your cake is code that’s robust and readable, then eating it too would be having it run faster, and in this example, at least, we can’t eat it. Sorry about that. Apache Spark is good at optimizing expressions that are equivalent regardless of how they’re written. And it simply can’t cheat the bandwidth required to read from AWS S3.
If we missed something, or you have a way to make this faster, please let us know in the comments.
An aside on writing reduce* and fold* in terms of each other
You might think fold and reduce are interchangeable. Well, they are, but maybe not as you might think at first. Take a look at this:
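For example, summing the same list both ways:

```scala
val list = List(1, 7, 5, 2)
list.reduceLeft(_ + _)   // 15
list.foldLeft(0)(_ + _)  // 15: the same answer
```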
With +, a commutative operator, this works out perfectly. We pass 0 as the initial value for foldLeft, which is a good start if you’re adding things up. (You’d use 1 if you were multiplying.)
But with subtraction, -, things do not go as planned:
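With the same list:

```scala
val list = List(1, 7, 5, 2)
list.reduceLeft(_ - _)   // -13
list.foldLeft(0)(_ - _)  // -15: not the same answer
```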
reduceLeft and foldLeft are not equivalent. What’s going on? Let’s decompose these into their arguments and operators as follows.
list.reduceLeft(_ - _) is the same as the expression:
1 - 7 - 5 - 2 = -13
list.foldLeft(0)(_ - _) is equivalent to:
0 - 1 - 7 - 5 - 2 = -15
You can see how to construct the equivalent, correct sequence of operations: use the starting value of list.head and fold over the remainder of the list, list.tail. (It does not work to convert the list to 0 +: list; that puts us back to square one.)
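That is, reduceLeft can be written as a fold seeded with the head:

```scala
val list = List(1, 7, 5, 2)
list.tail.foldLeft(list.head)(_ - _)  // -13: same as list.reduceLeft(_ - _)
// Prepending the fold's zero instead does not help:
(0 +: list).reduceLeft(_ - _)         // -15: same as list.foldLeft(0)(_ - _)
```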
You can see why this works by converting it back to reduceLeft with the components we used in the fold: list.head +: list.tail is by definition the identity transform on list. And you get exceptions as expected when you apply this to the empty list, which has neither a head nor a tail.
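For example:

```scala
import scala.util.Try

val empty = List.empty[Int]
Try(empty.reduceLeft(_ - _))                 // Failure: UnsupportedOperationException
Try(empty.tail.foldLeft(empty.head)(_ - _))  // Failure: .tail of an empty list throws
```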