Writing Scalable Apache Spark Code | Towards AI

4 Tips To Write Scalable Apache Spark Code

___
___
Sep 18 · 6 min read

Introduction

In this article, I will share some tips on how to write scalable Apache Spark code. The examples presented here are actually based on code I encountered in the real world. So, by sharing these tips, I hope I can help newcomers to write performant Spark code without needlessly increasing their cluster’s resources.

Cluster Setup

The cluster I used to run the code in this article is hosted on Databricks with the following configuration:

  • Cluster Mode: Standard
  • Databricks Runtime Version: 5.5 LTS ML (includes Apache Spark 2.4.3 Scala 2.11)

There are 8 workers and both the workers and driver are m4.xlarge instances (16.0 GB, 4 Cores).

Background

I recently inherited a notebook meant to track the results of our AB Tests to evaluate the performance of our recommendation engine. Running the notebook from start to finish was excruciatingly slow. This was the cell that took the longest time to execute:

Figure 1: Very bad implementation

The files used to track the data in our AB Test experiments are stored as newline delimited JSON files partitioned into different folders by year, month and day. Each day could have a few hundred JSON files. If we give a list of dates (bucketPeriod) to code in Figure1, it will loop through them and load all the JSON files on each date by calling getAnalytics on each date. This will return a list of data frames which we will combine into a single data frame by calling union after we remove empty data frames from the list (those that have 0 rows).

Executing the code in Figure 1 to retrieve just 3 days worth of data takes 17.72 minutes and counting the results i.e. df.count takes another 26.71 minutes. Let’s see if we can run things faster without increasing our cluster size.

Before moving to discuss the tips, here’s the implementation of getAnalytics which basically reads a folder containing a bunch of newline delimited JSON files and adds few features based on the content of the payload field in those files:

Figure 2: Another very bad implementation

Tip 1: Feed as much input as possible to Spark functions

The code in Figure 1 is essentially calling spark.read.json on one folder at a time. This can be inefficient considering that spark.read.json can take in a list of filenames which would allow the driver to schedule tasks only once as opposed to multiple times if spark.read.json was called repeatedly like in a for a loop.

Therefore, if you need to read files that are spread across many folders, instead of writing this:

Figure 3: Reading files sequentially

You should write this:

Figure 4: Reading all files at once

The code in Figure 4 is slightly faster compared to the code in Figure 3 (6.86 minutes vs 7.30 minutes).But, bigger the date range, bigger the difference.

Tip 2: Skip schema inference whenever possible

According to the spark.read.json documentation:

This function goes through the input once to determine the input schema. If you know the schema in advance, use the version that specifies the schema to avoid the extra scan.

Therefore, the code in Figure 4 can be rewritten as follows:

Figure 5: Reading JSON files while skipping schema inference

In Figure 5, instead of explicitly specifying the schema, I decided to read only 1 file in the list of files, infer the schema based on that one file and apply the same schema to the rest of the files. This code cell only took 29.82 seconds to complete.

Tip 3: Structure your data frame operations to minimize shuffling

Now that we’ve improved file I/O, let’s see if we can improve the transformations in getAnalytics as shown in Figure 2.

The problem with the code in Figure 2 is that the first call to distinct will cause a shuffle. How can we tell this will happen? By calling the toDebugString method of an RDD after defining a data frame operation like so:

Figure 6: How to tell that distinct causes a shuffle

Each indent in the output of toDebugString signals that a shuffle will be triggered when the transformation is executed. Recall that df1 is just reading the JSON files so the cause of the shuffle must have been the call to distinct. Using the same method, we can also deduce that the call to rank().over(rankingOrderedIds) (line 13) will cause a shuffle.

In this case, triggering a shuffle immediately after reading the files is a bad idea because the whole dataset is huge and there are actually a lot of extraneous columns that we don’t need for our analysis. So we are unnecessarily moving around large files in the cluster. What we should aim for is given a list of operations that we would like to perform to achieve our end goal, can we reorder them so that the shuffle only happens as late as possible and on as little data as possible?

Fortunately for us, this just means rewriting the code in Figure 2 as follows:

Figure 7: Optimized version of Figure 1

For clarity, I’ve combined the relevant parts of Figure 1 into Figure 7 so that the results are identical. Notice that instead of calling distinct 3 times (once in Figure 1 and twice in Figure 2), we only need to call it once to achieve the same result. When the shuffle happens (due to the call in line 9), the physical plan shows that only the columns specified in the select call get moved around which is great because it is just a small subset of all the columns in the original dataset.

The code in Figure 7 only takes 0.90 seconds to execute and it counts only 2.58 minutes. This example shows that it pays to really think through the queries you are writing to avoid doing redundant operations that cause unnecessary shuffles.

Tip 4: Perform obvious optimizations by hand instead of relying on the Catalyst Optimizer

Compare and contrast the following two snippets:

Figure 8a: Filter after calling the rank window function (same as Figure 7)
Figure 8b: Filter before calling the rank window function

Figure 8a and 8b are identical except in the order of the filter performed (line 14 and line 9 respectively). But both snippets give identical results because the rank window function is not a function of column c3 so performing the filter before or after the ranking does not matter. However, the execution time df2.count is significantly different:

Table 1: Execution time of df2.count

The physical plan of these two queries explains the reason:

Figure 9: Physical plans for Figure 8a and 8b

Figure 9 shows that the effect of filtering before calling the rank window function is to reduce the number of rows at the first stage from 802 million to just 7 million (Figure 8b column). Consequently, when the shuffle happens only a total of 2.9 GB of data needs to be moved around the cluster (the box labelled “Exchange” in the Figure 8b column). In contrast, not filtering before calling the rank window function caused 304.9 GB of data to be moved around the cluster.

The lesson here is that if your query causes a shuffle, try to look for opportunities to apply some filters to the data to reduce the amount of data that needs to be transferred. Also, don’t be afraid to study your query’s physical plan for any optimization opportunities that the Catalyst Optimizer may have missed.

Conclusion

We’ve seen that the way a Spark query is written can have a significant effect on its execution time. I hope these tips will help you write a better Spark code that maximizes your cluster’s resources. Let me know in the comments if you have any questions.

Towards AI

Towards AI, is the world’s fastest-growing AI community for learning, programming, building and implementing AI.

___

Written by

___

About me: https://bit.ly/2GQX7i9

Towards AI

Towards AI, is the world’s fastest-growing AI community for learning, programming, building and implementing AI.

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade