State management, Stateful & Stateless aggregations on unbounded data in Structured Streaming (1/3)

Shiva Garg
10 min readAug 8, 2020

--

Structured Streaming has become the de facto way of stream processing in Apache Spark since with its release 2.2. In this series of blog posts, we will start with a high-level introduction to Structured Streaming and then plunge & delve deeper into the process of doing and understanding stateful and stateless aggregations, state management on unbounded data in Spark Structured Streaming applications.

This blog post is divided into 3 parts. Each part will focus on aggregation over different time components.

Part 1, the first in the series, will discuss State management and Stateful aggregations on event time and corresponding challenges.

Part 2, the second in the series, describes the Stateful aggregation on Ingestion time and Processing time.

Part 3, the last in the series, focuses on the Stateless aggregations and Session window using custom State.

Introduction to Spark Structured streaming

Streaming data pipelines run continuously to consume the data from a continuous stream and process it in real-time using a processing engine like Apache Spark, Apache Flink, Storm, Kafka Streams.

Apache Structured streaming is the first stream processing engine built on top of Spark SQL Engine. That means you can express the streaming computation in the same way as you would express a batch computation on static data. In Structured Streaming, It models stream as an infinite table, rather than a discrete collection of data.

Apache Spark 2.x follows the streaming first model and considers Batch jobs as the special use case of Streaming jobs to run the streaming query only once.

In Apache Spark 1.x, Streaming was all about faster processing of the batches. So the entire streaming methodology got changed as we hover from Spark 1.x to Spark 2.x.

At high-level, Simple Spark structured streaming applications consist of 4 parts:

  • Data Source: Apache Spark has extended its existing DataSource API to support the streams with new methods readStream()and writeStream(). Data read from various DataSources like Socket, Apache Kafka, Redis, Files are represented using DataFrame API. In Streams, Schema Inference is not supported and Schema needs to be given by DataSource connector.
  • Transformation: Structured Streaming is built on top of Spark SQL engine and use high-level API like Dataframe and Dataset to represent data. That means computation(transformations) on streams almost remains the same as compared to batch computation on DataFrame and DataSet. There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets like distinct, sorting, etc.
  • Data Sink: It defines when and what gets written out to the external storage. External storage is defined by the “format” method like Kafka, Redis, console, etc. When is defined by Triggers, It decides how frequently data is consumed from the stream and processed, By default is Trigger.ProcessingTime(0) aka ASAP. What is defined by not id the OutputMode among Append, Complete, Update. Data Sink can be created using the DataStreamWriter returned through Dataset.writeStream()
  • Streaming Query: Query running continuously as the new data arrives in the continuous stream. Always running streaming query can be started using DataStreamWriter.start().awaitTermination().

Stateful Aggregations in Spark Structured Streaming

Its time to devel deeper into the process of doing and understanding state management, stateful aggregations, fault tolerance using checkpointing.

A streaming application is an always running application and aggregation on the unbounded data is always directly linked to events captured till current Time `t`. Time in the streaming application is a way to correlate different events in the stream to extract some meaningful insights. So in order to understand the behavior of the streaming application over time, we need to take snapshots of the stream in various time components.

In Structured Streaming, When we say aggregate data of the last 15 seconds, then the question arises 15 seconds of what time?

Realtime Streaming data pipeline with various snapshot over time
Realtime Streaming data pipeline with a various snapshot over time

Aggregations on Event Time:

It signifies the time at which events occurred at the source like IoT devices, log events, etc. All the events have embedded timestamp information which indicates the event Time. Structured Streaming(Spark 2.x) has native support to do aggregation on event time.

When we aggregate the data of the last 15 seconds on event time, It means to collect all the records that occurred at the source in the last 15 seconds and then apply aggregations;

Use case: Find the maximum value for each stock listed in NASDAQ within the tumbling/fixed event window of 10 seconds? In this case, we need to consider the event time (time at which price changes at source), no matter when stock information arrives at the ingestion and processing layer. Here we define the window on time at which event occurred (event time).

Challenges:

- Late Events: Events occurred at the source may get ingested into the processing engine arrives late due to network latency or system outage. How does Spark know all the events of the last 15 seconds event time window have reached into the processing system? Till how long Spark needs to wait for late events? By default, Spark waits for late events indefinitely.

