Decoding Memory in Spark — Parameters that are often confused

Sohom Majumdar
Walmart Global Tech Blog
6 min read · Jan 4, 2021

Apache Spark™ is one of the most active open-source projects out there. It keeps improving with each release, and that leads to a plethora of confusing parameters and configurations which at times might feel similar but, in reality, have different use cases. In this post, we take a look at commonly misunderstood parameters in Spark concerning memory management.

Spark Memory Blocks — Quick Recap

Spark was developed using Scala as the primary language. Hence, operations in Spark happen inside a JVM, even if the user’s code is written in a different language like Python or R. The Spark runtime segregates the JVM heap space in the driver and executors into four different parts:

  1. Storage Memory — JVM heap space reserved for cached data
  2. Execution Memory — JVM heap space used by data-structures during shuffle operations (joins, group-by’s and aggregations). Earlier (before Spark 1.6), the term shuffle memory was also used to describe this section of the memory.
  3. User Memory — For storing the data-structures created and managed by the user’s code
  4. Reserved Memory — Reserved by Spark for internal purposes.
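As a rough illustration, the sketch below estimates how a hypothetical 4 GB executor heap splits into these parts under the default unified memory settings (a 300 MB reservation, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5); the executor size is assumed and actual numbers on a cluster will vary:

```python
# Back-of-the-envelope split of a hypothetical 4 GB executor heap,
# assuming default unified memory manager settings.
executor_memory_mb = 4 * 1024            # spark.executor.memory = 4g (assumed)
reserved_mb = 300                        # fixed reservation made by Spark
usable_mb = executor_memory_mb - reserved_mb

spark_memory_mb = usable_mb * 0.6        # spark.memory.fraction (default 0.6)
user_memory_mb = usable_mb * 0.4         # remaining heap for user data structures

storage_mb = spark_memory_mb * 0.5       # spark.memory.storageFraction (default 0.5)
execution_mb = spark_memory_mb - storage_mb

print(f"Reserved : {reserved_mb} MB")
print(f"Storage  : {storage_mb:.0f} MB")
print(f"Execution: {execution_mb:.0f} MB")
print(f"User     : {user_memory_mb:.0f} MB")
```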

In addition to the JVM heap, there are two more segments of memory that are accessed by Spark:

  1. Off-Heap Memory — This segment of memory lies outside the JVM, but is used by JVM for certain use-cases (e.g. interning of Strings). Off-Heap memory can be used by Spark explicitly as well to store serialized data-frames and RDDs.
  2. External Process Memory — Specific to PySpark and SparkR, this is the memory used by the Python/R process, which resides outside of the JVM.

There are good articles (see References) which discuss these segments of memory in detail.

Every now and then, though, we come across Spark configurations that seem to do the same thing. A deeper understanding of those configs helps decode what each of them means, and what is actually being addressed by tweaking their values.

Spark Storage Memory

spark.storage.memoryFraction vs spark.memory.storageFraction

Both of these parameters set the amount of JVM heap space to be used as storage memory (for cached data), but it is not obvious which parameter should be set.

Spark made a significant overhaul in the handling of Storage and Execution space in version 1.6. The legacy mode of handling is known as StaticMemoryManager, and the new one is UnifiedMemoryManager.

Spark Static Memory Manager

In the StaticMemoryManager, the storage and execution memory (or shuffle memory in this case) are fixed in the configuration. The configuration parameters spark.storage.memoryFraction and spark.shuffle.memoryFraction govern the size of the storage and shuffle memory respectively. These parameters have been deprecated since Spark 1.6, and setting them does not have any effect unless spark.memory.useLegacyMode is set to true.
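As a quick sketch, a legacy-mode setup on Spark 1.6–2.x (where spark.memory.useLegacyMode is still available) might look like this; the fraction values are purely illustrative:

```python
from pyspark.sql import SparkSession

# Sketch: legacy StaticMemoryManager configuration (illustrative values).
# These fractions only take effect because spark.memory.useLegacyMode is true.
spark = (
    SparkSession.builder
    .appName("legacy-memory-demo")
    .config("spark.memory.useLegacyMode", "true")
    .config("spark.storage.memoryFraction", "0.5")   # fixed share for cached data
    .config("spark.shuffle.memoryFraction", "0.3")   # fixed share for shuffle buffers
    .getOrCreate()
)
```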

Spark Unified Memory Manager

In the UnifiedMemoryManager, the boundary between storage memory and execution memory is not fixed. Hence, storage blocks can occupy parts of execution memory if it is free, and vice versa. The parameter spark.memory.fraction determines the total memory dedicated to Spark (for both execution and storage). The amount of storage memory which is protected from eviction is governed by spark.memory.storageFraction.

TL;DR: It is preferable to use spark.memory.fraction and spark.memory.storageFraction to configure the Spark memory segments.
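A minimal sketch of configuring the unified memory manager, with the default values spelled out explicitly for clarity:

```python
from pyspark.sql import SparkSession

# Sketch: UnifiedMemoryManager configuration (the default since Spark 1.6).
# 60% of the usable heap forms the unified (execution + storage) region,
# and half of that region is storage memory protected from eviction.
spark = (
    SparkSession.builder
    .appName("unified-memory-demo")
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    .getOrCreate()
)
```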

Off-Heap Memory

spark.executor.memoryOverhead vs. spark.memory.offHeap.size

JVM Heap vs Off-Heap Memory

Although most of the operations in Spark happen inside the JVM and subsequently use the JVM heap for their memory, each executor has the ability to utilize an off-heap space for certain cases. This off-heap space lies outside the JVM space and is generally accessed via sun.misc.Unsafe APIs. The off-heap memory is outside the ambit of garbage collection, hence it provides more fine-grained control over the memory for the application developer.

Spark uses off-heap memory for two purposes:

  • A part of off-heap memory is used by Java internally for purposes like String interning and JVM overheads.
  • Off-Heap memory can also be used by Spark explicitly for storing its data as part of Project Tungsten [5].

The total off-heap memory for a Spark executor is controlled by spark.executor.memoryOverhead. The default value for this is 10% of executor memory, subject to a minimum of 384MB. This means that even if the user does not explicitly set this parameter, Spark would set aside 10% of executor memory (or 384MB, whichever is higher) for VM overheads. The amount of off-heap memory used by Spark to store actual data frames is governed by spark.memory.offHeap.size. This is an optional feature, which can be enabled by setting spark.memory.offHeap.enabled to true.
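A sketch of sizing the overhead and opting into explicit off-heap storage (the sizes here are illustrative, not recommendations):

```python
from pyspark.sql import SparkSession

# Sketch: memory overhead plus explicit off-heap storage (illustrative sizes).
spark = (
    SparkSession.builder
    .appName("offheap-demo")
    .config("spark.executor.memory", "8g")            # JVM heap per executor
    .config("spark.executor.memoryOverhead", "1g")    # VM overheads, interned strings, etc.
    .config("spark.memory.offHeap.enabled", "true")   # opt in to Tungsten off-heap storage
    .config("spark.memory.offHeap.size", "2g")        # off-heap space for Spark data
    .getOrCreate()
)
```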

Before Spark 3.x, the total off-heap memory indicated by memoryOverhead also included the off-heap memory for Spark data-frames. So while setting memoryOverhead, users also had to account for the off-heap memory used by Spark data-frames. Spark 3.0 makes the Spark off-heap a separate entity from the memoryOverhead, so users do not have to account for it explicitly while setting the executor memoryOverhead.

Off-Heap Memory Allocation in Spark

TL;DR: For Spark 1.x and 2.x, Total Off-Heap Memory = spark.executor.memoryOverhead (spark.memory.offHeap.size included within)
For Spark 3.x, Total Off-Heap Memory = spark.executor.memoryOverhead + spark.memory.offHeap.size

Python Memory

spark.python.worker.memory vs spark.executor.pyspark.memory

While executing PySpark, both these parameters seem to limit the memory allocated to Python. But in reality, they limit very different sections of memory in the executor. In PySpark, two separate processes run in the executor: a JVM that executes the Spark part of the code (joins, aggregations and shuffles) and a Python process that executes the user’s code. The two processes communicate via a Py4J bridge that exposes the JVM objects in the Python process and vice versa.

Configuring Python worker memory

The parameter spark.python.worker.memory controls the amount of memory reserved for each PySpark worker, beyond which it spills over to the disk. In other words, it is the amount of memory that can be occupied by the objects created via the Py4J bridge during a Spark operation. In case this parameter is not set, the default value is 512MB.

Introduced in Spark 2.4, spark.executor.pyspark.memory controls the actual memory of the Python worker process. Each Python worker process is given a limit on the memory space it can address, set via the resource.RLIMIT_AS limit in Python.
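For intuition, the snippet below shows how an address-space limit of this kind can be applied from Python using the standard resource module; this is a simplified sketch of the mechanism, not Spark's actual worker code:

```python
import resource

# Sketch: cap the address space (RLIMIT_AS) of the current process at 2 GB.
# Spark's Python worker applies a limit of this kind when
# spark.executor.pyspark.memory is set.
limit_bytes = 2 * 1024 * 1024 * 1024
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, hard))

# Allocations beyond the limit now fail with MemoryError instead of
# silently consuming the whole node's memory.
```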

In case the Python worker memory is not set via spark.executor.pyspark.memory, the Python worker process can potentially occupy the entire node’s memory. Since this portion of memory is not tracked by YARN, this might lead to over-scheduling on the node (because YARN assumes the memory occupied by the Python worker to be free). This can lead to page swaps and slow down all the YARN containers on that node.

TL;DR: spark.python.worker.memory limits the memory in JVM for Python objects, whereas spark.executor.pyspark.memory limits the actual memory of the Python process
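A sketch of setting the two limits side by side on a PySpark job (the values are illustrative):

```python
from pyspark.sql import SparkSession

# Sketch: the two Python-related memory limits, side by side (illustrative values).
spark = (
    SparkSession.builder
    .appName("pyspark-memory-demo")
    .config("spark.python.worker.memory", "512m")    # spill threshold per PySpark worker
    .config("spark.executor.pyspark.memory", "2g")   # hard cap on the Python worker process (Spark 2.4+)
    .getOrCreate()
)
```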

Total Container Memory

Total Memory Request to YARN

As evident in the diagram, the total memory requested by Spark from the container manager (e.g. YARN) is the sum of the executor memory, the memory overhead and the Python worker memory limit. This ensures proper resource scheduling of the executors.
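A back-of-the-envelope sketch of that sum, reusing the illustrative sizes from the earlier snippets and assuming a Spark 3.x setup where the off-heap size is requested on top of the overhead:

```python
# Rough sketch: total per-executor memory requested from YARN,
# reusing the illustrative sizes from earlier snippets (Spark 3.x assumed).
executor_memory_gb = 8    # spark.executor.memory
memory_overhead_gb = 1    # spark.executor.memoryOverhead
pyspark_memory_gb  = 2    # spark.executor.pyspark.memory
offheap_size_gb    = 2    # spark.memory.offHeap.size (added separately on Spark 3.x)

total_container_gb = (executor_memory_gb + memory_overhead_gb
                      + pyspark_memory_gb + offheap_size_gb)
print(f"Total container request: {total_container_gb} GB")  # 13 GB
```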

References

  1. Spark Configuration
  2. Tuning Spark: Memory Management Overview
  3. Spark Memory Management
  4. Apache Spark Memory Management
  5. Project Tungsten: Bringing Apache Spark Closer to Bare Metal
