Tuning Apache Spark

Shishir Chandra
Analytics Vidhya
Published in
7 min readDec 24, 2020

In modern day data processing architectures developing a pipelines is fairly simple thanks to feature packed Spark and its APIs. But the fun and the challenging part is to optimise the system to achieve the best performance and faster processing throughput. Optimising things in a distributed systems is a tricky part because its not always a function of linear improvements by adjusting the configurations. The most important thing before working on optimisation is to know what the system you are building is primarily intended to do. Optimisation ain’t no fairytale as it comes at a cost of settling for the trade offs those are not going to impact the final goal of your system significantly against the non negotiable ones. Bottomline is you got to know what you are optimising for. Let’s talk about some of the common tuning opportunities for making the spark processing performant.

Executors

First things first, well this is the most exploited and misunderstood part of the spark processing. Tuning this up is the most low hanging fruit one could eye when given a task of optimising Apache spark application.

Total Cores

More the better so allocate what all we have got, seems obvious, right? probably not! This is the number of system core an executer is allocated with to execute its tasks. While coming up with an number for this you must consider:

  1. More the tasks getting executed in parallel, more the garbage collection overhead.
  2. There would be a few cores occupied by the OS processes and other HDFS deamons on all the nodes.

Total Executors

This is the number of the java process instance running or submitted by the resource manager. Given you have X cores available in a cluster and you wish to have Y cores per executor, you end up having X/Y executors. While submitting the task you must consider an executor needs to be allocated to the yarn application master.

Executor Memory

Well this is the available memory you have on the cluster(sum of the available memory across nodes minus the memory needed for operating system and other file system deamons). Say you have 11 GB on each node and have 5 executors running per node, your OS and other process may need say 1GB of memory. You can allocate 10/5= 2GB memory per executor. You don’t effectively get entire 2 GB per executor for your processing though as around 10% of the total executor memory is utilised by Yarn.

Garbage Collector

Garbage Collection in Spark Streaming is a crucial point of concern since it runs in streams or micro batches. Particularly in case of streaming it can stressfully impact the standard Java JVM garbage collection due to the high number of objects processed during the run-time. This causes frequent pauses and thereby increase the latency of the real-time applications. An option here would be using concurrent mark sweep (CMS) garbage collector as an effective step for both the driver and the executors, which reduces pause time by running garbage collection concurrently with the application. Limiting the numbers of objects created is most important in reducing the GC overheads and achieving greater performance.

For higher heap sizes and even better performance for high throughput and low latency needs G1GC garbage collector is recommended. However, when the G1 GC collector tries to collect garbage for certain regions, it fails to find free regions which it can copy the live objects to. This situation is called Evacuation Failure and often leads to full GC. And apparently, full GC in G1 GC is even worse than in Parallel GC, so we must try to avoid full GC in order to achieve better performance.

To avoid full GC in G1 GC, there are two commonly-used approaches:

  1. Decrease the InitiatingHeapOccupancyPercent value ( default is 45), to let G1 GC starts initial concurrent marking at an earlier time, so that we have higher chances to avoid full GC.
  2. Increase the ConcGCThreads value, to have more threads for concurrent marking, thus we can speed up the concurrent marking phase. Note that this option could also take up some effective worker thread resources, depending on your workload CPU utilization.

Similar to CMS collector G1 starts concurrent processing depending on how much of the heap is filled.

Concurrent Task Execution

This is an important area to focus when you are interested in your application to perform tasks faster and making the processed data available in the quicker turnaround timelines. You got to execute things in parallel in order to ensure you are extracting the best out of your compute capacity and ensuring your cluster is optimally utilised throughout to ascertain you are not going easy on the computation cost. A fact to be considered here is to have optimal partition sizes so that they are neither too fat for not allowing you to execute things in parallel due to not enough resources or too small which end up creating a lot of small tasks and costs heavily on the IO.

spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes are the parameters that needs to be configured in order to have spark figure out the ideal partition size on the cluster.

Shuffle property for partition size — spark.sql.shuffle.partitions property determines the size of the partition at every shuffle operation ideally the size should be directly proportional to the size of the file.

Serialization

Serialization plays an important role in optimising any distributed system and it holds true for spark performance tuning as well. Serialization directly causes slowness in the systems if the type of serializer chosen takes longer to compress and convert the objects. Its mostly a choice between ease vs performance when we have to choose a particular serialization formats. Spark supports Java and Kyro serializer and both of them coexists for reasons.

Java Serializer — The default serializer used by Apache Spark. It uses Java’s object output stream framework. The class needs to be implemented with java.io.Serializable. It provides a good flexibility and can work for most of the formats seamlessly. The trade off is its slower.

Kyro Serializer — Works around 8x faster than the default java serializer but the down side is its does not support all the data formats and requires you to register your classes before hand. To override the default serializer-

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Note, Appropriate caching can help in avoiding the deserialization overheads.

Executor Dynamic Allocation

Spark is equipped well with the elastic scaling that dynamically adds or removes executors based on the need and availability of resources in the cluster for better optimisation of compute capacity. spark.dynamicAllocation.enabled is the property that allows us to enable the dynamic allocation of resources. Dynamic allocation kicks in when the spark context is initialised and there are two policies namely scale up which requests new executors when there are pending tasks and increases the number of executors and scale down which removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds. Note that spark external shuffle service should be enabled to enable dynamic allocation, this is controlled by spark_shuffle_service_enabled property.

Processing Close to Storage

As a thumb rule for building distributed systems the processing is most desired to be closest to storage for low read latencies and lesser expensive disk IO and reducing the data movement. The locality configurations in Spark help in determining the right most pick out of the five supported settings.

  • PROCESS_LOCAL — data and processing are localized on the same JVM
  • NODE_LOCAL — data and processing are in the same node but on different executor. This level is slower than the previous one because it has to move the data between processed
  • RACK_LOCAL — data is located in other node than processing but both nodes are on the same rack. Obviously here the data is moved through the network.
  • NO_PREF — means no locality preference.
  • ANY — data is elsewhere but not on the same rack.

However, it is important to know that the locality levels works like an hierarchical precedence so if there is no data fetched at a level spark would look for the lower locality level after waiting for the timeout window set by spark.locality.wait setting.

Fair Scheduling

By default Spark executes tasks in FIFO sequence but in order to achieve better parallelism for independent tasks its recommended to change the schedular setting to fair scheduling for tasks to be picked and executed in round robin manner. Create a poll of tasks based on the them being mutually exclusive.

Use Broadcast Variables and Cache

Broadcast variables in Spark is a way for sharing read-only variables across executors. Without broadcast variables these variables would be transported to each executor for every transformation and action, which would cause network overhead. However, with broadcast variables, they are shipped once and are cached for future reference at all executors.

Keep eying for the caching needs of data frames wherever feasible and optimised as this would lower down the GC overhead and memory utilisation.

Conclusion

Reiterating that knowing the eventual goal and the most desired performance properties of the system you are building is important. It is about knowing what you want and what you can compromise. At times, in order to achieve a performance goal you need to trade off another one.

--

--

Shishir Chandra
Analytics Vidhya

Distributed computing enthusiast, data engineer, system architect, cloud computing