Peering through the ‘Window’ of Structured Spark Streaming

Manisha Jain
Jun 3, 2020


Time plays a more critical role in streaming applications than in batch.
The choice between batch and streaming usually comes down to the latency the use case can tolerate: use cases that need results at low latency call for streaming.

Does that mean events in stream processing systems are processed as soon as they arrive? Not always!

The concept of ‘Event Time’ is essential in designing streaming applications.

Event time is the time at which the event occurs, which is different from the time at which the processing system receives the data.

The order in which events arrive is not guaranteed to match the order in which they occurred, since many factors can delay an event on its way from the source to the processing system.
For this reason, we need to work with the 'Event Time' contained in the data itself.

Assume the following streaming events describe shows watched by users on a streaming platform at some point in time.
Here watching_at is the event-time column, which records when the show was watched.

{"show_title":"Breaking Bad","user_id":"1","watching_at":"2020-06-03 04:00:00"}
{"show_title":"Friends","user_id":"6","watching_at":"2020-06-03 04:01:00"}
{"show_title":"Game Of Thrones", "user_id":"2","watching_at":"2020-06-03 04:01:00"}
{"show_title":"Friends","user_id":"3","watching_at":"2020-06-03 04:02:00"}
{"show_title":"Big Bang Theory", "user_id":"4","watching_at":"2020-06-03 04:02:00"}
{"show_title":"Big Bang Theory", "user_id":"9","watching_at":"2020-06-03 04:03:00"}
{"show_title":"Breaking Bad","user_id":"5","watching_at":"2020-06-03 04:03:00"}
{"show_title":"The Office", "user_id":"1","watching_at":"2020-06-03 04:04:00"}
{"show_title":"Friends","user_id":"2","watching_at":"2020-06-03 04:05:00"}
{"show_title":"The Office","user_id":"3","watching_at":"2020-06-03 04:06:00"}
{"show_title":"Friends","user_id":"4","watching_at":"2020-06-03 04:07:00"}
{"show_title":"Friends","user_id":"5","watching_at":"2020-06-03 04:07:00"}

The code snippet below simulates a streaming source over a folder of JSON files (/data/events). Setting the option "maxFilesPerTrigger" to 1 tells Spark to read at most one new file per trigger.

In this case, every new file added to the '/data/events' folder will be picked up and processed on the next trigger.

val static = spark.read.json("/data/events")

val streaming = spark.readStream
  .schema(static.schema)
  .option("maxFilesPerTrigger", 1)
  .json("/data/events")

streaming.printSchema
root
|-- show_title: string (nullable = true)
|-- user_id: string (nullable = true)
|-- watching_at: string (nullable = true)

The first step in event-time analysis is to cast the watching_at column to the Spark SQL timestamp type.

val withEventTime = streaming.selectExpr("*", "cast(watching_at as timestamp) as event_time")

withEventTime.printSchema
root
|-- show_title: string (nullable = true)
|-- user_id: string (nullable = true)
|-- watching_at: string (nullable = true)
|-- event_time: timestamp (nullable = true)

The simplest operation is to count the number of events in a given time window (for example, 2 minutes).

'Time windows' are generally defined as a group-by on the time column.

  • The first parameter specifies which column to treat as time; here we use the 'event_time' column derived from 'watching_at'.
  • The second parameter specifies the window duration. A window duration can be in seconds, minutes, hours, days or weeks.
    In this example, the time window is 2 minutes.
  • queryName uniquely identifies the query.
  • outputMode complete writes the full result on every trigger.
    (The other output modes are append and update.)
import org.apache.spark.sql.functions.{col, window}

withEventTime
  .groupBy(window(col("event_time"), "2 minutes"))
  .count()
  .writeStream
  .queryName("events_in_window")
  .format("memory")
  .outputMode("complete")
  .start()

spark.sql("SELECT * FROM events_in_window order by window").show(truncate=false)

+------------------------------------------+-----+
|window |count|
+------------------------------------------+-----+
|[2020-06-03 04:00:00, 2020-06-03 04:02:00]|3 |
|[2020-06-03 04:02:00, 2020-06-03 04:04:00]|4 |
|[2020-06-03 04:04:00, 2020-06-03 04:06:00]|2 |
|[2020-06-03 04:06:00, 2020-06-03 04:08:00]|3 |
+------------------------------------------+-----+

This count aggregation updates in real time.
That means that when a new event arrives, the counts are updated accordingly.
For example, suppose a new file with the event below is added to the '/data/events' folder.

{"show_title":"Game Of Thrones", "user_id":"2","watching_at":"2020-06-03 04:08:00"}

This triggers Spark to read the new input, and the result is updated as shown below.

spark.sql("SELECT * FROM events_in_window order by window").show(truncate=false)

+------------------------------------------+-----+
|window |count|
+------------------------------------------+-----+
|[2020-06-03 04:00:00, 2020-06-03 04:02:00]|3 |
|[2020-06-03 04:02:00, 2020-06-03 04:04:00]|4 |
|[2020-06-03 04:04:00, 2020-06-03 04:06:00]|2 |
|[2020-06-03 04:06:00, 2020-06-03 04:08:00]|3 |
|[2020-06-03 04:08:00, 2020-06-03 04:10:00]|1 |
+------------------------------------------+-----+

Aggregation can also be performed on multiple columns.
For example, we can add the "show_title" column to get the number of views per show in each window.

withEventTime
  .groupBy(window(col("event_time"), "2 minutes"), col("show_title"))
  .count()
  .writeStream
  .queryName("events_per_window_by_show")
  .format("memory")
  .outputMode("complete")
  .start()

spark.sql("SELECT * FROM events_per_window_by_show order by window").show(truncate=false)

+------------------------------------------+--------------+-----+
|window |show_title |count|
+------------------------------------------+--------------+-----+
|[2020-06-03 04:00:00, 2020-06-03 04:02:00]|Breaking Bad |1 |
|[2020-06-03 04:00:00, 2020-06-03 04:02:00]|Friends |1 |
|[2020-06-03 04:00:00, 2020-06-03 04:02:00]|Game Of Thr...|1 |
|[2020-06-03 04:02:00, 2020-06-03 04:04:00]|Breaking Bad |1 |
|[2020-06-03 04:02:00, 2020-06-03 04:04:00]|Friends |1 |
|[2020-06-03 04:02:00, 2020-06-03 04:04:00]|Big Bang The..|2 |
|[2020-06-03 04:04:00, 2020-06-03 04:06:00]|Friends |1 |
|[2020-06-03 04:04:00, 2020-06-03 04:06:00]|House of Cards|1 |
|[2020-06-03 04:06:00, 2020-06-03 04:08:00]|Friends |2 |
|[2020-06-03 04:06:00, 2020-06-03 04:08:00]|The Office |1 |
|[2020-06-03 04:08:00, 2020-06-03 04:10:00]|Game Of Thr...|1 |
+------------------------------------------+--------------+-----+

The window above, which is non-overlapping, is also referred to as a Tumbling Window.

Now, let us say we need to find the most "trending" show. The idea of a trend depends on the time frame we are looking at: trending in the last 1 hour, the last 10 minutes, and so on.

In this case, let us find the show which is trending in the last 10 minutes.

withEventTime
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"), col("show_title"))
  .count()
  .writeStream
  .queryName("trending_events_per_window_by_show")
  .format("memory")
  .outputMode("complete")
  .start()

spark.sql("SELECT * FROM trending_events_per_window_by_show order by window").show(truncate=false)

+------------------------------------------+--------------+-----+
|window |show_title |count|
+------------------------------------------+--------------+-----+
|[2020-06-03 03:55:00, 2020-06-03 04:05:00]|House of Cards|1 |
|[2020-06-03 03:55:00, 2020-06-03 04:05:00]|Friends |2 |
|[2020-06-03 03:55:00, 2020-06-03 04:05:00]|Big Bang Th...|2 |
|[2020-06-03 03:55:00, 2020-06-03 04:05:00]|Game Of Thr...|1 |
|[2020-06-03 03:55:00, 2020-06-03 04:05:00]|Breaking Bad |2 |
|[2020-06-03 04:00:00, 2020-06-03 04:10:00]|The Office |1 |
|[2020-06-03 04:00:00, 2020-06-03 04:10:00]|Game Of Thr...|2 |
|[2020-06-03 04:00:00, 2020-06-03 04:10:00]|Big Bang Th...|2 |
|[2020-06-03 04:00:00, 2020-06-03 04:10:00]|Friends |5 |
|[2020-06-03 04:00:00, 2020-06-03 04:10:00]|Breaking Bad |2 |
|[2020-06-03 04:00:00, 2020-06-03 04:10:00]|House of Cards|1 |
|[2020-06-03 04:05:00, 2020-06-03 04:15:00]|The Office |1 |
|[2020-06-03 04:05:00, 2020-06-03 04:15:00]|Game Of Thr...|1 |
|[2020-06-03 04:05:00, 2020-06-03 04:15:00]|Friends |3 |
+------------------------------------------+--------------+-----+

In the output above, the window start times are now at 5-minute intervals. That is, every 5 minutes a new window starts, and each window covers the last 10 minutes of events. This is known as a "Sliding Window".
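One way to surface the top show for the most recent window is a follow-up query on the in-memory table, along the lines of the sketch below (the max(count) alias and the ordering are just one possible formulation of such a query):

spark.sql("""
  SELECT max(`count`) AS `max(count)`, show_title, window
  FROM trending_events_per_window_by_show
  GROUP BY show_title, window
  ORDER BY window.start DESC, `max(count)` DESC
""").show(1, truncate = false)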

+----------+----------+------------------------------------------+
|max(count)|show_title|Window |
+----------+----------+------------------------------------------+
|3 |Friends |[2020-06-03 04:05:00, 2020-06-03 04:15:00]|
+----------+----------+------------------------------------------+

F.R.I.E.N.D.S is the trending show in the last 10-minute window.

Handling late events

Handling late events is an important aspect of streaming: events can arrive late due to network delays, loss of connection, and so on, but still need to be factored into the result.

In a window-based operation, this matters even more: without a bound on lateness, Spark would have to keep intermediate state around indefinitely in case a late event shows up, which would eventually overwhelm the system.
This is addressed by something called "Watermarks".

A Watermark is an amount of time following an event after which we do not expect to see any more data for that time.

Watermarks are specified using the withWatermark API.

withEventTime
  .withWatermark("event_time", "30 minutes")
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"), col("show_title"))
  .count()
  .writeStream
  .queryName("trending_events_per_window_by_show")
  .format("memory")
  .outputMode("complete")
  .start()

A Watermark is generally defined on a time column.

  • The first parameter specifies which column to treat as time; here we use the 'event_time' column derived from 'watching_at'.
  • The second parameter specifies the watermark duration, which can be in any unit of time.
    In this example, the watermark duration is 30 minutes.

A late event that arrives within the watermark is still aggregated, but any event beyond the watermark is considered "too late" and dropped.
In this example, events arriving more than 30 minutes late, relative to the latest event time seen, will be dropped.
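To make this concrete, suppose the latest event time seen so far is 2020-06-03 04:08:00 (the event we added earlier). With a 30-minute watermark, the engine keeps accepting events with event times down to roughly 2020-06-03 03:38:00. The two events below are made up for illustration (the user_ids are hypothetical):

{"show_title":"Friends","user_id":"7","watching_at":"2020-06-03 03:45:00"}
{"show_title":"Friends","user_id":"8","watching_at":"2020-06-03 03:20:00"}

The first event (03:45) is within the watermark and is still counted in its windows; the second (03:20) falls behind the watermark and may be dropped. The exact cut-off also depends on trigger timing and the output mode, so treat this as an illustration rather than exact engine behaviour.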

Fault Tolerance

Fault tolerance is addressed using 'Checkpointing'.

In case of a failure or an intentional shutdown, the state of a previous query can be saved and the application can continue from where it left off using checkpointing and write-ahead logs.

Data loss between triggers can be avoided by persisting the aggregates between triggers using checkpointing. Simply by configuring a query with a checkpoint location, the query will save all of its progress information
(i.e. the range of offsets processed in each trigger) and the running aggregates.

withEventTime
  .withWatermark("event_time", "30 minutes")
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"), col("show_title"))
  .count()
  .writeStream
  .queryName("trending_events_per_window_by_show")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .outputMode("complete")
  .start()

Conclusion

Spark Structured Streaming allows you to build streaming applications almost as easily as batch ones.

This article walks through some of the key concepts involved in building streaming aggregations using simple APIs.

The best way to learn Spark is by practicing the APIs on sample datasets. The examples in the book Spark: The Definitive Guide provide a simple way to learn hands-on, in both Scala and Python.

Thanks for reading!
