Decoding Spark Executor Memory Management: Tuning Spark Configs

Shivank Chaturvedi
Airtel Digital
Feb 28, 2024


This article dives deep into the internals of Apache Spark executor memory allocation and derives a heuristic formula for finding the right values of various Spark configs in a production environment, and shows how you can play with these configs to optimise a Spark application and leverage its full potential.

As a prerequisite, you should be familiar with the Spark-on-YARN architecture, which involves different YARN containers: a driver and multiple executors, launched on the cluster via the ApplicationMaster to achieve parallelism.

Let's dive into the executor container and demystify its internal workings:

The image above shows the different sections of Spark executor memory.
Let's understand the function of each section:

A YARN container consists of overhead memory and the JVM heap.

i) Overhead Memory:

Spark overhead memory is off-heap memory (so no GC cycles run there) whose primary function is to act as a buffer during shuffles, network I/O, task scheduling and task execution.

By default it is 384 MB or 10% of executor memory, whichever is higher; you can override it with the spark.executor.memoryOverhead parameter.

The executor JVM heap is divided into three parts: Reserved Memory, Spark Unified Memory and User Memory, plus Off-Heap Memory outside the heap (if configured):

ii) Reserved Memory: This is JVM memory reserved for Spark's internal framework computations. Its value is 300 MB, hardcoded in the Spark source code, and it cannot be changed unless you build a custom Spark from modified source.

iii) Off-Heap Memory: The main purpose of off-heap memory is to reduce GC pauses, since it lives outside the heap and no GC cycles run there. Storage blocks and execution blocks (discussed below) can be placed here. This memory is not available to the executor by default, but can be enabled with:

spark.memory.offHeap.enabled=true

and its value can be configured by :

spark.memory.offHeap.size=1G
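As a quick illustration, here is how these memory settings might be passed when building a PySpark session. The sizes are placeholders, not recommendations, and executor memory settings have to be supplied before the application starts (e.g. at spark-submit time):

from pyspark.sql import SparkSession

# Illustrative values only -- tune them for your own workload.
spark = (
    SparkSession.builder
    .appName("memory-tuning-demo")
    .config("spark.executor.memory", "4g")             # executor JVM heap
    .config("spark.executor.memoryOverhead", "512m")   # overhead buffer outside the heap
    .config("spark.memory.offHeap.enabled", "true")    # allow off-heap storage/execution blocks
    .config("spark.memory.offHeap.size", "1g")
    .getOrCreate()
)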

iv) User Memory: This memory holds the user's own data structures: direct operations on RDDs and any UDFs registered in the application live here. It also maintains the DAG lineage and metadata that give Spark its lazy evaluation and fault tolerance.

v) Spark Unified Memory: This is the memory Spark actually uses for in-memory computation, for caching data and for storing broadcast tables to speed up joins.

Relation between Spark Unified Memory and User Memory

The relation between Spark Unified Memory and User Memory is determined by the config spark.memory.fraction, defined as the ratio of Spark Unified Memory to (jvm_memory - reserved_memory), i.e. to the sum of Spark Unified Memory and User Memory, so:

spark.memory.fraction = spark_unified_memory / (spark_unified_memory + user_memory) ........ (Eqn. 1)

By default its value is 0.6, and

(jvm_memory - reserved_memory) = spark_unified_memory + user_memory ........ (Eqn. 2)
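To make Eqn. 1 and Eqn. 2 concrete, here is a back-of-the-envelope calculation in Python for a hypothetical 4 GB executor heap with the default fraction:

# Hypothetical 4 GB executor heap, default spark.memory.fraction
jvm_memory = 4096        # MB (spark.executor.memory)
reserved_memory = 300    # MB, hardcoded by Spark
memory_fraction = 0.6    # default spark.memory.fraction

usable = jvm_memory - reserved_memory               # Eqn. 2: 3796 MB
spark_unified_memory = memory_fraction * usable     # Eqn. 1: ~2278 MB
user_memory = usable - spark_unified_memory         # ~1518 MB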

This Spark Unified Memory is further divided into Spark Unified Storage Memory and Spark Unified Execution Memory.

vi) Spark Unified Storage Memory: This is the memory Spark uses to cache DataFrames for faster reads and reuse across different optimisations; it also stores the table that is broadcast to each executor in a broadcast join. It is used whenever you call cache() or persist() (for persist(), whenever the chosen storage level includes memory).

vii) Spark Unified Execution Memory: This is the memory where your actual data processing happens. It is shared among the executor cores you have configured, and each core is responsible for processing one task of a stage on one partition of the DataFrame.

The interaction between Spark Unified Execution Memory and Spark Unified Storage Memory:

The relation between Spark Unified Storage Memory and Spark Unified Execution Memory is defined by the config spark.memory.storageFraction, which is defined as:

spark.memory.storageFraction = spark_unified_storage_memory / (spark_unified_storage_memory + spark_unified_execution_memory)

or

spark.memory.storageFraction = spark_unified_storage_memory / spark_unified_memory

or

(1 - spark.memory.storageFraction) = spark_unified_execution_memory / spark_unified_memory ........ (Eqn. 3)

By default this value is 0.5.
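Continuing the hypothetical 4 GB example from above, the default spark.memory.storageFraction of 0.5 splits the ~2278 MB of unified memory evenly:

storage_fraction = 0.5            # default spark.memory.storageFraction
spark_unified_memory = 2277.6     # MB, from the earlier calculation

storage_memory = storage_fraction * spark_unified_memory      # ~1139 MB
execution_memory = spark_unified_memory - storage_memory      # ~1139 MB (Eqn. 3)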

But wait, there is a catch!!!

This boundary between Spark Unified Execution Memory and Spark Unified Storage Memory is not static; it moves dynamically, a behaviour introduced with the unified memory manager in newer versions of Spark (1.6+).

Let's see how it works:

Initial condition: both execution and storage memory are below their peak (i.e. each can accommodate 4 blocks but currently holds only 3).

Case 1: When execution memory is at its peak

If 2 new tasks are added, then:

Spark will first fill the execution memory and then spill over into the empty space of storage memory.

But what if one more execution task is added after that? Then:

Spark will evict storage blocks and start placing execution blocks in their place.

Case 2: When storage blocks are at their peak:

If 2 new storage blocks arrive, then:

Spark will allocate the empty space of execution memory to the storage blocks.

But what if one more storage block arrives after that? Then:

Spark will not evict any execution task, so the block will not be cached in memory; such a storage block will instead go to disk and be serialised there, if the storage level is configured to allow it.

Conclusion: Spark gives preference to execution blocks, since they can evict storage blocks even from storage memory, whereas storage blocks can use execution memory only if it is free.

Finally, after deep diving into every aspect of executor memory, we are now in a position to derive a heuristic formula for finding the right configs for executor-memory, executor-cores, spark.executor.memoryOverhead, etc., and to see how to tune these params for your particular data use case.

But let's set the background first !

You can find many resources on the internet about configuring these parameters; however, few of them consider real-life scenarios such as:

  1. A cluster is a shared resource and many applications run on it in parallel. In such cases dynamic executor allocation is enabled (i.e. executors are allocated only when the application demands them; an application has several jobs and stages, and not every stage or job needs the same number of executors, so fixing the number of executors is a bad idea because it chokes other applications). This dynamic allocation is handled by the ResourceManager at the driver's request (see the config sketch after this list).
  2. The impact of playing with the params discussed above is not very clear (this is subjective).
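For reference, dynamic allocation is typically switched on with a handful of settings like the following (the values are placeholders, and the exact shuffle-tracking/service option depends on your Spark version and cluster manager):

spark.dynamicAllocation.enabled=true

spark.dynamicAllocation.minExecutors=0

spark.dynamicAllocation.maxExecutors=100

spark.dynamicAllocation.executorIdleTimeout=60s

spark.shuffle.service.enabled=true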

Ready, get set and go!!!!

The basic intuition is to do what we are good at: reverse engineering.

We established above that each partition of a stage is processed by a particular core, i.e. a task is applied to a partition by a core on which a thread is launched.

Now take the size of the largest partition that appeared during the application's runtime.

Call it max_partition_size.

so the Spark unified execution memory will need to be:

spark_unified_execution_memory = number_of_cores * max_partition_size ........ (Eqn. 4)
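For example, with 4 executor cores and a hypothetical maximum partition of 200 MB, Eqn. 4 gives:

number_of_cores = 4
max_partition_size = 200   # MB, largest partition observed at runtime (hypothetical)

spark_unified_execution_memory = number_of_cores * max_partition_size   # 800 MB (Eqn. 4)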

Now, from (Eqn. 3) we know:

(1 - spark.memory.storageFraction) = spark_unified_execution_memory / spark_unified_memory

so,

spark_unified_memory = spark_unified_execution_memory / (1 - spark.memory.storageFraction) ........ (Eqn. 5)

Substituting the value of spark_unified_execution_memory from (Eqn. 4) into (Eqn. 5),

we get

spark_unified_memory = (number_of_cores * max_partition_size) / (1 - spark.memory.storageFraction) ........ (Eqn. 6)

From (Eqn.1)

spark.memory.fraction = spark_unified_memory / (spark_unified_memory + user_memory)

Substituting the value of spark_unified_memory from (Eqn. 6) into (Eqn. 1), we get

spark_unified_memory + user_memory = (number_of_cores * max_partition_size) / ((1 - spark.memory.storageFraction) * spark.memory.fraction) ........ (Eqn. 7)

Now from (Eqn. 2) we know,

(jvm_memory - reserved_memory) = spark_unified_memory + user_memory

Substituting (spark_unified_memory + user_memory) into (Eqn. 7),

we get,

jvm_memory - reserved_memory = (number_of_cores * max_partition_size) / ((1 - spark.memory.storageFraction) * spark.memory.fraction)

We know the value of reserved_memory is fixed at 300 MB, so:

jvm_memory = (number_of_cores * max_partition_size) / ((1 - spark.memory.storageFraction) * spark.memory.fraction) + 300 MB ........ (Eqn. 8)

Note: this jvm_memory is the executor-memory config, but for our calculation we will pass the full yarn_container_memory as the final executor-memory to build in a buffer, even though overhead_memory is not technically part of executor-memory.

We know the YARN container consists of overhead_memory, offheap_memory and jvm_memory, so finally we arrive at:

yarn_container_memory = (number_of_cores * max_partition_size) / ((1 - spark.memory.storageFraction) * spark.memory.fraction) + 300 MB + overhead_memory + offheap_memory ........ (Eqn. 9)

Round this up to the nearest GB and we have our relation.

This relation determines the size of the container YARN must launch to process a partition of max_partition_size.
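The whole derivation can be wrapped into a small helper. This is only a sketch of Eqn. 9: the function name and the MB-based units are my own, and overhead_memory and offheap_memory are passed in explicitly rather than derived:

def yarn_container_memory_mb(number_of_cores,
                             max_partition_size_mb,
                             storage_fraction=0.5,
                             memory_fraction=0.6,
                             overhead_memory_mb=0,
                             offheap_memory_mb=0,
                             reserved_memory_mb=300):
    """Heuristic container size from Eqn. 9 (all values in MB)."""
    execution = number_of_cores * max_partition_size_mb                                # Eqn. 4
    jvm = execution / ((1 - storage_fraction) * memory_fraction) + reserved_memory_mb  # Eqn. 8
    return jvm + overhead_memory_mb + offheap_memory_mb                                # Eqn. 9

# 4 cores, 128 MB max partition, overhead taken as 10% of the heap
heap = 4 * 128 / ((1 - 0.5) * 0.6) + 300                                 # ~2007 MB
print(yarn_container_memory_mb(4, 128, overhead_memory_mb=0.1 * heap))   # ~2207 MB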

Let's plug in the defaults to find the simplified expression:

spark.memory.storageFraction = 0.5, spark.memory.fraction = 0.6, number_of_cores = 4, offheap_memory = 0 (disabled), overhead_memory = 10% of jvm_memory

The industry standard is at most 5 cores per executor (in my next article I will also explain why you should not give one executor all the cores available on a node, and why you should not run 1 core per executor either).

Putting in all the values, we get:

yarn_container_memory = 13.34 * max_partition_size + 300 MB + overhead_memory

Now, overhead_memory is 10% of jvm_memory, and since we are treating yarn_container_memory as the executor-memory,

executor_memory = 14.667 * max_partition_size + 330 MB

Approximately,

executor_memory = 15 * max_partition_size + 330 MB

or, to simplify further,

executor_memory = 16 * max_partition_size

Practical Example:

Suppose we need to process (just read into memory) a 100 GB file stored in HDFS. Then our configs should be:

The file is 100 GB in HDFS, so each partition will be 128 MB (the default HDFS block size).

so executor_memory = 16 * 128 MB = 2048 MB (take 2.5 GB or 3 GB)

executor_cores = 4

The number of executors the application can consume, if enough resources are available:

No. of partitions = 100 GB / 128 MB = 800

Cores per executor = 4

so total executors = 800 / 4 = 200 executors.

Hence, at its peak the application will be running 200 executors, each with 4 cores and 2.5 GB of executor memory.
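Put together, the derived values could be supplied roughly like this (a PySpark sketch; the HDFS path is hypothetical, and with dynamic allocation enabled the executor count is left to the ResourceManager rather than fixed):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hdfs-100gb-read")
    .config("spark.executor.memory", "3g")               # ~16 * 128 MB, rounded up
    .config("spark.executor.cores", "4")
    .config("spark.dynamicAllocation.enabled", "true")   # let the cluster manager scale executors
    .getOrCreate()
)

df = spark.read.text("hdfs:///path/to/100gb_file")       # hypothetical path
print(df.count())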

Key Considerations!!!

  1. This will give you the optimal config and optimal results only if you have already done all your data-level optimisations (e.g. broadcast join vs shuffle sort-merge join, reduceByKey() vs groupByKey(), using Spark encoders, handling skew; again, I will cover these in my next article).
  2. It does not bring num-executors into play, because with dynamic resource allocation it should be the resource manager (YARN, Kubernetes, Mesos) that decides this according to the workload of each stage.
  3. The formula uses the size of the largest partition in any stage, because that is the limiting factor for any executor's resources.
  4. You can always tweak the configs discussed above; this gives you the configs to start with, for general purposes.

That's it, thanks for your time.

I hope I was able to generate some insights with this. Please let me know if you find any error in the approach, as it is based on my own understanding; discussions are most welcome :)

Write to me at: shivank9753@gmail.com

Jai Hind! Happy Coding!
