Accelerating Big Data processing with Spark optimisation
The biggest room in this world is the room for improvement. With new technologies and frameworks emerging at a rapid pace, opportunities for improvement in ways of working always present themselves.
This article deals with one such journey that began with venturing into an unknown space of migrating the existing legacy projects into the big data space. Doing this at the enterprise level had its own set of challenges in the functional as well as in the non-functional areas.
One of the biggest reasons for this move was to have better control of our solutions as we ramped up our innovation gears. Our legacy systems were still in place, but our data size was growing each year. The teams therefore came to a consensus to design new solutions that would maintain scalability, security and governance. This also tied in nicely with the transformation that we were applying to our technology stack.
Why Apache Spark?
Apache Spark is a leading open-source data processing engine used for batch processing, machine learning, stream processing, and large-scale SQL (structured query language). It has been designed to make big data processing quicker and easier. Since its inception, Spark has gained huge popularity as a big data processing framework and is extensively used by different industries and businesses that are dealing with large volumes of data.
This post presents actionable ways to reduce computation time by optimising Spark jobs, collected in one place so that you do not need to scour the internet or other knowledge sources.
The strategy lays out different run stages, wherein each run stage builds upon the previous, and improves the computation time by making new enhancements and recommendations.
Our use case was a complex credit risk capital calculator. The challenge was to perform this computation on extremely high data volumes with better performance and at a lower cost. The recommendations made here were tested on a YARN and HDFS cluster, but they can be applied to other infrastructures as well.
We start with an overview depicting the various run stages and the improved run times in each stage as shown above. The first five runs took place on Spark 2.4 and the last one was implemented on Spark 3.1.
Considerations for Run 1: Serialisation, Parquet File Format, and Broadcasting
1) Serialisation: Serialisation converts objects into streams of bytes and vice versa. When we run any type of distributed computation, our data is converted into bytes and transferred over the network. If we transfer less data across the network, the time taken for the job to execute decreases accordingly. Spark provides two serialisation libraries, Java and Kryo.
Java serialisation
- Provided by default, and works with any class that implements java.io.Serializable
- Flexible but quite slow, and leads to large, serialised formats for many classes
Kryo serialisation
- Faster and more compact than Java serialisation
- Requires registering the classes in advance for best performance
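As a minimal sketch of switching to Kryo, the configuration below sets the serialiser and registers a class in advance. The Exposure case class and the application name are hypothetical and only serve as an illustration; the local master is an assumption for trying it out, since on YARN the master normally comes from spark-submit.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical domain class, used only to illustrate class registration.
case class Exposure(accountId: String, amount: Double)

val conf = new SparkConf()
  .setAppName("kryo-sketch")
  .setMaster("local[*]") // assumption: local run for experimentation
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registered classes are written with a compact ID instead of the full class name.
  .registerKryoClasses(Array(classOf[Exposure]))

val spark = SparkSession.builder().config(conf).getOrCreate()
```

Classes that are not registered still work with Kryo, but the full class name is stored with each object, which reduces the size benefit.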
2) Parquet file format: Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.
Columnar formats are attractive in terms of both file size and query performance. File sizes are usually smaller than row-oriented equivalents as the values from one column are stored next to each other. Additionally, query performance is better as a query engine can skip columns that are not needed.
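The column-skipping behaviour can be sketched as follows: a small DataFrame is written as parquet and read back with only one column selected. The sample data, column names and temporary path are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-sketch")
  .master("local[*]") // assumption: local run for experimentation
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val trades = Seq(
  ("T1", "SG", 100.0),
  ("T2", "HK", 250.0),
  ("T3", "SG", 75.0)
).toDF("trade_id", "country", "notional")

// Hypothetical output location; on a cluster this would be an HDFS path.
val path = Files.createTempDirectory("parquet-sketch").resolve("trades").toString
trades.write.mode("overwrite").parquet(path)

// Selecting a single column lets the reader skip the other columns on disk.
val notionals = spark.read.parquet(path).select("notional")
```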
3) Broadcasting: Joining two tables is a routine operation in Spark. Usually, a large amount of data is exchanged over the network between the executing nodes, and this exchange can cause network latency. Spark offers several join strategies to optimise this operation. One of them is Broadcast Hash Join. If one of the tables is small enough (the default is 10MB, but could go up to 40MB), the smaller table can be broadcast to each executor in the cluster, and shuffle operations can be avoided.
Broadcast Hash Join happens in two phases: broadcast and hash join.
- Broadcast phase: The small dataset is broadcast to all executors
- Hash Join phase: The small dataset is hashed in all the executors and joined with the partitioned big dataset
Here are some things to note about Broadcasting:
- The broadcast relation should completely fit into the memory of each executor as well as in the driver, because the latter starts the data transfer.
- If the broadcast data is too large to fit in memory, the job fails with an OutOfMemory exception.
- Broadcast Hash Join only works for equi (‘=’) joins.
- Broadcasting works for all join types (inner, left, right) except full outer joins.
- Spark picks this join strategy automatically when the size of one of the join relations is below a threshold (10 MB by default). The threshold is defined by the configurable Spark property spark.sql.autoBroadcastJoinThreshold.
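The notes above can be sketched in code: the threshold is set explicitly and the small side of the join is marked with the broadcast hint. The table contents and column names are hypothetical, and the threshold value simply restates the 10 MB default.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .appName("broadcast-sketch")
  .master("local[*]") // assumption: local run for experimentation
  .getOrCreate()
import spark.implicits._

// Threshold in bytes; setting it to -1 disables automatic broadcasting.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

// Hypothetical large fact table and small lookup table.
val facts = Seq((1, "SG", 100.0), (2, "HK", 250.0), (3, "SG", 75.0))
  .toDF("id", "country_code", "amount")
val countries = Seq(("SG", "Singapore"), ("HK", "Hong Kong"))
  .toDF("country_code", "country_name")

// The hint asks Spark to broadcast the small side instead of shuffling both sides.
val joined = facts.join(broadcast(countries), Seq("country_code"), "left")
joined.explain() // the physical plan should show a broadcast hash join
```

The explicit hint is useful when table statistics are missing and Spark cannot tell that one side is small enough on its own.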
Run 1 took 85 minutes to complete.
Considerations For Run 2: Breaking the Lineage
In our use case, we have complex computations which involve iterative and recursive algorithms that are performed on large volumes of data. Each time we apply transformations on a DataFrame, the query plan grows. When this query plan becomes huge, the performance decreases dramatically, which results in bottlenecks.
It is not advisable to chain a lot of transformations in a lineage, especially when you need to process a huge volume of data with minimum resources. Hence, it’s preferable to break the lineage.
Below is an example of a computation with transformations and actions. To speed up the performance and simplify the processing, we broke the lineage at Step E.
We tested the following three methods to break the lineage:
Checkpoint: Checkpointing truncates the execution plan and saves the data to a reliable distributed (HDFS) or local file system. It is a Spark feature that is especially useful for highly iterative algorithms. Checkpoint files can be used in subsequent job runs or driver programmes.
Checkpointing can be eager or lazy, depending on the eager flag passed to the checkpoint operator. Eager checkpointing is the default and happens immediately when requested, while lazy checkpointing only happens when an action is executed. Developers can use this syntax:
val brokenLineageDf = existingDf.checkpoint()
Local Checkpoint: This works similarly to a checkpoint, but the data is not transferred to HDFS, and is instead saved to the executor’s local filesystem. If an executor is killed during processing, the data will be lost, and Spark will not be able to recreate this DataFrame from the DAG (Directed Acyclic Graph). Developers can use this syntax:
val brokenLineageDf = existingDf.localCheckpoint()
Writing data to HDFS in parquet: When we checkpoint an RDD/DataFrame, it is serialised and stored in HDFS, but not in parquet format. Writing the DataFrame out as parquet ourselves and reading it back also breaks the lineage, with the added benefit of parquet’s efficient data storage.
Breaking the lineage by writing to HDFS in parquet gave us the best performance of the above three.
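A minimal sketch of the parquet approach is shown below. The breakLineage helper, the sample data and the temporary path are hypothetical and are not taken from our job; the fold simply simulates an iterative computation whose plan keeps growing.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .appName("lineage-sketch")
  .master("local[*]") // assumption: local run for experimentation
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: materialise the DataFrame as parquet and read it back.
// The returned DataFrame's plan starts from the files, not from the original lineage.
def breakLineage(df: DataFrame, path: String): DataFrame = {
  df.write.mode("overwrite").parquet(path)
  spark.read.parquet(path)
}

val base = Seq(1, 2, 3).toDF("x")
// Simulate an iterative algorithm: every step adds to the query plan.
val grown = (1 to 20).foldLeft(base)((df, i) => df.withColumn("x", $"x" + i))

// Hypothetical location; on a cluster this would be an HDFS path.
val stepPath = Files.createTempDirectory("lineage-sketch").resolve("step_e").toString
val fresh = breakLineage(grown, stepPath)
```

For comparison, checkpoint() needs a checkpoint directory to be configured first with spark.sparkContext.setCheckpointDir, whereas localCheckpoint() does not.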
Upon breaking the lineage, Run 2 took 55 minutes and 36 seconds, a stark improvement from Run 1’s time of 85 minutes.
It would also be good to note that caching offers an alternative to increase performance without breaking the lineage. The difference between both, and when to use one over the other, can be read here.
Considerations For Run 3: Right Shuffle Partition
Choosing the right number of shuffle partitions improves job performance. Partitioning decides the degree of parallelism in a job, as there is a one-to-one correspondence between a task and a partition (each task processes one partition).
The ideal size of each partition is around 100–200MB. Smaller partitions increase the number of tasks running in parallel, which can improve performance, but partitions that are too small add scheduling overhead and increase GC time. Larger partitions decrease the number of tasks running in parallel and can leave some cores idle, which increases the processing time.
In case of a shuffle, how does one choose the right number of shuffle partitions (spark.sql.shuffle.partitions)?
We put the following Spark recommendations into practice:
- If intermediate data is too large, then we should increase shuffle partitions to make partitions smaller.
- In case of idle cores during job runs, increasing shuffle partitions helps in job performance.
- If intermediate partitions are small (in KBs), then decreasing shuffle partitions helps.
- For a cluster with huge capacity, the number of partitions should be 1x to 4x the number of cores to gain optimised performance. For instance, with 40GB of data and 200 cores, set the shuffle partitions to 200 or 400.
- For a cluster with limited capacity, shuffle partitions can be set to Input Data Size / Partition Size (100–200MB per partition). The best case scenario is to set the shuffle partitions to a multiple of the number of cores to achieve maximum parallelism, depending on cluster capacity. For example:
- For 1 GB of data with 6 cores (3 executor cores with a maximum of 2 executors), the ideal number of shuffle partitions can be 12 (2x the number of cores), with a partition size of about 100MB.
- For 20 GB of data with 40 cores, set shuffle partitions to 120 or 160 (3x to 4x the number of cores), with a partition size of about 200MB.
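The arithmetic behind the two examples above can be sketched as a small helper. The function name suggestShufflePartitions and its rounding rule (divide by the target partition size, then round up to the next multiple of the core count) are one possible reading of the recommendation, stated here as an assumption rather than an official Spark formula.

```scala
// Hypothetical helper: sizes are in MB, totalCores is executors x cores per executor.
def suggestShufflePartitions(inputSizeMb: Long, targetPartitionMb: Long, totalCores: Int): Int = {
  require(inputSizeMb > 0 && targetPartitionMb > 0 && totalCores > 0)
  // Partitions needed so that none exceeds the target size (ceiling division).
  val bySize = (inputSizeMb + targetPartitionMb - 1) / targetPartitionMb
  // Round up to a multiple of the core count so each wave of tasks uses every core.
  val waves = (bySize + totalCores - 1) / totalCores
  (waves * totalCores).toInt
}

// 1 GB of data, 100MB partitions, 6 cores
val small = suggestShufflePartitions(1024L, 100L, 6)
// 20 GB of data, 200MB partitions, 40 cores
val large = suggestShufflePartitions(20L * 1024, 200L, 40)

println(s"small=$small large=$large") // prints "small=12 large=120"
```

The resulting value would then be applied through spark.sql.shuffle.partitions, for example with spark.conf.set on an existing session.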
We ran our job with the right shuffle partition, and it took 45 minutes and 39 seconds to complete.
Considerations For Run 4: Code Optimisation
For this run we focused on making two changes at the code level.
1. Replace join and aggregations with Window functions: In most of our computations, we had to perform aggregation on specified columns. The result was to be stored as a new column. In this instance, this operation consists of aggregation followed by join.
The more optimised option here would be to use Window functions. By replacing our join and aggregation with Window functions in our code, we found significant improvement.
A simple benchmark and DAG representations of the two methods can be found here.
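To illustrate the difference, here is a minimal sketch of both variants on hypothetical data: the first computes a per-portfolio total with an aggregation followed by a join, the second gets the same column with a Window function. The column names are assumptions and this is not the code from our job.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder()
  .appName("window-sketch")
  .master("local[*]") // assumption: local run for experimentation
  .getOrCreate()
import spark.implicits._

// Hypothetical input data.
val exposures = Seq(("A", 10.0), ("A", 30.0), ("B", 5.0)).toDF("portfolio", "exposure")

// Variant 1: aggregate, then join the result back onto the original rows.
val totals = exposures.groupBy("portfolio").agg(sum("exposure").as("portfolio_total"))
val viaJoin = exposures.join(totals, Seq("portfolio"))

// Variant 2: a Window function adds the aggregate as a new column, with no join.
val byPortfolio = Window.partitionBy("portfolio")
val viaWindow = exposures.withColumn("portfolio_total", sum("exposure").over(byPortfolio))
```

Both variants return the same rows; the Window version avoids the separate join step.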
2. Replace withColumn with Select: Every operation on a DataFrame results in a new DataFrame. When we need to call withColumn repeatedly, it’s better to add all the new columns in a single select, which produces a single DataFrame.
Instead of using:
use:
The DAG also shows unnecessary shuffles when we use withColumn with multiple windowSpecs:
Using Select:
From the above execution plan, we can see that using Select is a better choice.
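As an illustration of the pattern, the sketch below builds the same two derived columns once with chained withColumn calls and once with a single select. The data and column names are hypothetical and much simpler than the windowSpec case from our job.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("select-sketch")
  .master("local[*]") // assumption: local run for experimentation
  .getOrCreate()
import spark.implicits._

// Hypothetical input data.
val df = Seq((1, 2.0), (2, 4.0)).toDF("id", "amount")

// Chained withColumn: each call creates a new DataFrame and a new projection.
val chained = df
  .withColumn("amount_x2", col("amount") * 2)
  .withColumn("amount_x3", col("amount") * 3)

// Single select: all new columns are defined in one projection.
val selected = df.select(
  col("*"),
  (col("amount") * 2).as("amount_x2"),
  (col("amount") * 3).as("amount_x3")
)
```

Calling explain() on each DataFrame is a simple way to compare the resulting plans.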
Run 4 took just 33 minutes and 53 seconds to complete.
Considerations For Run 5: Speculative Execution
Apache Spark has a speculative execution feature to handle tasks that run slowly in a stage because of environment issues such as a slow network. If one task is running slowly in a stage, the Spark driver can launch a speculation task for it on a different host. Between the regular task and its speculation task, Spark takes the result from the first one to complete successfully and kills the slower one.
Enabling speculation helps in long-running jobs where some tasks are slower than the others, which can be identified by monitoring task durations in the Spark UI.
If spark.speculation is set to true, slow-running tasks are identified by comparing their run time against the median completion time of the other tasks in the stage. Speculative copies of those slow tasks are then launched on other nodes.
Enabling speculation resulted in Run 5 taking just 28 minutes and 19 seconds to complete.
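A minimal configuration sketch for speculation follows. The property names are Spark's own; the values are illustrative assumptions and not the settings from our job, so they would need tuning for each workload.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")
  // How often Spark checks for tasks to speculate (illustrative value).
  .set("spark.speculation.interval", "100ms")
  // A task is a candidate when it runs this many times slower than the median (illustrative value).
  .set("spark.speculation.multiplier", "1.5")
  // Fraction of tasks that must finish before speculation starts for a stage (illustrative value).
  .set("spark.speculation.quantile", "0.75")
```

The same properties can also be passed on the command line with --conf when submitting the job.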
Considerations For Run 6: Enabling AQE (Adaptive Query Execution)
For this run, we enabled AQE, one of the headline features of Spark 3.0. AQE can be enabled by setting the SQL config spark.sql.adaptive.enabled to true (false is the default in Spark 3.0).
In Spark 3.0, the AQE framework is shipped with three features:
1) Dynamically coalescing shuffle partitions simplifies or even avoids tuning the number of shuffle partitions. Users can set a relatively large number of shuffle partitions at the beginning, and AQE can then combine adjacent small partitions into larger ones at runtime.
(set spark.sql.adaptive.coalescePartitions.enabled=true)
2) Dynamically switching join strategies partially avoids executing suboptimal plans due to missing statistics and/or size misestimation. This adaptive optimisation automatically converts sort-merge join to broadcast-hash join at runtime, further simplifying tuning and improving performance.
(set spark.sql.adaptive.localShuffleReader.enabled=true)
3) Dynamically optimising skew joins is another critical performance enhancement, as skew joins can lead to an extreme imbalance of work and severely degrade performance. After AQE detects any skew from the shuffle file statistics, it can split the skewed partitions into smaller ones and join them with the corresponding partitions from the other side. This optimisation can parallelise skew processing and achieve better overall performance.
(set spark.sql.adaptive.skewJoin.enabled=true)
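Put together, the four settings above can be applied when the session is built. This is a minimal sketch; the application name and local master are assumptions for experimentation.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-sketch")
  .master("local[*]") // assumption: local run for experimentation
  // Master switch for Adaptive Query Execution.
  .config("spark.sql.adaptive.enabled", "true")
  // 1) Coalesce small shuffle partitions at runtime.
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  // 2) Use the local shuffle reader when a join is switched to broadcast-hash join.
  .config("spark.sql.adaptive.localShuffleReader.enabled", "true")
  // 3) Split skewed partitions in joins.
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  .getOrCreate()
```

The three feature flags only take effect when spark.sql.adaptive.enabled is also true.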
After upgrading to Spark 3.0 (read its list of newest features here) and enabling the AQE features, our job took 22 minutes and 37 seconds to complete.
Conclusion
This article explains the various strategies deployed for Spark optimisation in an environment where complex computations are involved.
Building up from the standard Spark methodologies of serialisation, parquet file format and broadcasting to breaking the lineage allowed us to achieve a significant reduction in job execution time.
We then improved upon that by using the right number of shuffle partitions, optimising our code, and making use of speculative execution to further enhance the performance.
Lastly, we deployed a newer version of Spark that enabled us to use the AQE feature (unavailable in Spark 2.x) which further reduced the execution time.
I hope this article helps you on your Spark optimisation journey, where a combination of the above strategies can provide the best performance with limited resources. After all, it improved our performance by more than 70%, from 85 minutes to just 22 minutes and 37 seconds.
Gaurav works as a Machine Learning and Data engineer in DBS. He’s always happy to have a conversation on LinkedIn.