Stateful processing in Spark Structured Streaming — Troubleshooting Java OOM heap space error

Vindhya G
7 min read · Jan 6, 2023


In the early days of working with Spark Structured Streaming, whether the application used flatMapGroupsWithState or just an aggregator, one of the major reasons an application (with a state, an aggregator or a stream-stream join) would fail was a Java Out Of Memory error. We were clueless about why the application was running out of memory, because the sizes of shuffle, sort and state shown in the Spark UI (and logs) seemed to be much smaller than the total executor memory provided. So we had to resort to trial and error on the executor memory size until the application stopped failing after running for a while.

Instead, if we consider what happens behind the scenes with respect to state, we can come up with a good estimate of the memory being used. That estimate, importantly, can also serve as a benchmark for higher event rates in the future, and the same knowledge helps us optimise the app to use resources efficiently.

So the main focus of this article is to estimate the memory required for an application (based on an initial run), using the knowledge of how a Spark Structured Streaming application uses memory for state (and for other execution purposes).

Stateful processing was introduced in Spark Structured Streaming:

  1. To store the state of previous batches internally, instead of relying on external storage, which is not efficient.
  2. To provide window functions and watermarking for aggregators.

Based on the above, there are two types of state.

Aggregators — State is created internally when we use groupBy, to maintain the aggregation over a window or until the watermark has passed.

Note: State is maintained even when there is no watermarking or window involved.

FlatMapGroupsWithState — State is created explicitly by the user in the application and stored until explicitly removed.

It is important to remember that state is maintained even if an application does not create explicit state.
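To make the second type concrete, here is a minimal, hypothetical flatMapGroupsWithState sketch (the Event and RunningTotal case classes and the 30-minute timeout are illustrative, not from the original app): the state for a key lives until state.remove() is called or the timeout fires.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, value: Long)
case class RunningTotal(key: String, total: Long)

// Keep a running total per key; the state lives until it is removed explicitly or it times out
def updateTotals(key: String, events: Iterator[Event], state: GroupState[Long]): Iterator[RunningTotal] = {
  if (state.hasTimedOut) {
    state.remove()                               // explicit removal is what frees the state
    Iterator.empty
  } else {
    val total = state.getOption.getOrElse(0L) + events.map(_.value).sum
    state.update(total)
    state.setTimeoutDuration("30 minutes")       // without a timeout/removal the state grows forever
    Iterator(RunningTotal(key, total))
  }
}

// usage, assuming a streaming Dataset[Event] named events and spark.implicits._ in scope:
// events.groupByKey(_.key)
//   .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(updateTotals)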

Where is the state stored?

Structured Streaming supports two types of state storage: the HDFS-backed state store (the default) and the RocksDB state store.
It is called HDFS-backed because the state is kept in memory and backed up to HDFS (or, with the other provider, to RocksDB files).

We need HDFS for two main reasons.

  1. State across executors. Spark uses the RDD preferred-locations feature to schedule a partition on the same executor where the previous batch processed that data partition, so the state can be updated directly from memory. Sometimes, though, partitions of the next batch end up on a different executor; updating the state then requires loading it from the files generated for HDFS or RocksDB.
  2. Resiliency: this is essentially checkpointing. If executors go down, Spark does not lose the state; it can be loaded immediately on another executor and the query continues without data loss.
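As a hedged sketch (the path and the variable name grouped are illustrative, and the RocksDB provider requires Spark 3.2+), this is roughly how the checkpoint location and the state store provider are configured:

// Optional (Spark 3.2+): keep state in RocksDB instead of the default HDFS-backed in-memory map
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

// Fault-tolerant location where offsets and state files are checkpointed (path is illustrative)
val query = grouped.writeStream
  .option("checkpointLocation", "hdfs:///checkpoints/stateful-query")
  .start()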

How is the state stored?

Memory: State is stored as a key-value map (a ConcurrentHashMap) per partition. Since the aggregation uses a fixed number of partitions, every batch runs on the same executor where that partition was processed previously, so the in-memory state can be updated directly; this is done using the preferred-locations feature of RDDs. Every new batch creates a new version of the map for each partition, copying the previous version and then applying the updates. This per-partition key-value map is called the state store. For resiliency, Spark keeps at least two versions (the current batch and the previous one) before deleting older ones.

The state store, keyed by version, is held in a TreeMap, as shown below:

TreeMap[
  Long,                                     // version
  ConcurrentHashMap[UnsafeRow, UnsafeRow]   // state key -> state value
]

Storage: The state is also persisted to HDFS through snapshot files and delta files that are written regularly. These files serve two important purposes:
1. If, for some reason, a partition of the next batch cannot be scheduled on the same executor as the previous batch, the state is loaded from these files.
2. If executors are removed, or even if the driver fails, these files can be reloaded onto new executors.
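For reference, these files live under the query's checkpoint directory; for the HDFS-backed provider the layout looks roughly like this (version numbers are illustrative):

<checkpointLocation>/state/<operatorId>/<partitionId>/
  1.delta       // per-batch changes to the state store
  2.delta
  ...
  10.snapshot   // periodic full snapshot, so older deltas can be cleaned up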

Where does the executor maintain the state?

Now that we know we have a memory cache backed by HDFS or RocksDB, it is important to understand how memory management works in Spark, so we can estimate whether the memory given to the executors is enough, considering that we have state to maintain in addition to other execution requirements such as shuffle, aggregation and sorting.

Memory division in executor

The total executor memory we provide per executor when running an application is used for multiple purposes within Spark.

  1. Reserved Memory: 300 MB is reserved for Spark's internal objects.
  2. User Memory: 40% of the rest is reserved for user-specific objects; UDFs and other custom data structures defined by the user live in this space.
  3. Spark Unified Memory: the remaining 60% is reserved for execution and storage. Execution includes space for shuffle, sort, aggregation, etc.
    Storage holds cached DataFrames. Execution and storage space is divided equally by default.

For example, if we give 4 GB as the executor memory:

Reserved Memory: 300 MB
User Memory: 0.4 * (4096 MB - 300 MB) ≈ 1.5 GB
Spark Unified Memory: 0.6 * (4096 MB - 300 MB) ≈ 2.2 GB
So Execution Memory ≈ 1.1 GB and Storage Memory ≈ 1.1 GB

State is part of execution memory.

So the total memory available for execution per executor, for an app that has state (be it an aggregator or flatMapGroupsWithState), is about 1.1 GB when 4 GB is provided per executor.

Note: if no DataFrame is cached, or if execution needs more than its 50% share of Spark unified memory, Spark can evict the cached data, so at most the total memory available for execution is about 2.2 GB.
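As a minimal sketch of this arithmetic (assuming the default spark.memory.fraction of 0.6 and spark.memory.storageFraction of 0.5; actual numbers vary slightly with how the JVM reports heap size):

// Unified (execution + storage) memory for a given executor heap, in MB
def unifiedMemoryMb(executorMemoryMb: Long): Long =
  (0.6 * (executorMemoryMb - 300)).toLong          // 300 MB reserved for Spark internals

// Execution share when the other half of unified memory is holding cached data
def executionMemoryMb(executorMemoryMb: Long): Long =
  (0.5 * unifiedMemoryMb(executorMemoryMb)).toLong

// unifiedMemoryMb(4096)   ≈ 2277 MB (~2.2 GB)
// executionMemoryMb(4096) ≈ 1138 MB (~1.1 GB) for shuffle, sort and state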

Estimation of required memory per executor

Let us take the example of an application that uses groupBy with a sliding event-time window and a processing-time trigger, plus a foreachBatch that performs two joins.
Below is sample code for the application, to give an idea of the flow.


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.Trigger

object Joins extends App {
  val spark = SparkSession.builder().appName("joins")
    .config("spark.sql.execution.useObjectHashAggregateExec", "true")
    .master("local").getOrCreate()
  import spark.implicits._

  // schema, hiveDf, hiveDf2, triggerTime and kafkaBootstrapServers are defined elsewhere in the app
  val df = spark.readStream.option("maxFilesPerTrigger", 2).schema(schema)
    .json("/users/vindhyag/public/events")

  // 10-minute window sliding every 5 minutes, with a 2-minute watermark; count() makes it an aggregation
  val grouped = df.withWatermark("time", "2 minutes")
    .groupBy($"key", window($"time", "10 minutes", "5 minutes")).count()

  def writeMicroBatchToSinks(microBatchDf: DataFrame, batchId: Long): Unit = {
    val joinDf = microBatchDf.join(hiveDf, Seq("key"), "outer")
    val join2 = joinDf.join(hiveDf2, Seq("key"), "outer")
    join2.write.format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("topic", "topicName").save()
  }

  val query = grouped.writeStream.outputMode("append").queryName("queryName")
    .trigger(Trigger.ProcessingTime(triggerTime.getOrElse("1 minutes")))
    .foreachBatch(writeMicroBatchToSinks _).start()

  query.awaitTermination()
}

Let us say we run the above app with the configuration below.

Executor Memory: 4 GB
Executor Instances: 4
So the total available memory is 16 GB.
Memory available for shuffle + state + cache = 4 * 0.6 * (4096 MB - 300 MB) ≈ 9.1 GB

Note:
1. The memory actually available can vary by a few MB, depending on how the JVM allocates memory.
2. Assuming no DataFrames are cached, the full 9.1 GB is available to the app. If a DataFrame is cached, the memory available for execution is between 4.5 GB and 9.1 GB, depending on the size of the cached DataFrame.
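Reusing the hypothetical unifiedMemoryMb helper from the earlier sketch, the same estimate reads:

val executorInstances = 4
val perExecutorUnifiedMb = unifiedMemoryMb(4096)                            // ≈ 2277 MB
val totalForShuffleStateCacheMb = executorInstances * perExecutorUnifiedMb  // ≈ 9110 MB ≈ 9.1 GB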

Below are the important state metrics that need to be monitored in the Spark (YARN) UI and logs.

Let us say these are the metrics reported when we run the above app:

"numRowsTotal" : 367416,
"numRowsUpdated" : 117540,
"memoryUsedBytes" : 5341872854,
"customMetrics" : {
  "loadedMapCacheHitCount" : 7694255,
  "loadedMapCacheMissCount" : 1162929,
  "stateOnCurrentVersionSizeBytes" : 4321112750
}

We can see the state size is ~5 GB.
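These numbers come from the streaming query progress, so besides the UI they can also be logged per micro-batch; a minimal sketch using a StreamingQueryListener (the println sink is just for illustration):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Print state-store metrics for every micro-batch of every query on this SparkSession
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    event.progress.stateOperators.foreach { s =>
      println(s"numRowsTotal=${s.numRowsTotal} numRowsUpdated=${s.numRowsUpdated} " +
        s"memoryUsedBytes=${s.memoryUsedBytes} customMetrics=${s.customMetrics}")
    }
})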

Check the sort size in the SQL tab, and the shuffle read/write sizes in the Stages tab, to get the peak memory used for sort and shuffle. In this run, Sort was using around 3.8 GB of space and Exchange (shuffle) was using 126 MB.

So the total memory required is approximately 8 GB, and the application should run fine.

Here we can clearly see that although the memory used for state (~5 GB) looks much smaller than the total memory provided (16 GB), the total memory used (~8 GB) is close to the total memory available for execution (~9 GB).

Now, what if we cache a DataFrame in the app?
What if the event rate doubles? We can re-estimate from the above metrics, without having to do trial and error to find the optimal size.

Note:

  1. We need to monitor multiple batches and watch how the state size grows. There should not be an exponential increase within a short time, or an ever-growing state. If that is happening, revisit the state deletion condition to make sure the state does not turn into a memory leak.
    Make sure the timeout set with setTimeoutDuration keeps the state well within the memory budget.
  2. If the aggregation state size keeps growing, check whether GC time is the bottleneck. Using the G1GC garbage collector speeds up collection of old state-store versions, relinquishing memory for the next batches of data sooner (see the sketch after this list).
  3. Stream-stream joins maintain watermark state for every stream, so the state maintained will be larger. We have to keep that in mind.
  4. The larger the state store, the more space is also taken for sort and shuffle.
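For note 2, a hedged sketch of how G1GC can be enabled for the executors; in practice this is usually passed via spark-submit --conf or spark-defaults.conf, since spark.executor.extraJavaOptions must be set before the executors launch:

// Illustrative only: enable G1GC on the executor JVMs
val spark = SparkSession.builder()
  .appName("joins")
  .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  .getOrCreate()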

Great articles to read more on each of the concepts mentioned above:
1. https://medium.com/analytics-vidhya/spark-memory-management-583a16c1253f
2. https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-HDFSBackedStateStoreProvider.html#loadedMaps
3. https://medium.com/@chandanbaranwal/state-management-in-spark-structured-streaming-aaa87b6c9d31
4. https://www.waitingforcode.com/apache-spark-structured-streaming/state-store-101/read
5. https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#heading=h.tbvulr8j6i
