There is a change in memory management model from Apache Spark v1.6.0, The old memory management model is implemented by StaticMemoryManager class, and now it is called “legacy”. It is disabled by default but can be turned on with spark.memory.useLegacyMode parameter. Thus, running the same code on Spark v1.5.x and Spark v1.6.0 would result in different behavior.
In this article I will be describing about new memory management model used in Apache Spark in v1.6.0+, which is implemented by UnifiedMemoryManager class.
Here’s the diagram of Spark Memory allocation inside of the JVM heap as per the new Memory Management Model :
There are three main memory regions in Apache Spark Unified Memory Manager :
- Reserved Memory : This is the memory reserved by the system and as of Spark v1.6.0+, the value is 300MB. That means 300MB of RAM does not participate in Spark memory region size calculations. Reserved memory’s size is hardcoded and cannot be changed in any way without Spark recompilation or setting spark.testing.reservedMemory, which is not recommended as it is a testing parameter not intended to be used in production. Reserved memory store lots of Spark internal objects. So, even if you want to give all the Java Heap for Spark to cache your data, you won’t be able to do so as this “reserved” part would still exist. Please note, Spark executor it will fail with “please use larger heap size” error message if you don’t give Spark executor at least 1.5 * Reserved Memory = 450 MB heap.
- User Memory : Lets say, you can store your own data structures in memory that would be used in RDD transformations or you want to write a user defined Spark Aggregation by using mapPartitions transformation maintaining hash table for this aggregation, all that will consume so called User Memory. User Memory is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you to use it in a way you like. From Spark v1.6.0+ the size of this memory pool can be calculated as (Java Heap - Reserved Memory) * (1.0 - spark.memory.fraction), which is by default equal to (Java Heap - 300MB) * 0.25. For example, with 8GB heap, you would have 1973 MB of User Memory. Its completely up to you, how and what would be stored in this 1973 MB of RAM , Spark makes completely no accounting on what you do there and whether you take this size limit under consideration. Not cautiously considering this limit in your code might cause Out Of Memory error.
- Spark Memory : This is the memory pool managed by Apache Spark. Its size can be calculated as (Java Heap - Reserved Memory) * spark.memory.fraction, and default is (Java Heap - 300 MB) * 0.75. Thus, in relation to our last example of 8 GB heap, Spark Memory pool would be of 5919 MB. This whole pool is split into 2 regions namely Storage Memory and Execution Memory and the boundary between them is set by spark.memory.storageFraction parameter, which defaults to 0.5. This boundary is not static, and in case of addition memory requirement by any region this boundary will move i.e. one region would grow by borrowing space from another one. Before discussing how these boundaries move let’s focus on Storage and Execution Memory:
1. Storage Memory : This pool is used for both storing cached data and for temporary space serialized data unroll. Also all the broadcast variables are stored there as cached blocks. “unroll” is essentially a process of deserializing a serialized data. Here’s how Apache Spark unroll a serialized data. Well, as you may see, it does not require much memory for unrolled block to be available. In case there is not enough memory to fit the whole unrolled partition it would directly put it to the drive if desired persistence level allows this. As of broadcast variables, they are stored in cache with MEMORY_AND_DISK persistence level.
2. Execution Memory : As the name suggests, this pool is used for storing the objects required during the execution of Spark tasks. For example,it is used to store hash table for hash aggregation step. also, it is used to store shuffle intermediate buffer on the Map side in memory. This pool also supports spilling on disk if not enough memory is available, but the blocks from this pool cannot be forcefully evicted by other tasks.
Now once we know what Storage and Execution memories are for, we can discuss on moving boundary between Storage Memory and Execution Memory. As explained above due to the nature of Execution Memory, blocks cannot be forcefully evicted from this pool, because this is the data used in intermediate computations and the process requiring this memory would simply fail if the block it refers to won’t be found. But it is not so for the Storage Memory as it is just a cache of blocks stored in RAM, and if we evict the block from there we can just update the block metadata reflecting the fact this block was evicted to HDD whenever a this block is required, Spark would read it from HDD (or recalculate in case the persistence level does not allow to spill on HDD). So, by this discussion it is clear that we can forcefully evict the block from Storage Memory, but cannot do so from Execution Memory.
Storage Memory pool can borrow some space from Execution Memory pool only if there is some free space available in Execution Memory pool. Whereas, there are two cases when Execution Memory pool can be extended to use Storage Memory pool :
- There is free space available in Storage Memory pool, i.e. cached blocks don’t use all the memory available in Storage Memory pool. Then it just reduces the Storage Memory pool size and proportionally increases the Execution Memory pool.
- Storage Memory pool size exceeds the initial Storage Memory region size and it has all this space utilized. This situation causes forceful eviction of the blocks from Storage Memory pool, unless it reaches its initial size.
Initial Storage Memory region size is calculated as (Java Heap - Reserved Memory) * spark.memory.fraction * spark.memory.storageFraction = (Java Heap - 300 MB) * 0.75 * 0.5 . Thus, for 8 GB heap this would result in 2959.5 MB of RAM.
This implies that if we use Spark cache and the total amount of data cached on executor is at least the same as initial Storage Memory region size, we are guaranteed that storage region size would be at least as big as its initial size. However, if your Execution Memory region has grown beyond its initial size before you filled the Storage Memory region, you won’t be able to forcefully evict entries from Execution Memory, so you would end up with smaller Storage Memory region while execution holds its blocks in memory.
I hope this article helped in understanding Apache Spark memory management model.