Optimising the runtime of massive PySpark applications

dunnhumby
dunnhumby Science blog
6 min read · Feb 8, 2023

An article by Neil McCulloch and Alex Webb

Improving the performance of problematic PySpark applications can often seem like a daunting task. Here we outline a general strategy for tackling these projects, and describe how we’ve used it to reduce the runtime of our On Shelf Availability reporting by half.

Context

Our On Shelf Availability (OSA) solution classifies whether a product was Out of Stock (OOS) in a historic period. It does this by analysing characteristics of a product’s sales and comparing them against ~5000 similar products (e.g., sales of a canned cola might be compared against other canned sodas).

As more teams used this solution, we found that it was getting pushed beyond its design limits. Teams were using it to report product availability across whole retailers, at times running on millions of products across thousands of stores. Scalability at this level needs to be built in by design; such problems can rarely be resolved by “throwing more compute at it”. Luckily, many of these design considerations aren’t application specific. We’ve isolated some general principles that may be useful in other projects.

So here are our general tips for optimising massive PySpark jobs.

Stop doing things you don’t need to

The quickest and simplest way of improving performance in an application is simply to do less. Though this might sound trivial, code added during development (e.g. to check the number of rows in a DataFrame) is often never removed, and causes unnecessary operations in production workflows.
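For example (this snippet is illustrative rather than taken from the OSA code, and assumes an intermediate DataFrame df), checks like these are easy to leave behind, and each one triggers an extra Spark job in production:

# Leftover development checks: each of these triggers a full Spark job
print(df.count())  # row-count sanity check added while debugging
df.show(5)         # quick peek at intermediate results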

Filter as early as possible, keeping only what you need

In most applications, some proportion of the runtime is just shuffling data around the cluster, so we should take care to reduce our data volume as much as possible. If you only need a subset of columns, only keep those columns. If data is sparse, and columns contain the same value most of the time, consider structuring joins to match the “interesting” rows, and fill in non-matching rows with the “samey” value.
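As a rough sketch of that last point (the data and column names here are illustrative, not taken from our codebase), keeping only the rows with sales and filling the rest back in afterwards might look like this:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Illustrative data: raw transactions, and the full set of store/product combinations
transactions = spark.createDataFrame(
    [(1, "cola", 2), (1, "lemonade", 0), (2, "cola", 0)],
    ["store_id", "product_id", "sales_qty"],
)
store_products = transactions.select("store_id", "product_id").distinct()

# Keep only the columns we need, and drop zero-sales rows as early as possible
sales = (
    transactions
    .select("store_id", "product_id", "sales_qty")
    .filter(F.col("sales_qty") > 0)
)

# Join only the "interesting" rows back onto the full combinations, then fill
# the non-matching rows with the common ("samey") value
availability = (
    store_products.join(sales, ["store_id", "product_id"], "left")
    .fillna({"sales_qty": 0})
)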

Back to basics

There are some standard practices in Spark that we should probably be doing by default anyway: reducing shuffle by broadcasting small DataFrames when joining them with large ones, salting skewed join keys to balance task sizes, and making effective use of window functions. Any one of these is unlikely to be transformational in isolation, but where there are many joins, not sticking to these basic rules can add up to significant performance problems.
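As a hedged sketch of the first two (the DataFrames and the salt factor below are illustrative):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Illustrative DataFrames: a large fact table and a small dimension table
large_df = spark.range(1_000_000).withColumn("product_id", (F.col("id") % 100).cast("int"))
small_df = spark.range(100).withColumnRenamed("id", "product_id")

# 1. Broadcast the small DataFrame so the large one isn't shuffled for the join
joined = large_df.join(broadcast(small_df), "product_id")

# 2. Salt a skewed join key to spread hot keys across more partitions:
#    salt the large side randomly, and replicate the small side across every salt value
N_SALTS = 16  # illustrative
salted_large = large_df.withColumn("salt", (F.rand() * N_SALTS).cast("int"))
salted_small = small_df.crossJoin(spark.range(N_SALTS).withColumnRenamed("id", "salt"))
joined_salted = salted_large.join(salted_small, ["product_id", "salt"]).drop("salt")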

By far the most drastic change (but the one with the greatest potential) is to completely re-evaluate the approach you’re taking. These overhauls are often very invasive and require significant effort to verify that the new approach is scientifically valid. However, this is the only real way to achieve multiple orders of magnitude of improvement.

Setting up for success

Before we embark on a project to improve performance, there are a few prerequisites:

A clear aim and timeframe

This is perhaps the most important thing to have before embarking on your performance improvement journey. The aim might be a particular reduction in runtime or cost, or a scale you want to enable. Performance gains often have a long tail, with more and more effort required to unlock smaller and smaller incremental gains.

A baseline

To improve something, we need a baseline. This is normally a branch in source control containing the version of the solution to be improved. All experiments and improvements should be made without changing this baseline, i.e. in a separate development branch.

We also need to be able to say that our performance improvements haven’t come at the cost of functionality that we care about. It is often easier to decouple non-functional changes from analytic ones: it’s much easier to say that “outputs must be identical” than to deal with some finite tolerance for what a “good” output is. In the latter case, write a wrapper for this test, so that it is as quick as possible to check whether an output is “good”.
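For instance, a minimal sketch of such a wrapper for the “outputs must be identical” case (the anti-join comparison here is just one possible strategy):

def assert_outputs_identical(baseline_df, candidate_df):
    """Fail fast if the candidate output differs from the baseline in any row."""
    assert baseline_df.schema == candidate_df.schema, "Schemas differ"
    # Rows present in one DataFrame but not the other, in either direction
    diff = baseline_df.exceptAll(candidate_df).union(candidate_df.exceptAll(baseline_df))
    assert not diff.take(1), "Outputs are not identical"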

A way of analysing how things are running

In order to find potential areas for improvement, and assess whether we’re making progress, we need to be able to see how our application is executing — which jobs are largest, which might be unnecessary, and how data is being moved around the executors in our cluster.
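For Spark, one common way to get this visibility (a sketch; the exact setup depends on your cluster, and the log directory below is illustrative) is to make sure event logging is enabled, so that the Spark History Server or YARN timeline can replay a run after it has finished:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("osa-profiling")  # hypothetical application name
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs:///spark-event-logs")  # illustrative path
    .getOrCreate()
)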

Optimising availability forecasts for retailers

Our primary objective in this case-study was to support full-scale retailer runs of OSA.

Prior to any development work, we laid some foundations. Baseline outputs were generated using a month of transaction data from 5,000 stores, to ensure we weren’t causing unexpected functional changes. To test our implementation at different scales, a range of product hierarchy levels was created, from Single Canned Carbonates through to Dry Grocery, spanning from 40 to almost 400,000 products.

We began by investigating the event timeline of a typical run of OSA using Apache Hadoop’s YARN Timeline Server, allowing us to identify unnecessary tasks, and which tasks were the most memory-intensive and time-consuming. Shown below is the event timeline of a baseline run of OSA.

Figure 1: YARN event timeline of a baseline run, showing multiple count tasks, shuffle in the range of 54–72 GB, and a total runtime of approximately 6.5 hours.

Two things stand out: the greyed-out section comprising Spark count tasks, and the volume of data shuffled in the large tasks.

  1. The first falls under the “Stop doing things you don’t need to” category discussed earlier. Throughout OSA, the size of intermediate DataFrames was periodically checked before applying some logic (see the Python example below). In many cases, the logic applied could tolerate an empty DataFrame without issue, making the tasks triggered by these count operations redundant.
  2. The volume of shuffled data relates to the “Filter as early as possible, keeping only what you need” category discussed above. Due to the nature of grocery retail transactional data, it is common to be dealing with DataFrames with more than 99% of rows not contributing to output values. Most products do not sell in most stores most of the time. Therefore, to reduce the volume of data shuffled, we excluded rows without sales from DataFrames as early as possible.
# do you really need to do this?
if df.count() != 0:
    df = apply_logic(df)
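Where the downstream logic tolerates an empty DataFrame, the fix is simply to drop the guard and call apply_logic(df) directly. Where a check genuinely is needed, fetching a single row is far cheaper than counting every row across the cluster (a sketch, not the exact OSA change):

# Cheaper guard: fetch at most one row rather than counting them all
if df.take(1):
    df = apply_logic(df)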

The figure below shows the event timeline after the changes above were implemented. Notice the removal of the non-functional Spark count tasks, the reduction in shuffled data, and the overall reduction in runtime.

Figure 2: YARN event timeline of an equivalent run after the changes, showing no count tasks, shuffle in the range of 33–36 GB, and a total runtime of approximately 3.5 hours.

Our changes reduced the runtime of OSA by 50% on average and, crucially, enabled full-scale retailer runs. Notably, the runtime reduction scales nonlinearly with input size, as shown below.

Figure 3: Runtime scaling of On Shelf Availability: runtime plotted against number of products for Baseline and Development runs, with lines of best fit showing exponential growth. The Development curve has roughly the same shape as the Baseline at around half the total runtime, approximately 4.5 hours at 400,000 products.

This scaling relationship makes large-scale runs far less prohibitive, which is particularly important when using shared clusters.

Conclusion

Spark performance optimisation requires a thorough understanding of the underlying technology and the specific use case. By analysing the event timeline, identifying bottlenecks, and implementing appropriate changes, it is possible to significantly improve the performance of Spark applications.

We achieved our goal of enabling full-retailer-scale runs of OSA by identifying the low-hanging fruit: the lowest-effort changes with the largest performance gains. Setting a clear performance goal also gave us a well-defined stopping point; it’s all too easy to sink significant time into changes that yield only minor gains.

Most importantly, this isn’t an application-specific strategy, and we hope that outlining it here will equip you to tackle similar performance problems in your own projects.
