Six-point checklist for Spark job optimization

Abhinav Prakash
Mar 22, 2023


I have been scouring the internet trying to understand the best ways to optimize a Spark job. Here, I am summarizing my findings. This should be helpful for anyone who has intermediate knowledge of Spark and wishes to start their performance optimization journey.

Apply each of the points below to your Spark job, and you should see noticeably better performance.

1. Use the correct file format

  • If you are dealing with tabular data, choose the Apache Parquet format. It is a columnar format optimized for queries (transformations) that touch only a subset of columns on large DataFrames (a short sketch follows this list).
  • If Spark is being used with Databricks, another particularly interesting format is Delta Lake, which offers automatic optimization tools.
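As a quick illustration, here is a minimal PySpark sketch of writing and reading Parquet; the path and column names are made up for the example:

# Minimal sketch (PySpark): write tabular data as Parquet and read it back.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("file-format-demo").getOrCreate()

df = spark.range(1_000_000).withColumnRenamed("id", "user_id")

# Parquet is columnar, compressed, and splittable, so later queries can
# read only the columns (and row groups) they actually need.
df.write.mode("overwrite").parquet("/tmp/users_parquet")

# Column pruning and predicate pushdown happen automatically on read.
users = spark.read.parquet("/tmp/users_parquet")
users.select("user_id").where("user_id > 500000").explain()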

2. Maximize parallelism by splitting data using partitions

  • To read and process data in parallel, we need to split data into multiple partitions.
  • Spark runs one task per partition, and each task occupies one thread on a CPU core, so configure at least as many partitions as there are available CPU cores.
  • The goal is to split each stage of the job into enough tasks to keep all of the cores busy.
  • How to change the number of partitions (a short sketch follows):
DF.repartition(<number of partitions>)  # Increase (or rebalance) partitions; triggers a full shuffle
DF.coalesce(<number of partitions>)  # Decrease partitions; avoids a full shuffle
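As a concrete illustration, here is a minimal sketch; the file paths and the 2x multiplier are assumptions:

# Minimal sketch (PySpark): match the partition count to the available cores.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioning-demo").getOrCreate()

df = spark.read.parquet("/tmp/users_parquet")           # Path is an assumption
print("partitions before:", df.rdd.getNumPartitions())

# At least as many partitions as cores; 2-3x is a common rule of thumb.
target = spark.sparkContext.defaultParallelism * 2
df = df.repartition(target)
print("partitions after:", df.rdd.getNumPartitions())

# When writing fewer, larger files, shrink the partition count without a full shuffle.
df.coalesce(8).write.mode("overwrite").parquet("/tmp/users_repartitioned")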

3. Handle shuffle operations

  • Shuffle partitions are the partitions created during stages that involve a shuffle [wide transformations such as join(), groupBy(), etc.].
  • The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions.
  • Based on your data size, you may need to decrease or increase the number of shuffle partitions via the spark.sql.shuffle.partitions configuration (default: 200).
  • A common rule of thumb is 1.5 to 2 times the number of initial partitions, or the number of cores in the cluster:
spark.conf.set("spark.sql.shuffle.partitions", "256")
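To see where this matters, here is a minimal sketch; the path, column name, and the value 256 are assumptions for illustration:

# Minimal sketch (PySpark): shuffle partitions control the parallelism of
# wide transformations such as groupBy() and join().
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("shuffle-demo").getOrCreate()

# ~1.5-2x the core count is a common starting point.
spark.conf.set("spark.sql.shuffle.partitions", "256")

df = spark.read.parquet("/tmp/users_parquet")
# The aggregation below triggers a shuffle; its output will typically have 256
# partitions (adaptive query execution may coalesce them on recent Spark versions).
agg = df.groupBy((F.col("user_id") % 10).alias("bucket")).count()
print(agg.rdd.getNumPartitions())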

4. Broadcast hash joins

  • When joining tables, if one table is significantly smaller than the other, the smaller table can be replicated in memory on every executor node, so the large table does not need to be shuffled. This can be done in two ways:
  • Broadcast small tables automatically -

spark.sql.autoBroadcastJoinThreshold (default: 10 MB)

You can increase this threshold so that larger tables qualify for automatic broadcasting, or set it to -1 to disable automatic broadcast joins entirely.
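For example (the 50 MB value is just an illustrative assumption):

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # ~50 MB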

  • Manually broadcast tables (a DataFrame API equivalent is sketched after this block):
SELECT /*+ BROADCAST(SMALL_TABLE) */ *
FROM LARGE_TABLE
LEFT JOIN SMALL_TABLE
  ON LARGE_TABLE.COL1 = SMALL_TABLE.COL2;
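The same hint can be expressed through the DataFrame API; this is a minimal sketch, with the paths and join columns assumed for illustration:

# Minimal sketch (PySpark): explicitly broadcast the small side of a join.
from pyspark.sql.functions import broadcast

large_df = spark.read.parquet("/tmp/large_table")      # Paths are assumptions
small_df = spark.read.parquet("/tmp/small_table")

# broadcast() ships small_df to every executor, so large_df is not shuffled.
joined = large_df.join(broadcast(small_df),
                       large_df["col1"] == small_df["col2"],
                       "left")
joined.explain()  # The plan should show a BroadcastHashJoin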

5. Cache intermediate results

  • Intermediate results that are reused downstream (in multiple operations) should be cached, as in the sketch below.
DF.persist()  # Uses the default storage level
DF.persist(StorageLevel.MEMORY_ONLY)  # Requires: from pyspark import StorageLevel
(See the Spark documentation for descriptions of the available storage levels.)
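A minimal caching sketch; the path, column names, and transformations are assumptions for illustration:

# Minimal sketch (PySpark): cache a DataFrame that feeds multiple downstream actions.
from pyspark import StorageLevel
from pyspark.sql import functions as F

df = spark.read.parquet("/tmp/users_parquet")          # Path is an assumption
enriched = df.withColumn("bucket", F.col("user_id") % 10)

enriched.persist(StorageLevel.MEMORY_ONLY)             # Cache once, reuse below

enriched.groupBy("bucket").count().show()              # First action materializes the cache
enriched.where("bucket = 3").count()                   # Reuses the cached data

enriched.unpersist()                                   # Release the memory when done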

6. Manage memory of executor nodes

Unified memory (execution + storage) = spark.memory.fraction * (spark.executor.memory - reserved memory)

spark.memory.fraction (default: 0.6) → 60% of the heap, after subtracting the reserved memory, is shared by execution and storage; the remaining 40% is left for user data structures and Spark's internal metadata. Within the unified region, spark.memory.storageFraction (default: 0.5) controls how much is protected for cached (storage) data.

Reserved memory (default: 300 MB)

We can modify the parameters below to tune performance (see the sketch after this list):

  • spark.executor.memory
  • spark.memory.fraction
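A minimal sketch of setting these when building the session; the values below are illustrative assumptions and should be sized to your cluster:

# Minimal sketch (PySpark): executor memory settings must be in place before the
# executors start, so set them when the session (or spark-submit job) is created.
# The specific values are assumptions for illustration.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("memory-tuning-demo")
    .config("spark.executor.memory", "8g")    # Heap size per executor
    .config("spark.memory.fraction", "0.6")   # Share of (heap - 300 MB) for execution + storage
    .getOrCreate()
)

The same settings can also be passed on the command line, e.g. spark-submit --executor-memory 8g --conf spark.memory.fraction=0.6.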