- State management: As discussed, In case of late events, Spark waits indefinitely for events to come. For a specific event time window (w) of 15 seconds (2020–05–09 12:15:30–2020–05–09 12:15:45) spark will collate all the events and apply aggregation. By default, To accommodate late events, the spark will remember the state of the window(w) forever in the memory. As time passes, the number of window increases and so resource usage; It may lead to Out of memory exception since resources are limited;

- State Recovery: Since the spark maintains the state of windows to handle the late events in the memory. State recovery is the other challenge if the streaming job got failed due to any reason. The state need to be fault-tolerant.

Watermarks

By default, spark remembers all the windows forever and waits for the late events forever. Watermarks are the solution to forever state management of the windows to accommodate late events. It provides a mechanism to control the state in a bounded way. It controls the state to grow indefinitely.

Watermarks define the threshold for late events i.e How long spark should wait for late events? Once the threshold passes, It deletes the state of the expired windows and discards the late events corresponding to expired windows.
Example :
- Consider the watermark is of 5 minutes for a particular Streaming Query;
- Spark will see the event time of the latest event received at the processing layer.(Say, 2020–05–09 14:15:00).
- Spark will only accept the events which are later than 2020–05–09 14:10:00 (minus 5 minutes to latest timestamp). All the events prior to 14:10:00 will be discarded and state of windows prior to 14:10:00 will be deleted;

** Watermarks are highly recommended with stateful aggregations otherwise resource usage will shoot upwards and may lead to breaking the system.
** Watermarks are supported only with Update Output mode.

Checkpoints

Streaming applications are always running and most of them are maintaining some kind of state. Preserving the state across failures/restarts becomes a vital part of State management. The checkpoint is the solution to the State recovery and implemented using WAL (write-ahead logs).

The checkpoint is achieved by writing the state of the streaming query into the HDFS folder. Once the query restarts, It reads the HDFS folder to recover the state of the query before accepting new data from the data source stream.

Checkpoint directory (/hadoop/checkpoint/) consists of following subfolders :

Checkpoint Directory folder structure
  • Offsets: Indicates to what point data has been consumed for processing from the data source stream. Example: In the case of Kafka source, It contains the {PartitionId: Offset} details for the Kafka topic. For each micro-batch (created for each trigger), the new offset file is created. The below figure describes 3 offset files corresponding to 3 micro-batches.
Offset file
  • Commits: Indicates to what point the processing engine has processed the data. Corresponding to each “offset” file there will be a “commit” file once the data is processed. The below figure describes 3 commit files corresponding to the offsets file.
Commits files
  • Source: It contains information about the data source; Example: Location of the file, Kafka topic name, etc.
Source file
  • State: As the name indicates, this folder contains the state of each computed partition in the encoded format (LZ4). If Spark has 200 partitions, there would be 200 directories under the state folder.
State files
  • * Not all the sources in structured streaming support checkpointing. Checkpointing sometime may need to replay some of the data from the source in order to recover the state. In those cases, we need a source that supports that kind of functionality. The socket stream doesn't support that.

Event time aggregation | Query plan

Let’s take one more step deeper and analyze Spark streaming query’s logical and physical plan on aggregation over event Time.

Structured streaming built on top of SQL engine and use all optimization packages which come with Dataframes and Dataset API.

Below is the code snippet to describe aggregation on the event stream:

  • Use files as a continuous stream
  • Do aggregation to find the max value of Stock on event time over 10 seconds tumbling window with Watermarks threshold of 500 milliseconds to handle late events and control the state in a bounded way.
  • Streaming Query with checkpoints to preserve the state from failure/restarts.

Full code can be found on GitHub

case class Stock(time: Timestamp, symbol: String, value: Double)val stockSchema = StructType(Array(
StructField("time", TimestampType),
StructField("symbol", StringType),
StructField("value", DoubleType)
))
// Datasource API with Files DataSource connector
val
fileStream = spark
.readStream
.option("maxFilesPerTrigger", 1)
.schema(stockSchema)
.csv("src/main/resources/stocks")
.as[Stock]
/**
* 10 seconds window on the event time.
* Watermarks to handle late events and to control the state.
*/

val windowedMax = fileStream
.withWatermark("time", "500 milliseconds")
.groupBy(
col("symbol"),
window(col("time"), "10 seconds")
).max("value")
/**
* Data Sink + Streaming query
* Checkpointing to recover state
*/

