EXPEDIA GROUP TECHNOLOGY — DATA
Apache Spark Structured Streaming — Watermarking (6 of 6)
Watermarking in Spark Streaming to handle late data
In previous blogs, we discussed input sources, sinks, checkpoints, triggers and operations. In this post, we discuss watermarking in Apache Spark™️ Streaming.
You may also be interested in some of my other posts on Apache Spark.
- Apache Spark Structured Streaming — First Streaming Example
- Apache Spark Structured Streaming — Input Sources
- Apache Spark Structured Streaming — Output Sinks
- Apache Spark Structured Streaming — Checkpoints and Triggers
- Apache Spark Structured Streaming — Operations
- Deep Dive into Apache Spark DateTime Functions
- Working with JSON in Apache Spark
- Deep Dive into Apache Spark Window Functions
- Deep Dive into Apache Spark Array Functions
- Start Your Journey with Apache Spark
Let’s set up a streaming application that reads data from a socket, and use it to understand the need for, and the concept of, watermarking with an example.
Setup
We read #-separated data from a socket (127.0.0.1:9999) and create a DataFrame with two columns, event_timestamp and val. More details on reading data from a socket can be found in this blog.
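A minimal sketch of this setup in Scala (the application name, variable names, and the local[*] master are my assumptions; the host, port, separator, and column names follow the description above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("WatermarkExample")   // hypothetical app name
  .master("local[*]")            // assumed local run
  .getOrCreate()

// Read raw lines from the socket; each line arrives in a "value" column.
val socketDF = spark.readStream
  .format("socket")
  .option("host", "127.0.0.1")
  .option("port", "9999")
  .load()

// Each line is "<event timestamp>#<value>"; split it into the two columns.
val eventsDF = socketDF
  .withColumn("event_timestamp", split(col("value"), "#").getItem(0).cast("timestamp"))
  .withColumn("val", split(col("value"), "#").getItem(1).cast("int"))
  .drop("value")
```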
Now let’s open port 9999, send some data, perform the window aggregation, and display the results on the console. We keep the size of the tumbling window at 5 minutes. More details on window operations can be found in this blog.
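Continuing the sketch, the aggregation sums val over 5-minute tumbling windows and writes each batch to the console (the update output mode is my assumption; it matches the batch outputs shown below, which print only the windows that changed):

```scala
// Tumbling 5-minute window over event time, no watermark yet.
val windowedDF = eventsDF
  .groupBy(window(col("event_timestamp"), "5 minutes"))
  .agg(sum("val").alias("sum"))

val query = windowedDF.writeStream
  .format("console")
  .outputMode("update")          // print only windows updated in each batch
  .option("truncate", "false")   // show full window boundaries
  .start()

query.awaitTermination()
```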
The first two records are from 1st Jan 2021, while the third is from 31st Dec 2020, so let’s consider the third record late data. (Be aware that the definition of late data varies from application to application.) With plain window operations, a streaming application will process all data, placing late records into their older windows, and it must keep state for those old windows even when they are no longer needed. We can use watermarking to handle late data so that the system discards such records automatically.
Now let’s add withWatermark() to our application and see how it works. We set the watermark with delayThreshold = 10 minutes, and keep windowDuration = 5 minutes in the groupBy(). The watermark is computed as the maximum event time seen so far minus delayThreshold. Let’s understand this with an example.
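A sketch of the same query with the watermark added (names continue from the earlier sketches); note that withWatermark() must be applied to the event-time column before the groupBy:

```scala
// 10-minute watermark on event_timestamp, then the same 5-minute window.
val watermarkedDF = eventsDF
  .withWatermark("event_timestamp", "10 minutes")
  .groupBy(window(col("event_timestamp"), "5 minutes"))
  .agg(sum("val").alias("sum"))

val query = watermarkedDF.writeStream
  .format("console")
  .outputMode("update")
  .option("truncate", "false")
  .start()

query.awaitTermination()
```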
Open port 9999, start our streaming application, and send the same data again to the socket; the sample records are shown below. Let's discuss each record in detail.
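These are the six #-separated records, exactly as they are sent to the socket (for example, by typing them into a netcat listener on port 9999):

```
2021-01-01 10:06:00#2
2021-01-01 10:13:00#5
2020-12-31 10:06:00#1
2021-01-01 10:08:00#8
2021-01-01 10:00:01#7
2021-01-01 09:59:00#3
```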
1. First record: 2021-01-01 10:06:00#2
It will be processed, since this is the first record.
Output:
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+---+
|window |sum|
+------------------------------------------+---+
|[2021-01-01 10:05:00, 2021-01-01 10:10:00]|2 |
+------------------------------------------+---+
Note: Batch: 0 was empty. Since the window size is 5 minutes, the record falls into the window [2021-01-01 10:05:00, 2021-01-01 10:10:00].
2. Second record: 2021-01-01 10:13:00#5
We have processed only one record so far, so max event time = 2021-01-01 10:06:00. Hence the watermark (max event time minus delayThreshold) is 2021-01-01 09:56:00, and for a window size of 5 minutes the watermark window is [09:55:00, 10:00:00]. Any data with an event time earlier than the start of the watermark window (2021-01-01 09:55:00) will be discarded. The event time of the second record (2021-01-01 10:13:00) is later than that, so it will be processed.
Note: from here on I show only the time in the watermark window, since its date is always 2021-01-01 in this example.
-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+---+
|window |sum|
+------------------------------------------+---+
|[2021-01-01 10:10:00, 2021-01-01 10:15:00]|5 |
+------------------------------------------+---+
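To make the watermark arithmetic concrete, here is a small sketch using plain java.time (not the Spark API; the values are taken from this walkthrough):

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Record 2 arrives; the max event time seen so far is from record 1.
val maxEventTime = LocalDateTime.parse("2021-01-01T10:06:00")
val watermark    = maxEventTime.minusMinutes(10)     // 2021-01-01T09:56:00

// Floor the watermark to its 5-minute window boundary.
val windowStart = watermark
  .truncatedTo(ChronoUnit.HOURS)
  .plusMinutes((watermark.getMinute / 5) * 5)        // 2021-01-01T09:55:00

// Watermark window: [09:55:00, 10:00:00). Events with timestamps before
// 09:55:00 are dropped; record 2 (10:13:00) is well past it, so it is kept.
```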
3. Third record: 2020-12-31 10:06:00#1
Our system’s current state is:
- max event time: 2021-01-01 10:13:00
- watermark: 2021-01-01 10:03:00
- watermark window: [10:00:00, 10:05:00]
For this event, the timestamp is earlier than the start of the watermark window, hence it will be treated as late data and discarded. We get an empty batch.
-------------------------------------------
Batch: 4
-------------------------------------------
+------+---+
|window|sum|
+------+---+
+------+---+
4. Fourth record: 2021-01-01 10:08:00#8
- max event time: 2021-01-01 10:13:00
- watermark: 2021-01-01 10:03:00
- watermark window: [10:00:00, 10:05:00]
This event’s time is later than the start of the watermark window, hence it will be processed; it falls into the [10:05:00, 10:10:00] window, whose sum becomes 2 + 8 = 10.
-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+---+
|window |sum|
+------------------------------------------+---+
|[2021-01-01 10:05:00, 2021-01-01 10:10:00]|10 |
+------------------------------------------+---+
5. Fifth record: 2021-01-01 10:00:01#7
- max event time: 2021-01-01 10:13:00
- watermark: 2021-01-01 10:03:00
- watermark window: [10:00:00, 10:05:00]
This event’s time (10:00:01) is just one second after the start of the watermark window, hence it will be processed.
6. Sixth record: 2021-01-01 09:59:00#3
- max event time: 2021-01-01 10:13:00
- watermark: 2021-01-01 10:03:00
- watermark window: [10:00:00, 10:05:00]
This event’s time is earlier than the start of the watermark window, hence it will not be processed.
Here is the consolidated list of all ingested events, with the max event time seen so far, the watermark, the watermark window, and whether each record was processed (for the first record there is no prior event, so no watermark applies yet):

| # | Record | Max event time so far | Watermark | Watermark window | Processed? |
|---|--------|-----------------------|-----------|------------------|------------|
| 1 | 2021-01-01 10:06:00#2 | n/a | n/a | n/a | Yes |
| 2 | 2021-01-01 10:13:00#5 | 2021-01-01 10:06:00 | 09:56:00 | [09:55:00, 10:00:00] | Yes |
| 3 | 2020-12-31 10:06:00#1 | 2021-01-01 10:13:00 | 10:03:00 | [10:00:00, 10:05:00] | No |
| 4 | 2021-01-01 10:08:00#8 | 2021-01-01 10:13:00 | 10:03:00 | [10:00:00, 10:05:00] | Yes |
| 5 | 2021-01-01 10:00:01#7 | 2021-01-01 10:13:00 | 10:03:00 | [10:00:00, 10:05:00] | Yes |
| 6 | 2021-01-01 09:59:00#3 | 2021-01-01 10:13:00 | 10:03:00 | [10:00:00, 10:05:00] | No |
Ingest the above data into the streaming application and verify the processed status. You can find the complete code on GitHub.
I hope you have enjoyed learning about watermarking in Spark Streaming!