A Glance at Apache Spark optimization techniques

Gaurav Patil
Published in Globant
13 min read · Oct 31, 2022

A brief look at widely used performance optimization techniques for Spark

Photo by Marc-Olivier Jodoin on Unsplash

Data is information about facts and figures. There are three primary sources of data: social data, machine-generated data, and transactional data. Data analysis helps organizations improve their internal processes, customer relationship management, user experience, and more. At this volume, the data is what we call Big Data, and it needs high-speed computing techniques because traditional systems cannot process it fast enough.

Spark is an immensely powerful framework for processing big data. You might have seen a Spark application run slowly even though enough resources are allocated; often that is because the application is not optimized. Spark has built-in optimizations such as the Catalyst optimizer and the Tungsten execution engine, but there are more ways to optimize Spark applications.

This article is intended for data engineers and data scientists who work on Spark and want to gain some performance improvement in their Spark applications. In this article, I shall give you a brief understanding of some widely used optimization techniques supported by Spark and how you can use them. Let's start…

Optimization techniques in Spark

The Spark engine has some built-in optimizations, but we still need to pay attention to others: cluster optimization, configuration tuning, table optimization, code-level optimizations, and so on. Together these improve the performance of our application.

We can broadly categorize optimization techniques into three categories as given below:

  • Optimizing Spark configurations: This includes changing the Spark properties.
  • Optimizing Spark program: This includes code-level optimizations.
  • Optimizing storage: This includes file format optimization.

Let's dive deep into each category…

Optimizing Spark Configurations

Many configuration properties are available in the Spark engine, and they can be set while creating the SparkSession object. These properties control the Spark application and can be tuned to improve its performance.

How will you set the Spark configuration?

1. You need to import the required classes.

from pyspark.sql import SparkSession

2. Create an object and set the configuration value in the config() function.

spark = (SparkSession.builder
    .master("yarn")
    .appName("test")
    .config("spark.sql.shuffle.partitions", 200)
    .getOrCreate())

Now let's see which properties we can tune to improve the performance of the Spark application…

Tuning Spark Executor

Most of the time, we allocate static resources to the application. This strategy works well when we know our application's source data size. But what if the source delivers massive data one day and very little the next? Static allocation is not a practical choice in this scenario. Spark's dynamic allocation lets us add and remove executors according to the application's workload, which suits workloads that vary from day to day. Set the spark.dynamicAllocation.enabled property to "true" to enable it. After that, the Spark application scales executors out and in as needed.

This requires any one of the below two properties to be set:

  • spark.shuffle.service.enabled
  • spark.dynamicAllocation.shuffleTracking.enabled

The following properties are also relevant to dynamic allocation configuration:

  • spark.dynamicAllocation.minExecutors
  • spark.dynamicAllocation.maxExecutors
  • spark.dynamicAllocation.initialExecutors
  • spark.dynamicAllocation.executorAllocationRatio
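
As a rough sketch, the dynamic allocation properties listed above could be collected in one place and then applied to the session builder. The executor counts and the helper names dynamic_allocation_conf and apply_conf are illustrative assumptions, not values or APIs provided by Spark.

```python
def dynamic_allocation_conf(min_executors, max_executors, use_shuffle_service=False):
    """Build the dynamic allocation properties as a plain dict (hypothetical helper)."""
    if min_executors > max_executors:
        raise ValueError("min_executors must not exceed max_executors")
    conf = {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
        "spark.dynamicAllocation.initialExecutors": str(min_executors),
    }
    # One of the two shuffle-related properties has to be switched on.
    if use_shuffle_service:
        conf["spark.shuffle.service.enabled"] = "true"
    else:
        conf["spark.dynamicAllocation.shuffleTracking.enabled"] = "true"
    return conf


def apply_conf(builder, conf):
    """Apply each property to any object exposing .config(key, value)."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


conf = dynamic_allocation_conf(min_executors=2, max_executors=20)
for key, value in sorted(conf.items()):
    print(f"{key} = {value}")
```

With PySpark available, apply_conf(SparkSession.builder.appName("test"), conf).getOrCreate() would create a session with these settings.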

Tuning Spark Memory

We often get out-of-memory errors on either the driver or the executor side. To avoid that, we need a correct memory configuration. The executor memory is divided into regions, which can be tuned to improve performance. The spark.memory.fraction and spark.memory.storageFraction properties control the size of these regions.

You can see three memory regions in the diagram below:

Memory management in Spark
  • Reserved Memory: Memory reserved by the system. Its size is hard-coded and is generally 300 MB.
  • User Memory: Calculated as ("Java Heap Memory" - "Reserved Memory") * (1.0 - spark.memory.fraction). This pool is what remains after Spark Memory is allocated, and it is completely up to you what is stored in it. Spark does no accounting of what you do there or whether you respect this boundary, so exceeding it in your code might cause an out-of-memory error.
  • Spark Memory: Calculated as ("Java Heap Memory" - "Reserved Memory") * spark.memory.fraction. This pool is managed by Spark and is further divided into two regions: Storage Memory and Execution Memory.

Storage memory is used for storing all of the cached data, and broadcast variables are also stored here. Spark will store that data in this segment for any persist() option that includes 'MEMORY'. It is calculated by using the formula given below:

Storage Memory = (Java Heap Memory - Reserved Memory) * spark.memory.fraction * spark.memory.storageFraction

Execution memory is used by Spark for objects created during the execution of a task. It is calculated by using the formula given below:

Execution Memory = (Java Heap Memory - Reserved Memory) * spark.memory.fraction * (1.0 - spark.memory.storageFraction)
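
To make the formulas concrete, here is a small worked calculation in plain Python. It assumes a 4 GB executor heap and the values 0.6 for spark.memory.fraction and 0.5 for spark.memory.storageFraction (the documented defaults in recent Spark versions, as far as I know); the function name memory_regions is made up for this sketch.

```python
RESERVED_MB = 300  # reserved memory, as described above


def memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap (in MB) into the regions described above."""
    usable = heap_mb - RESERVED_MB
    spark_memory = usable * memory_fraction
    return {
        "user_memory": usable * (1.0 - memory_fraction),
        "spark_memory": spark_memory,
        "storage_memory": spark_memory * storage_fraction,
        "execution_memory": spark_memory * (1.0 - storage_fraction),
    }


regions = memory_regions(4096)  # assumed 4 GB heap
for name, size_mb in regions.items():
    print(f"{name}: {size_mb:.1f} MB")
```

For a 4096 MB heap this gives roughly 1518 MB of user memory and 2278 MB of Spark memory, split evenly between storage and execution.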

Tune Shuffle File Buffer

Disk access is slower than in-memory access, and data written to disk must also be serialized, which takes time and resources. We can reduce disk I/O costs by using an in-memory buffer for shuffle file reads and writes.

The buffer size controls the number of disk seeks and system calls made while creating intermediate shuffle files. Set the spark.shuffle.file.buffer property to change it. The default value is 32k. If enough memory is available, increasing this value lets Spark buffer more data before flushing to disk during the shuffle write, which reduces disk I/O and improves performance. The recommended size for this property is 1 MB. We can set the property as given below:
spark.shuffle.file.buffer = 1m

Tune Compression block size

For large datasets, we can change the default compression block size. Shuffle and spill data can be compressed with codecs that trade off compression ratio against speed, such as LZ4, LZF, Snappy, and Zstandard. The property below sets the block size used by LZ4 compression. The default block size is 32 KB, which is not optimal for large datasets. By increasing the block size, you can see up to a 20% reduction in shuffle/spill file size.
spark.io.compression.lz4.blockSize = 512k

Compression block size vs size of shuffle files

Tune Shuffle Partitions value

Shuffle partitions are the partitions produced by each transformation that shuffles data, such as join() and agg(). The spark.sql.shuffle.partitions property sets their default number and therefore the level of parallelism after a shuffle.

The default value for this property is 200. You should set the number of shuffle partitions according to your data size and the resources available in your cluster. The number of partitions should be a multiple of the total number of executor cores so that partitions can be distributed evenly across tasks.
spark.sql.shuffle.partitions = <<integer value>>
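
One way to pick a value is sketched below in plain Python. It assumes a target of about 128 MB per partition (a common rule of thumb, not a Spark default) and rounds the result up to a multiple of the available parallelism, taken here to be the total number of executor cores. The function name suggest_shuffle_partitions is hypothetical.

```python
import math


def suggest_shuffle_partitions(shuffle_data_mb, total_cores, target_partition_mb=128):
    """Estimate a shuffle partition count (heuristic sketch, not a Spark API)."""
    # Partitions needed so that each one holds roughly target_partition_mb.
    needed = max(1, math.ceil(shuffle_data_mb / target_partition_mb))
    # Round up to the next multiple of total_cores so every wave of tasks is full.
    return math.ceil(needed / total_cores) * total_cores


# About 50 GB of shuffle data on a cluster with 40 executor cores in total.
print(suggest_shuffle_partitions(shuffle_data_mb=50_000, total_cores=40))  # 400
```

The result is only a starting point; the Spark UI shows the actual shuffle sizes per task, which is the better guide for further tuning.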

Use off-heap memory

Off-heap memory lies outside the JVM heap but is still used by the JVM process for specific purposes (for example, interning of Strings). Spark can also use off-heap memory explicitly to store serialized dataframes and RDDs. So off-heap memory serves two purposes:

  • A part of off-heap memory is used by Java internally for purposes like String interning and JVM overheads.
  • Off-heap memory can also be used by Spark explicitly for storing its data as part of Project Tungsten.

The total off-heap memory for a Spark executor is controlled by the spark.executor.memoryOverhead configuration. The amount of off-heap memory used by Spark to store actual dataframes is controlled by spark.memory.offHeap.size. It is enabled by setting the spark.memory.offHeap.enabled configuration to true.
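
A minimal configuration sketch for Spark-managed off-heap storage follows. In the Spark configuration reference the switch is named spark.memory.offHeap.enabled; the 2g size and the helper name off_heap_conf are illustrative assumptions.

```python
def off_heap_conf(size):
    """Return the properties that turn on Spark-managed off-heap storage."""
    return {
        "spark.memory.offHeap.enabled": "true",
        # The size has to be a positive amount when off-heap use is enabled.
        "spark.memory.offHeap.size": size,
    }


offheap = off_heap_conf("2g")  # "2g" is an arbitrary example size
for key, value in offheap.items():
    print(f"{key} = {value}")
```

Each pair can be passed to the config() function shown earlier when building the SparkSession.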

Optimizing Spark Program

The real power of a CPU can only be seen if we run a well-optimized program. Adding resources alone will not improve performance unless we also optimize our code. Spark provides many techniques we can use to optimize our application. This section covers some of the widely used program optimization techniques.

Broadcast Join

Let's consider a scenario where you are joining a big table with a small table. A regular join shuffles both tables across the cluster. We can avoid that shuffle by using a broadcast join, which copies the small table to every node where an executor is running. However, once the smaller table grows beyond a certain size, a broadcast join loses its advantage over shuffle-based joins.

# Example program to illustrate use of broadcast join
from pyspark.sql.functions import broadcast
emp_df = spark.sql("select id, name, dept_id from employee")
dept_df = spark.sql("select dept_id, dept_name from department")
df_joined = emp_df.join(broadcast(dept_df), emp_df.dept_id == dept_df.dept_id, "inner")
df_joined.show(20)

Cache data

Every time we call an action in a Spark program, Spark triggers the DAG and executes it from the beginning. That's why it's recommended not to use unnecessary actions in Spark programs. Double-check the code and remove or comment out actions you wrote for debugging or testing.

When the same dataframe is referred to in multiple places, caching it is a key way to improve performance. Spark has two functions for this: cache() and persist(). The cache() function stores the data at the default storage level (memory only for RDDs; memory and disk for dataframes), while the persist() function stores it at a user-defined storage level, which is passed as a parameter.

Storage Levels comparison chart
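
The effect of caching on repeated actions can be illustrated with a toy model in plain Python. This is not Spark code: LazyDataset and expensive_source are made-up stand-ins for a dataframe and its lineage, and the sketch only counts how often the lineage is recomputed.

```python
class LazyDataset:
    """Toy model of a lazily evaluated dataset; this is not Spark code."""

    def __init__(self, compute):
        self._compute = compute        # function that rebuilds the rows from source
        self._cache_enabled = False
        self._cached_rows = None
        self.computations = 0          # how many times the lineage was executed

    def cache(self):
        """Mark the dataset so the first action keeps its rows in memory."""
        self._cache_enabled = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows
        self.computations += 1
        rows = self._compute()
        if self._cache_enabled:
            self._cached_rows = rows
        return rows

    def count(self):                   # plays the role of an action
        return len(self._rows())

    def total(self):                   # plays the role of a second action
        return sum(self._rows())


def expensive_source():
    return [x * x for x in range(1000)]


uncached = LazyDataset(expensive_source)
uncached.count()
uncached.total()

cached = LazyDataset(expensive_source).cache()
cached.count()
cached.total()

print(uncached.computations, cached.computations)  # 2 1
```

In PySpark the corresponding calls are df.cache(), or df.persist(StorageLevel.MEMORY_AND_DISK) to choose the storage level explicitly.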

Repartition data

A partitioned dataframe is divided into logical groups of records, and each group is called a partition. Each task processes one partition, and many tasks run in parallel inside one executor; this is how parallel execution happens in Spark. The level of parallelism can be increased if we distribute data correctly. For repartitioning, Spark has two methods: repartition() and coalesce().

If we check the source code, we see that repartition() calls coalesce() with the shuffle parameter set to true. The major difference is that coalesce() without the shuffle parameter is mainly used to decrease the number of partitions, while repartition() is used to increase it. Less shuffling happens when we use coalesce() compared to repartition(). The repartition() function can also be used to solve data skewness issues.

Below is the code snippet taken from the RDD source code.

/*
* Return a new RDD that has exactly numPartitions partitions.
* Can increase or decrease the level of parallelism in this RDD.
* Internally, this uses a shuffle to redistribute data.
* If you are decreasing the number of partitions in this RDD,
* consider using `coalesce`, which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

So, to improve the performance of your application, you need to either increase or decrease the partitions. That totally depends on your data. If you are processing large data and want to create more simultaneous tasks, then you can use the repartition() function to increase the partitions. If you want to re-shuffle the data based on column value, you can also use the repartition() function. If you want to reduce the number of output files, then you can use the coalesce() function. You can significantly improve the application's performance after applying these changes.

Filter data in earlier steps

A key way to improve the performance of joins and other processing is to filter out, as early as possible, the data you don't need in the result set. Let's say you want to process and store only employees in 'Mumbai'. Instead of reading the whole dataset, processing it, and then applying filters to the result, you can filter your dataframe right when you read the tables into it.

Spark also helps here: it optimizes the logical plan internally. For supported file formats it uses predicate pushdown to apply filters at the source, and then it does the processing on the filtered data.

Use Salting Technique to eliminate data skewness issues

We often see on Spark UI that some tasks take longer, and some finish quickly. This happens when your data is skewed. That means data is not evenly distributed across the partitions. So, tasks working on partitions where the data size is more than the other partitions take more time to complete. This also causes an out-of-memory issue sometimes. Re-shuffling data across partitions can eliminate data skewness. We can achieve shuffling by using the repartition() function. But this function does not guarantee the even distribution of records. So, we must add some random values in a new column in the dataframe, which we often call the salted key, and then we can pass that salted key in the repartition function as an argument. After that repartition() function will re-shuffle data based on the salted key column values.

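# Example program to illustrate use of the salting technique
from pyspark.sql.functions import rand
from pyspark.sql.types import IntegerType
n = 100  # number of distinct salt values (assumed for illustration)
salt_df_new = df.withColumn("salted_key", (rand() * n).cast(IntegerType()))
shuffled_df = salt_df_new.repartition(100, "salted_key")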
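
Why the salted key helps can be seen in a small plain-Python simulation. It is a sketch under stated assumptions: partition_of uses CRC32 as a stand-in for hash partitioning (Spark's real hash function differs), and the data, the partition count, and the number of salt values are invented for the example.

```python
import random
import zlib
from collections import Counter

NUM_PARTITIONS = 8


def partition_of(key):
    """Stand-in for hash partitioning; Spark's actual hash function differs."""
    return zlib.crc32(str(key).encode("utf-8")) % NUM_PARTITIONS


# Skewed data: 90% of the rows share the key "Mumbai".
rows = ["Mumbai"] * 9000 + [f"city_{i}" for i in range(1000)]

# Repartitioning by the skewed column sends every "Mumbai" row to one partition.
by_key = Counter(partition_of(city) for city in rows)

# Repartitioning by a random salt ignores the skewed column entirely.
rng = random.Random(42)
n = 100  # number of distinct salt values, like n in the snippet above
by_salt = Counter(partition_of(rng.randrange(n)) for _ in rows)

print("largest partition when partitioning by key :", max(by_key.values()))
print("largest partition when partitioning by salt:", max(by_salt.values()))
```

The largest partition shrinks from at least 9,000 rows to a much smaller share of the 10,000 rows, which is the effect the salted key is meant to have on task run times.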

Provide schema explicitly while reading data into dataframe

A predefined schema lets Spark get columns and data types without reading the entire file; this improves the performance of your Spark code if you are dealing with a massive volume of data. We can use the inferSchema=True option if we want Spark to infer the schema, or the schema option to provide one explicitly.

# Example program showing how to provide the schema explicitly
# while reading the file
from pyspark.sql.types import StructType, StructField, IntegerType, DateType

schema = StructType([
    StructField("col_01", IntegerType()),
    StructField("col_02", DateType()),
    StructField("col_03", IntegerType()),
])
df = spark.read.csv(filename, header=True, schema=schema)

Optimize JSON files processing

JSON is a widely used format that allows for semi-structured data because it does not require a schema. This offers added flexibility to store and query data that doesn't always adhere to fixed schemas and data types. JSON files are compatible with many systems. It is always recommended to pass schema explicitly while reading JSON files to avoid incorrect identification of data types.

# Example program showing how to provide the schema explicitly
# while reading a JSON file
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("address", StringType(), True),
])
emp_df = spark.read.json("employee_file.json", schema=schema)

Use the ReduceByKey function over GroupByKey

Both reduceByKey() and groupByKey() are wide transformations, meaning both shuffle data across partitions. The critical difference is that reduceByKey() does a map-side combine and groupByKey() does not. The reduceByKey() function acts like a mini reducer on each partition before the shuffle, so less data is shuffled when we use it.

Reduce by key example
Group by key example
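
The map-side combine can be sketched in plain Python by counting how many records would cross the shuffle in each case. This is a simulation of the idea, not Spark's implementation; the three input partitions and the function names are invented for the example.

```python
import operator

# Three input partitions of (key, value) pairs.
partitions = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("a", 1), ("b", 1), ("b", 1)],
    [("c", 1), ("a", 1), ("c", 1)],
]


def group_by_key_shuffle(parts):
    """groupByKey style: every record crosses the shuffle unchanged."""
    return [record for part in parts for record in part]


def reduce_by_key_shuffle(parts, func):
    """reduceByKey style: combine values per key inside each partition first."""
    shuffled = []
    for part in parts:
        combined = {}
        for key, value in part:
            combined[key] = func(combined[key], value) if key in combined else value
        shuffled.extend(combined.items())
    return shuffled


def final_reduce(records, func):
    """Reduce side: merge all shuffled records that share a key."""
    result = {}
    for key, value in records:
        result[key] = func(result[key], value) if key in result else value
    return result


grouped = group_by_key_shuffle(partitions)
reduced = reduce_by_key_shuffle(partitions, operator.add)

print("records shuffled without map-side combine:", len(grouped))  # 10
print("records shuffled with map-side combine   :", len(reduced))  # 6
print(final_reduce(reduced, operator.add))  # {'a': 5, 'b': 3, 'c': 2}
```

Both paths produce the same totals per key, but the combined path moves six records instead of ten; on real data with many repeated keys per partition the gap is far larger.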

Avoid the use of UDFs (User Defined Functions)

UDFs are heavy when it comes to performance. Spark cannot optimize a UDF because it is a black box to the optimizer. Built-in Spark SQL functions are optimized and are the recommended choice. Spark provides many commonly used Spark SQL functions for data transformation. So, before deciding to create a UDF, look at the Spark SQL documentation to see if an existing function satisfies your requirement.

Check the uniqueness of keys while using the join columns

We should always choose the join columns carefully when joining two dataframes. If duplicate values exist in the join columns, every duplicate on one side is matched with every duplicate on the other, so the result can grow toward a cartesian product and the join takes a long time. This can also lead to an out-of-memory error. So, it is always better to analyze the tables by running some queries before we write the code.
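
The growth caused by duplicate keys is easy to estimate before running the join. The plain-Python sketch below counts the rows an inner join would produce from the key columns alone; the function name inner_join_row_count and the sample keys are illustrative assumptions.

```python
from collections import Counter


def inner_join_row_count(left_keys, right_keys):
    """Rows produced by an inner join: matches multiply per key."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    return sum(n * right_counts[key] for key, n in left_counts.items())


# Unique keys on both sides: at most one output row per key.
unique_rows = inner_join_row_count([1, 2, 3, 4], [1, 2, 3])

# Key 1 appears 1,000 times on each side: 1,000 * 1,000 rows for that key alone.
duplicate_rows = inner_join_row_count([1] * 1000 + [2], [1] * 1000 + [2])

print(unique_rows)     # 3
print(duplicate_rows)  # 1000001
```

On real tables the same check can be done with a group-by count on the join column of each side before writing the join.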

Optimizing Storage

Optimizing storage is very important to improve the performance of the Spark application. This includes organizing data by creating partitions and buckets, using serialized data formats, choosing an appropriate file format according to the application's needs, etc. Let's see them one by one…

Bucketing and Partitioning

Bucketing and partitioning are widely used techniques for optimizing Hive tables. Partitions are represented as directories, and buckets are represented as files. Partitioning splits records into files under separate directories, one per value of the partition column and named after that value. Bucketing further splits records into a fixed number of files based on a hash of the bucketing column. Both improve performance when we read data in Spark.

If a table is partitioned and bucketed, Spark SQL queries that filter on the partition or bucket columns can run faster because they scan only the relevant directories and files instead of the whole table.

The cardinality of a column can be defined as the number of distinct values in that column. It is always recommended to use bucketing on high-cardinality columns and partitioning on low-cardinality columns.

-- Example showing how to use partitioning and bucketing
-- while creating a Hive table
CREATE EXTERNAL TABLE zipcodes(
  rec_no int,
  country string,
  city string,
  zip_code int
)
PARTITIONED BY (
  state string
)
CLUSTERED BY (zip_code) INTO 10 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
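
How a record from the zipcodes table above ends up in a directory and a bucket can be sketched in plain Python. This is a simplification: the modulo on zip_code stands in for the engine's bucketing hash, and the path and file naming are hypothetical, not the names Hive or Spark actually write.

```python
NUM_BUCKETS = 10  # matches INTO 10 BUCKETS in the table definition


def storage_location(record):
    """Map a record to a partition directory and bucket (illustrative layout)."""
    partition_dir = f"state={record['state']}"
    # Simplified stand-in for the bucketing hash function.
    bucket = record["zip_code"] % NUM_BUCKETS
    return f"zipcodes/{partition_dir}/bucket_{bucket:05d}"


records = [
    {"state": "MH", "city": "Mumbai", "zip_code": 400001},
    {"state": "MH", "city": "Pune", "zip_code": 411001},
    {"state": "KA", "city": "Bengaluru", "zip_code": 560003},
]
for record in records:
    print(record["city"], "->", storage_location(record))
```

A query that filters on state only needs the matching directory, and one that also filters on zip_code only needs a single bucket file inside it.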

Serialized data formats

Serialization is a mechanism for converting the state of an object into a byte stream so that the object can be persisted or sent over the network. Deserialization is the reverse process, where the byte stream is used to recreate the actual Java object in memory. By default, Spark uses the Java serializer. We can explicitly tell Spark to use the Kryo serializer instead, which improves performance.
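
Switching the serializer is a single configuration property. The fragment below shows it as a Python dict; the variable name kryo_conf is just for illustration.

```python
# Property that selects Kryo instead of the default Java serializer.
kryo_conf = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
}

for key, value in kryo_conf.items():
    print(f"{key} = {value}")
```

The pair can be passed to the config() function when building the SparkSession, in the same way as the other properties in this article.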

Different file formats support serialization, like Avro, Parquet, and ORC. These file formats also support compression libraries like ZLIB, SNAPPY, etc.

Choosing an appropriate file format

Spark is compatible with many file formats such as text, Parquet, ORC, Avro, JSON, etc. Each file format has its advantages and disadvantages. Text format is widely used for unstructured data. The JSON format is used for semi-structured data. Structured data can be stored in ORC or Parquet, as both are columnar formats. ORC and Parquet are memory efficient and perform better in speeding up queries. Avro stores schema in JSON format, and it has broad compatibility with other systems. Depending on the requirement, we must choose the correct file format to balance performance and the need.

File formats comparison

Conclusion

In this article, I gave you a brief overview of some widely used optimization techniques. Though Spark is an immensely popular and prominent Big Data processing engine, optimization is always a challenging topic. Hence, it is essential to understand your data and the Spark configurations that are relevant to your process.

There's no predefined way to optimize Spark applications. It all depends on your data and code. Make sure you perform different benchmarks and source data analyses before deciding on the optimization technique for your program. You can use the Spark UI to analyze your applications and determine where you can improve them. Thank you for reading my blog. Happy coding!
