Mastering Stream Processing: A Guide to Windowing in Kafka Streams and Flink SQL

By bbejeck
Published in Confluent
7 min read · Jan 22, 2024

Stream processing is the best way to work with event data. While batch
processing still has its use cases, and probably always will, only
stream processing offers the ability to respond in real-time to events.

But if we zoom in, what does it look like to respond to events? By now,
I’m sure you’re familiar with the oft-quoted fraud scenario: a person
with nefarious intent gets hold of an unaware consumer’s credit card
number, but thanks to the bank’s responsive event processing system, the
fraudulent charge gets declined.

Other uses of stream processing require an immediate response but are
not tied to one single event. Consider monitoring the heat of a
manufacturing process; if the average temperature reaches a certain
threshold in a given period, then the monitoring process should generate
an alert. But this isn’t about one temperature spike. It’s about a
consistent upward trend. In other words, what are the temperature
readings doing during a fixed period?

I’m talking about windowing in event streams, if you have not guessed by
now. While aggregations (an aggregation is a grouping of events by a
common attribute) are a vital tool to leverage an event stream, an
aggregation over all time doesn’t shed any light on specific periods of
activity. Consider the following illustration:

Average temperature aggregated over all time

The average temperature reading has increased somewhat over time, but
that doesn’t tell the whole story. Now let’s take a look at capturing
the average temperature readings over specific intervals:

Windowed average temp readings

Now, by getting readings at specific intervals (windows), you can spot
the issue: a large jump in the average value.
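
To make the difference concrete, here is a minimal plain-Java sketch (no Kafka Streams or Flink involved) that averages the same readings two ways: once over all time and once per fixed interval. The readings, the 60-second interval, and the class and method names are all made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustration only: hypothetical data and names, not part of any library.
class WindowedAverageSketch {

    // Average of every reading, ignoring time entirely.
    static double overallAverage(double[] temps) {
        double sum = 0;
        for (double t : temps) {
            sum += t;
        }
        return sum / temps.length;
    }

    // Average per fixed-size interval, keyed by the interval's start time in milliseconds.
    static Map<Long, Double> windowedAverages(long[] timestamps, double[] temps, long sizeMs) {
        Map<Long, double[]> sumAndCount = new TreeMap<>();
        for (int i = 0; i < temps.length; i++) {
            long windowStart = timestamps[i] - (timestamps[i] % sizeMs);
            double[] acc = sumAndCount.computeIfAbsent(windowStart, k -> new double[2]);
            acc[0] += temps[i]; // running sum
            acc[1] += 1;        // running count
        }
        Map<Long, Double> averages = new TreeMap<>();
        sumAndCount.forEach((start, acc) -> averages.put(start, acc[0] / acc[1]));
        return averages;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 30_000L, 60_000L, 90_000L, 120_000L, 150_000L};
        double[] temps = {70.0, 72.0, 71.0, 73.0, 95.0, 97.0};
        System.out.println("all time:   " + overallAverage(temps));
        System.out.println("per minute: " + windowedAverages(timestamps, temps, 60_000L));
    }
}
```

With these made-up numbers the all-time average comes out near 79.7, while the per-minute averages are 71.0, 72.0, and 96.0, which makes the jump in the last interval obvious.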

This is not to say that an aggregation over all time isn’t helpful, but
that, in many cases, you’ll want to aggregate over specific intervals.
In other cases, you’ll want an aggregation not defined by fixed time
boundaries but by behavior, e.g., session windows whose boundaries are
based on periods of *inactivity.* We’ll get into session windows in a
post later in the blog series.

This blog post marks the first in a series about windowing in the two
dominant stream processing technologies today: Kafka Streams and
Flink, specifically Flink SQL.

It’s important to note that the point of this blog series is not a
direct comparison between the two APIs. Instead, it is a resource for
windowed operations in Kafka Streams and Flink SQL. While comparing the
two in a competitive analysis is natural, it’s not the main focus here.

The blog series will discuss:

- The different types of windowing, semantics, and potential use cases.

- Time semantics

- Interpretation of the results

- Testing windowed applications

I will assume basic familiarity with Kafka Streams and Flink SQL, so the
examples will start by covering windowing.

But before we get into windowing, let’s discuss how Kafka Streams and
Flink SQL structure windowing applications. We’ll only cover this level
of detail in this initial post, and subsequent ones will assume
knowledge of how to assemble the program and focus on the windowing
aspect.

Kafka Streams windowing

You’ll need to specify an aggregation to do any windowing in Kafka Streams. An aggregation is a function that combines smaller components into a larger composition, clustered around some attribute, which in Kafka Streams is the key of the key-value pairs. You can also perform a reduce, a specialized form of aggregation in which the result has the same type as its inputs. An aggregation, in general, can return a completely different type from its inputs. But since windowing works the same way for either a reduce or an aggregation, we’ll use an aggregation for the examples throughout this blog series.
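
The distinction between a reduce and an aggregation is easiest to see in the types. The following is a plain-Java sketch, not the Kafka Streams API: `reduceByKey` and `aggregateByKey` are hypothetical helpers that group values by key and then combine them. The reduce must return the value type, while the aggregation can return whatever type its initializer supplies.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Hypothetical helpers for illustration; Kafka Streams has its own API for this.
class ReduceVsAggregateSketch {

    // Reduce: combines values of type V into a result that is also a V.
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> records,
                                        BinaryOperator<V> reducer) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> r : records) {
            result.merge(r.getKey(), r.getValue(), reducer);
        }
        return result;
    }

    // Aggregate: starts from an initial A and folds each V into it; A may differ from V.
    static <K, V, A> Map<K, A> aggregateByKey(List<Map.Entry<K, V>> records,
                                              Supplier<A> initializer,
                                              BiFunction<A, V, A> aggregator) {
        Map<K, A> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> r : records) {
            A current = result.containsKey(r.getKey())
                    ? result.get(r.getKey())
                    : initializer.get();
            result.put(r.getKey(), aggregator.apply(current, r.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> readings = List.of(
                Map.entry("sensor-1", 70.0),
                Map.entry("sensor-2", 65.0),
                Map.entry("sensor-1", 74.0));

        // Double in, Double out: highest reading per sensor.
        Map<String, Double> highest = reduceByKey(readings, Double::max);
        // Double in, Long out: number of readings per sensor.
        Map<String, Long> counts = aggregateByKey(readings, () -> 0L, (count, v) -> count + 1);

        System.out.println(highest);
        System.out.println(counts);
    }
}
```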

KStream<String, Double> iotHeatSensorStream =
    builder.stream("heat-sensor-input",
        Consumed.with(stringSerde, doubleSerde));

iotHeatSensorStream.groupByKey()                                   // <1>
    .windowedBy(<window specification>)                            // <2>
    .aggregate(() -> new IotSensorAggregation(tempThreshold),      // <3>
        aggregator,
        Materialized.with(stringSerde, aggregationSerde))
    .toStream().to("sensor-agg-output",
        Produced.with(windowedSerde, sensorAggregationSerde));     // <4>

Let’s walk through the essential points of setting up the Kafka Streams
window aggregation:

1. The first step is to group all records by key; this is required
before performing any aggregation. Here you’re using
KStream.groupByKey, which assumes the underlying key-value pairs already
have the correct keys for grouping. If not, you could use the
KStream.groupBy function, passing a KeyValueMapper instance that maps
the current key-value pair to a new key suitable for the aggregation
grouping. Note that changing the key for a group-by will lead to a
re-partitioning of the records.

2. You are specifying the windowing — we’ll cover the specific types in
later posts.

3. Point three is where you’re specifying how to aggregate records. The
first parameter is an Initializer represented as a lambda function,
which provides the initial value. The second parameter is the
Aggregator instance, which performs the aggregation action you
specify. Here, it’s a simple average and tracking the highest and
lowest values seen. The third parameter is a Materialized instance
specifying how to store the aggregation. Since the value type
differs from the incoming value, you must provide the appropriate
Serde instance for Kafka Streams to use when (de)serializing
records.

4. The final point is where you provide the Serde instances for
producing the results back to Kafka. The key Serde is a different
type as Kafka Streams wraps the incoming record key in a Windowed
instance.
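
IotSensorAggregation and the aggregator aren’t shown above, so here is one way they might look. This is a guess at the shape, written in plain Java so it runs without Kafka Streams on the classpath: the fields, the `add` method, and the `aboveThreshold` check are all assumptions. The static `aggregate` method mirrors the `(key, value, aggregate)` parameter order of Kafka Streams’ `Aggregator.apply`.

```java
// Hypothetical stand-in for the post's IotSensorAggregation; field names are assumptions.
class IotSensorAggregation {
    private final double tempThreshold;
    private double sum;
    private long count;
    private double highest = Double.NEGATIVE_INFINITY;
    private double lowest = Double.POSITIVE_INFINITY;

    IotSensorAggregation(double tempThreshold) {
        this.tempThreshold = tempThreshold;
    }

    // Folds one reading into the running totals and returns this instance.
    IotSensorAggregation add(double reading) {
        sum += reading;
        count++;
        highest = Math.max(highest, reading);
        lowest = Math.min(lowest, reading);
        return this;
    }

    double average() { return count == 0 ? 0.0 : sum / count; }
    double highest() { return highest; }
    double lowest() { return lowest; }

    // Assumed use of the threshold: flag when the running average exceeds it.
    boolean aboveThreshold() { return count > 0 && average() > tempThreshold; }

    // Same parameter order as Kafka Streams' Aggregator.apply(key, value, aggregate).
    static IotSensorAggregation aggregate(String key, Double reading, IotSensorAggregation agg) {
        return agg.add(reading);
    }

    public static void main(String[] args) {
        IotSensorAggregation agg = new IotSensorAggregation(80.0); // the Initializer's job
        for (double reading : new double[] {78.0, 82.0, 86.0}) {
            agg = aggregate("sensor-1", reading, agg);             // the Aggregator's job
        }
        System.out.println(agg.average() + " " + agg.lowest() + " " + agg.highest());
    }
}
```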

What’s not apparent from this aggregation example is where the
timestamps for the window are. But there’s a big hint in the explanation
of the aggregation example. At point four of the aggregation
description, Kafka Streams wraps the original key in a
Windowed object.

Windowed object

As shown in this illustration, the Windowed object contains the original
key and the Window instance for the aggregation values. The Window object has the start and end time for the aggregation window. It doesn’t contain the window size,
but you can easily calculate the size by subtracting the start time from
the end. We’ll cover reporting and analyzing the aggregation window
times in a follow-on blog post.
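
As a small illustration of that calculation, the sketch below uses hypothetical stand-in classes (`WindowBounds` and `WindowedKey`) rather than the real Kafka Streams types, so it runs with only the JDK. In Kafka Streams itself you would read the same values through `Windowed.key()` and `Windowed.window()`, then `Window.start()` and `Window.end()`.

```java
import java.time.Duration;

// Stand-ins for Kafka Streams' Window and Windowed<K>; the names here are hypothetical.
class WindowSizeSketch {

    static final class WindowBounds {
        final long startMs;
        final long endMs;

        WindowBounds(long startMs, long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    static final class WindowedKey<K> {
        final K key;
        final WindowBounds window;

        WindowedKey(K key, WindowBounds window) {
            this.key = key;
            this.window = window;
        }
    }

    // The window carries no size field, so derive it from its bounds.
    static Duration windowSize(WindowedKey<?> windowed) {
        return Duration.ofMillis(windowed.window.endMs - windowed.window.startMs);
    }

    public static void main(String[] args) {
        WindowedKey<String> windowed = new WindowedKey<>(
                "sensor-1", new WindowBounds(1_705_881_600_000L, 1_705_881_660_000L));
        System.out.println(windowed.key + " window size: " + windowSize(windowed));
    }
}
```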

Wrapping the original key in a Windowed object changes the key type,
meaning you’ll have to tell Kafka Streams how to serialize the results.
Fortunately, Kafka Streams provides the WindowedSerdes utility class,
making it easy to get the correct Serde for producing results back to
Kafka:

Serde<Windowed<String>> windowedSerde =
    WindowedSerdes.timeWindowedSerdeFrom(String.class,             // <1>
        60_000L);                                                  // <2>

KStream<String, Double> iotHeatSensorStream =
    builder.stream("heat-sensor-input",
        Consumed.with(stringSerde, doubleSerde));

iotHeatSensorStream.groupByKey()
    .windowedBy(<window specification>)
    .aggregate(() -> new IotSensorAggregation(tempThreshold),
        aggregator,
        Materialized.with(stringSerde, aggregationSerde))
    .toStream().to("sensor-agg-output",
        Produced.with(windowedSerde, sensorAggregationSerde));     // <3>

1. The class type for the original key

2. The size of the window in milliseconds

3. Providing the Serde for the Windowed key

So, by using the WindowedSerdes class, you provide the proper
serialization strategy for Kafka Streams to produce windowed results
back to Kafka. Producing windowed results to a topic implies that
downstream consumers will need to know how to handle the windowed
results as well. We’ll cover that situation in a later post on
reporting in this series.
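
One way to see why timeWindowedSerdeFrom asks for the window size: if the serialized key carries only the original key bytes plus the window start, a reader needs the size to rebuild the window end. The sketch below assumes that layout (key bytes followed by an 8-byte start timestamp). Treat it as a simplified illustration rather than a description of the exact wire format; the class and method names are hypothetical, and it uses only the JDK.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: assumes a "key bytes + 8-byte window start" layout.
class WindowedKeyBytesSketch {

    // Writes the key followed by the window start; the window end is not written.
    static byte[] serialize(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStartMs)
                .array();
    }

    // Everything before the trailing 8 bytes is the original key.
    static String key(byte[] data) {
        return new String(data, 0, data.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    // The trailing 8 bytes hold the window start.
    static long windowStart(byte[] data) {
        return ByteBuffer.wrap(data).getLong(data.length - Long.BYTES);
    }

    // The end is not in the bytes, so the reader has to supply the window size.
    static long windowEnd(byte[] data, long windowSizeMs) {
        return windowStart(data) + windowSizeMs;
    }

    public static void main(String[] args) {
        byte[] data = serialize("sensor-1", 120_000L);
        System.out.println(key(data) + " [" + windowStart(data) + ", "
                + windowEnd(data, 60_000L) + ")");
    }
}
```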

Now, let’s move on to Flink SQL aggregation windows.

Flink SQL windowing

Flink offers windowing for event stream data through windowing
table-valued functions (TVFs). The Flink TVFs implement the SQL 2016
standard’s Polymorphic Table Functions (PTFs). In a nutshell, PTFs are
user-defined functions that operate on a table and return a table.

PTF in action

The exciting thing about PTFs is that the schema of the table returned
by the function is dynamic; it’s determined at runtime by the function
output. So PTFs enable windowing and aggregation functions on existing
tables, which is precisely what we get with Flink SQL windowing. The
windowing TVFs in Flink replace the now-deprecated Group Window
Functions. Window TVFs also support more powerful window-based
calculations like Window TopN and Window Deduplication.

Now, let’s move on to how you execute a windowed aggregation in Flink
SQL. As with the Kafka Streams example, we’ll review the structure of a
windowed aggregation, with specific window implementations covered in
later posts.


SELECT window_start,
       window_end,
       device_id,
       AVG(reading) AS avg_reading          -- <1>
FROM TABLE(                                 -- <2>
    <Window Function> (                     -- <3>
        TABLE device_readings,              -- <4>
        DESCRIPTOR(ts),                     -- <5>
        INTERVAL '5' MINUTES,               -- <6>
        [INTERVAL '10' MINUTES]
    )
)
GROUP BY window_start,                      -- <7>
         window_end,
         device_id

Here’s the breakdown of the query:

1. Selecting the columns and the aggregation using the Flink SQL AVG
function and providing a descriptive name; these columns form the
schema of the returned table.

2. The TABLE function

3. Here, you give a specific window function: HOP, TUMBLE, or
CUMULATE. Support for a SESSION type is coming soon. We’ll cover the
specific types in later posts.

4. Next are the parameters for the window function, starting with the
table to use for the input

5. The DESCRIPTOR is the time attribute column the function uses for
the window.

6. Depending on the window function, the next one or two parameters
determine either the window advance and size, or just the size.

7. As with standard SQL aggregate functions, the non-aggregated columns
in the SELECT clause must also appear in the GROUP BY clause.

Flink SQL inserts three additional columns into windowed operations,
window_start, window_end, and window_time. Flink SQL determines
window_time by subtracting 1ms from the window_end value.
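
The relationship between the three columns can be sketched in a few lines of plain Java. This assumes a tumbling window whose boundaries are aligned to the epoch with no offset; the class and method names are hypothetical, and this is not Flink code.

```java
import java.time.Duration;
import java.time.Instant;

// Illustration only: assumes epoch-aligned tumbling windows with no offset.
class WindowColumnsSketch {

    // window_start: the timestamp rounded down to a multiple of the window size.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // window_end: exclusive upper bound, one window size after the start.
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    // window_time: window_end minus 1 ms, the last millisecond inside the window.
    static Instant windowTime(Instant ts, Duration size) {
        return windowEnd(ts, size).minusMillis(1);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-01-22T10:07:30Z");
        Duration size = Duration.ofMinutes(5);
        System.out.println("window_start = " + windowStart(ts, size));
        System.out.println("window_end   = " + windowEnd(ts, size));
        System.out.println("window_time  = " + windowTime(ts, size));
    }
}
```

For a reading at 10:07:30 and a 5-minute window, this gives a window_start of 10:05:00, a window_end of 10:10:00, and a window_time of 10:09:59.999.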

This concludes our introduction to the structure of windowing
applications in Kafka Streams and Flink SQL. In the next edition, we’ll
cover hopping and tumbling windows.

Resources

- Apache Flink on Confluent Cloud

- Flink SQL Windows

- Kafka Streams windowing documentation

The views expressed in this article are those of the author and do not necessarily reflect the position of Confluent.