windowedMax
.writeStream
.format("console")
.option("checkpointLocation","/hadoop/stocks/checkpoint")
.outputMode(OutputMode.Update())
.option("truncate", "false")
.start()
.awaitTermination()

Running the code :

Each trigger(micro-batch) will read only one file from the source directory (src/main/resources/stocks) & process the records.

Spark outputs below results which indicate the start of the window :

-------------------------------------------
Batch: 0
-------------------------------------------
+---------------------------------------------+----------+
|window |max(value)|
+---------------------------------------------+----------+
|[2020-10-10 10:00:00.0,2020-10-10 10:00:10.0]|200.0 |
+---------------------------------------------+----------+

Later on, a new file(2.csv) gets ingested into the source directory (src/main/resources/stocks) with the following record

2020-10-10 10:00:07,aapl,500

Then Spark outputs below result, It updates the window between [2020-10-10 10:00:00.0,2020-10-10 10:00:10.0]

-------------------------------------------
Batch: 0
-------------------------------------------
+---------------------------------------------+----------+
|window |max(value)|
+---------------------------------------------+----------+
|[2020-10-10 10:00:00.0,2020-10-10 10:00:10.0]|500.0 |
+---------------------------------------------+----------+

Later on, a new file(3.csv) gets ingested into the source directory (src/main/resources/stocks) with the following the late event. Spark will discard this event because It has breached the watermark threshold i.e. Latest record timestamp minus(-) Watermark time (10:00:07 – 500 milliseconds)

2020-10-10 10:00:05,aapl,1000

Spark Catalyst’s Physical plan :

Look at the highlighted part of the Physical plan, It describes the usage of physical operators like “StateStoreSave”, ”StateStoreRestore” and “EventTimeWatermark” which indicates that the streaming query is stateful and using watermarks.

High-Level operators are mapped to Logical Operators which further mapped to the Physical operators in Spark Streaming query to create StateStoreRDD.

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@b79b293
+- *(4) HashAggregate(keys=[symbol#1, window#17-T500ms], functions=[max(value#2)], output=[symbol#1, window#11-T500ms, max(value)#16])
+- StateStoreSave [symbol#1, window#17-T500ms], state info [ checkpoint = file:/Users/shigarg1/hadoop/stocks/checkpoint/state, runId = a38c585d-5219-44f8-8102-dfe6d60c2720, opId = 0, ver = 1, numPartitions = 1], Update, 1602320400500, 2
+- *(3) HashAggregate(keys=[symbol#1, window#17-T500ms], functions=[merge_max(value#2)], output=[symbol#1, window#17-T500ms, max#53])
+- StateStoreRestore [symbol#1, window#17-T500ms], state info [ checkpoint = file:/Users/shigarg1/hadoop/stocks/checkpoint/state, runId = a38c585d-5219-44f8-8102-dfe6d60c2720, opId = 0, ver = 1, numPartitions = 1], 2
+- *(2) HashAggregate(keys=[symbol#1, window#17-T500ms], functions=[merge_max(value#2)], output=[symbol#1, window#17-T500ms, max#53])
+- Exchange hashpartitioning(symbol#1, window#17-T500ms, 1)
+- *(1) HashAggregate(keys=[symbol#1, window#17-T500ms], functions=[partial_max(value#2)], output=[symbol#1, window#17-T500ms, max#53])
+- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(time#0-T500ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(time#0-T500ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(time#0-T500ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(time#0-T500ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(time#0-T500ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(time#0-T500ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(time#0-T500ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(time#0-T500ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#17-T500ms, symbol#1, value#2]
+- *(1) Filter isnotnull(time#0-T500ms)
+- EventTimeWatermark time#0: timestamp, interval 500 milliseconds
+- LocalTableScan <empty>, [time#0, symbol#1, value#2]

Summary

Structured Streaming has native support of aggregations over different time components i.e. Event time, Ingestion time, and processing time. While Spark 1.x supports aggregation only on Processing time.
it is highly recommended to use watermarks with event-time aggregations to accommodate late events and control the state in a bounded way since it can lead to memory overflow. The checkpointing is mandatory to make the always running Streaming query a fault-tolerant.

--

--

Shiva Garg

Senior Data Engineer , Cloud Architect , Machine Learning Enthusiast