How to do Structured Streaming like a boss

Part 1 — Event time and Watermarking

Milivoje Popovac
SmartCat.io
Sep 19, 2019

So, you want to process data in real time and provide almost instant output. Great! You’re in good company, since it’s one of the hottest topics in BIG DATA today. But! How do you ***do it***?

This is the part where Spark’s powerful Structured Streaming comes in.

Structured Streaming provides “fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.” It’s built on top of the Spark SQL engine, where a streaming computation is expressed the same way as a batch one (there’s a sketch of this right after the list). It’s based on 3 design points:

  • declarative APIs — instead of you being responsible for tracking state and dropping it afterwards, or reacting to duplicate events, you define only what to compute, not how to compute it
  • Event Time — process data based on timestamps embedded in records at their creation time, as opposed to when the record is received
  • Continuous execution for the lowest possible latency, and micro-batch execution for high throughput
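
Here’s a minimal sketch of that first point (the socket source, host/port and the comma-separated line format are stand-ins for illustration): the streaming query is declared exactly like a batch aggregation, and the engine figures out the incremental execution.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
  .appName("HashtagCounts")
  .getOrCreate()
import spark.implicits._

// A stand-in source: lines like "2019-09-19 01:08:00,#me" over a socket.
val hashtags = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .select(
    split($"value", ",")(0).cast("timestamp").as("timestamp"),
    split($"value", ",")(1).as("hashtag"))

// Declared exactly like a batch aggregation: no state handling in sight.
val counts = hashtags.groupBy($"hashtag").count()
```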

This is all nice and pretty, but what if: 1) records arrive in the system out of order, or 2) you have to specify how late you expect new events to be in the input stream? These are important questions, because you can’t assume that the size of the state store (where the intermediate information is saved) is infinite.

Let’s take a break for a minute and look at an example, to learn some new terms. Let’s say you’ve got an input stream of Instagram hashtags, and you want to count them so you can find the most frequent ones.

Tumbling Window

As you can see in the image above, an aggregation is performed over a window of time (in this case, 30 minutes). Note: event time with windowing only works with aggregations. When the time for one window expires, the calculation is triggered, and the engine operates on the data received since the last trigger. You’ll notice there are no overlaps between windows: one event can fall into only one window. This type of window is called a Tumbling Window.
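
In code, a tumbling window is just a `window` expression with a single duration (reusing the hypothetical `hashtags` DataFrame from the sketch above):

```scala
import org.apache.spark.sql.functions.window

// 30-minute tumbling windows: no slide interval, so each event
// lands in exactly one window.
val tumblingCounts = hashtags
  .groupBy(
    window($"timestamp", "30 minutes"),
    $"hashtag")
  .count()
```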

There’s another type: the Sliding Window. It allows each event to fall into multiple windows, like this:

Here’s a sliding window that is 30 minutes long but slides every 15 minutes, so you get the state every 15 minutes (the blue arrows).
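
The only change in code is the extra slide argument, here the 30-minute/15-minute combination from the picture:

```scala
// 30-minute windows sliding every 15 minutes: each event now falls
// into two overlapping windows.
val slidingCounts = hashtags
  .groupBy(
    window($"timestamp", "30 minutes", "15 minutes"),
    $"hashtag")
  .count()
```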

However, these examples have one major weakness. The state used for storing intermediate data will grow INDEFINITELY. Why? Because we never specified how late we expect to see our data. If there were a way to do that, we could age out old data and clear the state store.

The solution? Watermarks.

A watermark is “an amount of time following a given event or set of events after which we do not expect to see any more data from that time”. This applies to all stateful processing that operates on event time.

Going back to our hashtag example: every hashtag carries a timestamp from when it was produced, and we’ll define a 25-minute threshold, the maximum amount of time hashtags are allowed to be late.
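
In Spark this is a one-line addition, `withWatermark`, called on the same event-time column the window uses and before the aggregation; a sketch with our threshold:

```scala
// Declare how late data may be: state for windows that end before
// (max event time seen - 25 minutes) becomes eligible for cleanup.
val watermarkedCounts = hashtags
  .withWatermark("timestamp", "25 minutes")
  .groupBy(
    window($"timestamp", "30 minutes", "15 minutes"),
    $"hashtag")
  .count()
```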

So, the watermark is defined as the maximum event time seen in the previous trigger minus the threshold for late data, and it’s recalculated at the beginning of every trigger (at 01:15, 01:30, 01:45, etc.). A watermark of 01:01 means the latest event seen so far was created at 01:26, since 01:26 minus the 25-minute threshold gives 01:01. For example, #me arrived at the processing system at [01:36], but it was created at [01:08]. Since the current value of our watermark (01:01) was below the tracked event (#me at 01:08), the engine updated the state for the window [00:45–01:15] and kept maintaining it.

However, when the next trigger occurred, the updated value of the watermark (01:18) was greater than the end timestamp of the previously considered window (01:15). This led to the clearing of that window’s intermediate state. On top of that, any later data whose event time falls below the watermark is simply skipped.
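
If you want to watch this happen, the watermark each micro-batch ran with is reported in the query’s progress metrics; a sketch (the 15-minute trigger interval matches the trigger times above):

```scala
import org.apache.spark.sql.streaming.Trigger

val query = watermarkedCounts.writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("15 minutes"))
  .start()

// Once at least one batch has run, the progress report carries
// event-time stats, including the watermark the batch used.
println(query.lastProgress.eventTime.get("watermark"))
```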

All of this brings us to the next question: what will the output of this stream be, and how frequent will it be?

Enter Output modes.

They help us define what gets written to the external storage. There are three of them (with a sketch after the list):

  • Update — results are emitted after each trigger, and only the values updated since the last trigger are forwarded (e.g. at 01:15, 01:30, 01:45, etc.)
  • Append — the final result is written to the sink only when the intermediate state is dropped (e.g. at [02:00] the results from window [00:45–01:15] will be printed)
  • Complete — never clears old state, since by definition this mode keeps the results from all windows (the watermark has no effect here, so it should not be used)
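
The mode is a single setting on the sink. Here’s a sketch in append mode, where the [00:45–01:15] counts reach the console only after the watermark passes 01:15:

```scala
val out = watermarkedCounts.writeStream
  .outputMode("append")            // or "update" / "complete"
  .format("console")
  .option("truncate", "false")
  .start()

out.awaitTermination()
```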

And there you have it! A neat way to use Event Time and Watermark to track states and clear them later. But that’s just one piece of the puzzle.

End of part 1

What if you want more complex windows? Like windows based on counts instead of event time? Or variable window sizes? Those are some pretty cool questions, and they’re hot and ready for you in Part 2: Arbitrary Stateful Processing.

Coming soon.
