Writing Scalable Apache Spark Code | Towards AI
In this article, I will share some tips on how to write scalable Apache Spark code. The examples presented here are based on code I encountered in the real world, and by sharing these tips, I hope to help newcomers write performant Spark code without needlessly increasing their cluster’s resources.
The cluster I used to run the code in this article is hosted on Databricks with the following configuration:
- Cluster Mode: Standard
- Databricks Runtime Version: 5.5 LTS ML (includes Apache Spark 2.4.3, Scala 2.11)
There are 8 workers, and both the workers and the driver are m4.xlarge instances (16.0 GB, 4 cores).
I recently inherited a notebook meant to track the results of our AB Tests to evaluate the performance of our recommendation engine. Running the notebook from start to finish was excruciatingly slow. This was the cell that took the longest time to execute:
The files used to track the data in our AB Test experiments are stored as newline-delimited JSON files partitioned into different folders by year, month and day. Each day could have a few hundred JSON files. Given a list of dates (bucketPeriod), the code in Figure 1 loops through them and loads all the JSON files for each date by calling getAnalytics. This returns a list of data frames, which we combine into a single data frame by calling union after removing the empty data frames from the list (those with 0 rows).
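Since the screenshot in Figure 1 isn’t reproduced here, a rough sketch of that cell might look like this. The date values, the base path and the exact signature of getAnalytics are assumptions, not the original code:

```scala
// Hypothetical reconstruction of the slow cell (Figure 1).
// bucketPeriod, the base path and getAnalytics's signature are assumptions.
val bucketPeriod: Seq[String] = Seq("2019/08/01", "2019/08/02", "2019/08/03")

val dfs = bucketPeriod.map { date =>
  getAnalytics(spark, s"/mnt/ab-tests/$date/*.json")
}

// Drop the empty data frames, then union the rest into a single data frame
val df = dfs.filter(_.count > 0).reduce(_ union _)
```

Note that the `filter(_.count > 0)` step alone forces a full pass over every day’s data just to decide which frames to keep.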
Executing the code in Figure 1 to retrieve just 3 days’ worth of data takes 17.72 minutes, and counting the results (i.e. df.count) takes another 26.71 minutes. Let’s see if we can run things faster without increasing our cluster size.
Before discussing the tips, here’s the implementation of getAnalytics, which reads a folder containing a bunch of newline-delimited JSON files and adds a few features based on the content of the payload field in those files:
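As the original screenshot (Figure 2) isn’t reproduced here, the sketch below shows the rough shape of getAnalytics described in the text; the column names, the JSON paths and the window spec are all assumptions:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, get_json_object, rank}

// Hypothetical sketch of getAnalytics (Figure 2); column names,
// JSON field paths and the window spec are assumptions.
def getAnalytics(spark: SparkSession, path: String): DataFrame = {
  val df1 = spark.read.json(path)  // schema inference scans the input once
  val rankingOrderedIds = Window.partitionBy(col("userId")).orderBy(col("eventTime"))
  df1
    .withColumn("variant", get_json_object(col("payload"), "$.variant"))
    .withColumn("itemId", get_json_object(col("payload"), "$.itemId"))
    .distinct                                           // shuffles the full, wide dataset
    .withColumn("rank", rank().over(rankingOrderedIds)) // another shuffle
    .distinct
}
```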
Tip 1: Feed as much input as possible to Spark functions
The code in Figure 1 essentially calls spark.read.json on one folder at a time. This is inefficient because spark.read.json can take a list of paths, which allows the driver to schedule tasks only once, as opposed to multiple times when spark.read.json is called repeatedly in a loop.
Therefore, if you need to read files that are spread across many folders, instead of writing this:
You should write this:
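With the screenshots missing, the two styles (Figures 3 and 4) can be sketched roughly as follows; the date values and base path are assumptions:

```scala
// Hypothetical date list and base path, standing in for the real ones
val dates: Seq[String] = Seq("2019/08/01", "2019/08/02", "2019/08/03")

// Figure 3 style: one spark.read.json call per folder, each scheduled separately
val dfs = dates.map(d => spark.read.json(s"/mnt/ab-tests/$d"))
val df3 = dfs.reduce(_ union _)

// Figure 4 style: hand the whole list of paths to a single call
val paths = dates.map(d => s"/mnt/ab-tests/$d")
val df4 = spark.read.json(paths: _*)
```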
The code in Figure 4 is slightly faster than the code in Figure 3 (6.86 minutes vs. 7.30 minutes), but the bigger the date range, the bigger the difference.
Tip 2: Skip schema inference whenever possible
According to the Spark documentation for spark.read.json:
This function goes through the input once to determine the input schema. If you know the schema in advance, use the version that specifies the schema to avoid the extra scan.
Therefore, the code in Figure 4 can be rewritten as follows:
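A sketch of what the rewrite in Figure 5 might look like, assuming a hypothetical list `files` of all the JSON file paths:

```scala
// Sketch of Figure 5: infer the schema from a single file,
// then reuse it for the full read, skipping the expensive full scan.
// `files` is a hypothetical list of all the JSON file paths.
val schema = spark.read.json(files.head).schema
val df = spark.read.schema(schema).json(files: _*)
```

This trades a tiny risk (the one sampled file must contain every field) for a large saving on the inference scan.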
In Figure 5, instead of explicitly specifying the schema, I decided to read only one file from the list, infer the schema from that single file, and apply the same schema to the rest of the files. This code cell took only 29.82 seconds to complete.
Tip 3: Structure your data frame operations to minimize shuffling
Now that we’ve improved file I/O, let’s see if we can improve the transformations in
getAnalytics as shown in Figure 2.
The problem with the code in Figure 2 is that the first call to distinct causes a shuffle. How can we tell this will happen? By calling the toDebugString method of the underlying RDD after defining a data frame operation, like so:
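For example (the column names here are assumptions):

```scala
// Inspect the RDD lineage of a transformation before executing it.
// Each new indentation level in the output marks a shuffle boundary.
val df2 = df1.select("userId", "itemId").distinct
println(df2.rdd.toDebugString)
```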
Each indent in the output of toDebugString signals that a shuffle will be triggered when the transformation is executed. Recall that df1 just reads the JSON files, so the cause of the shuffle must have been the call to distinct. Using the same method, we can also deduce that the call to rank().over(rankingOrderedIds) (line 13) will cause a shuffle.
In this case, triggering a shuffle immediately after reading the files is a bad idea: the whole dataset is huge, and it contains many extraneous columns that we don’t need for our analysis, so we are unnecessarily moving large amounts of data around the cluster. What we should aim for is this: given the list of operations we would like to perform to achieve our end goal, can we reorder them so that any shuffle happens as late as possible and on as little data as possible?
Fortunately for us, this just means rewriting the code in Figure 2 as follows:
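Since Figure 7 isn’t reproduced here, the rewrite might look roughly like this; the column names and window spec are assumptions carried over from the earlier sketch:

```scala
// Sketch of the rewrite (Figure 7): project the few needed columns first,
// then call distinct exactly once on that narrow projection.
// Column names and rankingOrderedIds are assumptions.
val df = spark.read.schema(schema).json(files: _*)
  .withColumn("variant", get_json_object(col("payload"), "$.variant"))
  .select("userId", "itemId", "variant", "eventTime")
  .distinct                                           // the only shuffle, on a narrow projection
  .withColumn("rank", rank().over(rankingOrderedIds))
```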
For clarity, I’ve combined the relevant parts of Figure 1 into Figure 7 so that the results are identical. Notice that instead of calling distinct 3 times (once in Figure 1 and twice in Figure 2), we only need to call it once to achieve the same result. When the shuffle happens (due to the call on line 9), the physical plan shows that only the columns specified in the select call get moved around, which is great because they are just a small subset of all the columns in the original dataset.
The code in Figure 7 takes only 0.90 seconds to execute, and counting the results takes only 2.58 minutes. This example shows that it pays to really think through the queries you are writing, to avoid redundant operations that cause unnecessary shuffles.
Tip 4: Perform obvious optimizations by hand instead of relying on the Catalyst Optimizer
Compare and contrast the following two snippets:
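With the screenshots of Figures 8a and 8b missing, the two snippets might look roughly like this; the filter predicate and window spec are assumptions:

```scala
// Figure 8a style: rank first, filter later
val df2a = df
  .withColumn("rank", rank().over(rankingOrderedIds))
  .filter(col("c3") === "someValue")

// Figure 8b style: filter first. The results are identical because the
// window spec does not involve c3, but far fewer rows reach the shuffle.
val df2b = df
  .filter(col("c3") === "someValue")
  .withColumn("rank", rank().over(rankingOrderedIds))
```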
Figures 8a and 8b are identical except for where the filter is performed (line 14 and line 9, respectively). Both snippets give identical results because the rank window function is not a function of column c3, so performing the filter before or after the ranking does not matter. However, the execution time of df2.count is significantly different:
The physical plan of these two queries explains the reason:
Figure 9 shows that filtering before calling the rank window function reduces the number of rows at the first stage from 802 million to just 7 million (Figure 8b column). Consequently, when the shuffle happens, only 2.9 GB of data needs to be moved around the cluster (the box labelled “Exchange” in the Figure 8b column). In contrast, not filtering before calling the rank window function caused 304.9 GB of data to be moved around the cluster.
The lesson here is that if your query causes a shuffle, look for opportunities to apply filters that reduce the amount of data to be transferred. Also, don’t be afraid to study your query’s physical plan for optimization opportunities that the Catalyst Optimizer may have missed.
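To study a query’s physical plan yourself, you can call explain on the data frame before executing it:

```scala
// Print the physical plan and look for "Exchange" operators (shuffles);
// the SQL tab in the Spark UI shows how much data feeds each one.
df2.explain()
```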
We’ve seen that the way a Spark query is written can have a significant effect on its execution time. I hope these tips will help you write better Spark code that makes the most of your cluster’s resources. Let me know in the comments if you have any questions.