Spark performance tuning from the trenches

Yann Moisan
May 29, 2018
In this post, we share a selection of lessons learned while tuning Spark jobs in production, focusing on:
  • Execution plan analysis,
  • Data management (caching, broadcasting),
  • Cloud-related optimizations (including S3).

1- Use the power of Tungsten

It’s common sense, but the best way to improve code performance is to embrace Spark’s strengths. One of them is Tungsten.

Use Dataset structures rather than DataFrames

To make sure our code benefits as much as possible from Tungsten optimizations, we use the default Dataset API with Scala (instead of RDDs).
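
To give a flavour of what this looks like, here is a minimal sketch; the case class, path and column names are illustrative, not taken from our actual jobs:

import org.apache.spark.sql.SparkSession

// Illustrative schema, not the real data model
case class Auction(id: Long, websiteId: Long, price: Double)

val spark = SparkSession.builder().appName("dataset-example").getOrCreate()
import spark.implicits._

// A typed Dataset keeps rows in Tungsten's compact binary format and benefits
// from whole-stage code generation, which an RDD[Auction] would not
val auctions = spark.read.parquet("s3://some-bucket/auctions").as[Auction]
val revenueByWebsite = auctions.groupBy($"websiteId").sum("price")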

Avoid User-Defined Functions (UDFs) as much as possible

Using a UDF implies deserializing the data to process it in plain Scala and then reserializing it. UDFs can often be replaced by built-in Spark SQL functions: there are already a lot of them, and new ones are regularly added. For example, the following UDF returns the parent currency when it is defined and falls back to the sub-currency otherwise:

def currency = udf(
  (currencySub: String, currencyParent: String) ⇒
    Option(currencyParent) match {
      case Some(curr) ⇒ curr
      case _          ⇒ currencySub
    }
)
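
In this particular case, the logic boils down to taking currencyParent when it is not null and currencySub otherwise, which the built-in coalesce function already does (df is a placeholder DataFrame with those two columns):

import org.apache.spark.sql.functions.{coalesce, col}

// Built-in alternative to the UDF above: no serialization round-trip,
// and Catalyst can still optimize around it
df.withColumn("currency", coalesce(col("currencyParent"), col("currencySub")))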

Avoid User-Defined Aggregate Functions (UDAFs)

A UDAF generates SortAggregate operations, which are significantly slower than HashAggregate. For example, instead of writing a UDAF that computes a median, we use the built-in equivalent (quantile 0.5):

df.stat.approxQuantile("value", Array(0.5), 0)
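
The third argument is the relative error: 0 asks for an exact quantile, while a small positive value (0.01 below is only an illustrative choice) trades a bit of accuracy for a much cheaper computation on large datasets:

// Approximate median with a 1% relative error bound
df.stat.approxQuantile("value", Array(0.5), 0.01)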

Avoid UDFs or UDAFs that perform more than one thing

Software craftsmanship principles obviously apply when writing big data code (do one thing and do it well). By splitting UDFs, we are able to use built-in functions for part of the resulting code, as sketched below. It also greatly simplifies testing.
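
As a hypothetical illustration (column names and mapping invented for the example): instead of one UDF that trims, lower-cases and maps a raw label, only the custom mapping stays in a UDF while built-in functions handle the rest, and each piece can be tested on its own.

import org.apache.spark.sql.functions.{col, lower, trim, udf}

// Only the part that really needs custom logic stays in a UDF...
val toCategory = udf((label: String) => label match {
  case "cpm" | "cpc" => "display"
  case "instream"    => "video"
  case _             => "other"
})

// ...while normalization relies on built-in functions
val result = df.withColumn("category", toCategory(lower(trim(col("pricing_model")))))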

2- Look under the hood

Analysing Spark’s execution plan is an easy way to spot potential improvements. This plan is composed of stages, which are the physical units of execution in Spark. When we refactor our code, the first thing we look for is an abnormal number of stages. A suspicious plan can be one requiring 10 stages instead of 2–3 for a basic join operation between two DataFrames.

[Figure: a very simple DAG / execution plan example]
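
The plan itself is one method call away; a sketch with two placeholder DataFrames, auctions and websites, joined on website_id:

// Physical plan only (stages, exchanges, aggregates, ...)
auctions.join(websites, Seq("website_id")).explain()

// Parsed, analyzed, optimized and physical plans
auctions.join(websites, Seq("website_id")).explain(true)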

3- Know your data and manage it efficiently

We’ve seen how to improve job performance by looking into the execution plan but there are also plenty of possible enhancements on the data side.

Highly imbalanced datasets

To quickly check that everything is fine, we review the execution duration of each task and look for heterogeneous processing times. If one of the tasks is significantly slower than the others, it will extend the overall job duration and waste the resources of the fastest executors.

[Figure: Stages tab example in the Spark UI]
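
When such skew shows up, a common mitigation (a generic sketch, not something specific to our jobs) is to repartition on a better-distributed key, or simply over more partitions, so that no single task carries most of the data:

import org.apache.spark.sql.functions.col

// Spread the data over 200 partitions keyed on a higher-cardinality column;
// both the number and the column are placeholders
val rebalanced = df.repartition(200, col("user_id"))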

Inappropriate use of caching

There is no universal answer when choosing what should be cached. Caching an intermediate result can dramatically improve performance and it's tempting to cache a lot of things. However, due to Spark's caching strategy (in memory first, then swapped to disk), the cache can end up on slightly slower storage. Also, using that storage space for caching means that it is no longer available for processing. In the end, caching might cost more than simply reading the DataFrame again.

[Figure: Storage tab example in the Spark UI]
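
When we do cache, being explicit about the storage level and releasing it as soon as it is no longer needed helps; a sketch with placeholder DataFrames:

import org.apache.spark.storage.StorageLevel

val enriched = auctions.join(websites, Seq("website_id"))

// The default for Dataset.cache() is MEMORY_AND_DISK; here we keep it memory-only
enriched.persist(StorageLevel.MEMORY_ONLY)

// ... several actions reusing `enriched` ...

enriched.unpersist() // give the storage memory back to the executors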

Broadcasting

We regularly work with small DataFrames. For example, when we want to cross a billion auctions with a website list, we broadcast the latter to all the executors and avoid a shuffle:

auction
  .join(broadcast(website) as "w", $"w.id" === $"website_id")
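
Note that Spark can also broadcast automatically when it estimates a table to be smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default); the explicit hint above simply removes the guesswork. If needed, the threshold can be tuned:

// Raise the automatic broadcast threshold to 100 MB (value chosen for illustration)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)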

4- Cloud related optimizations

Our Spark clusters run on AWS EMR. EMR provides a managed Hadoop framework on EC2, with YARN to centrally manage cluster resources. Until now we have been using r3.xlarge instances (30 GiB of memory, 4 vCPUs). We decided to use a single instance type to keep sizing simple.

We typically submit our jobs with the following settings:

--driver-memory 1g
--driver-cores 1
--executor-memory 20g (executor heap size)
--executor-cores 4
--num-executors $executorCount
Typical workloads include:
  • Hourly training jobs that spawn their own ephemeral clusters,
  • Daily jobs (~2 hours duration) that also spawn their own ephemeral clusters.

A few precautions using S3

We use S3 for persistent storage, but S3 is not a filesystem: it is an object store, which means that some seemingly simple operations are not supported. For example, renaming a file actually requires copying the object and then deleting the original.

To limit the impact of this behaviour when writing to S3, we use the following settings. Version 2 of the file output committer moves task output to its final location at task commit, avoiding an expensive rename of every file at job commit, and disabling speculative execution prevents duplicate task attempts from writing to the same destination.

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false
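
A minimal sketch of setting these at session creation time instead of passing them as spark-submit --conf flags (the application name is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3-output-job")
  // Commit task output directly to its final location, avoiding the costly rename
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  // Speculative task attempts could otherwise write duplicate output to S3
  .config("spark.speculation", "false")
  .getOrCreate()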

That’s it for now

We hope this selection will be helpful and we thank Spark’s vibrant community for sharing such great resources (see references below)!

Bibliography


Thanks to Benjamin DAVY and Alban Perillat-Merceroz.
