How to Use Stateful Operations in Kafka Streams
Explore stateful operations in the Kafka Streams DSL API
In this part, we will explore stateful operations in the Kafka Streams DSL API. We will focus on aggregation operations such as `aggregate`, `count`, and `reduce`, along with a discussion of related concepts.
The aggregation operation is applied to records of the same key. Kafka Streams supports the following aggregations: `aggregate`, `count`, and `reduce`. As mentioned in the previous article, grouping is a prerequisite for aggregation. You can run `groupBy` (or its variations) on a `KStream` or a `KTable`, which results in a `KGroupedStream` or a `KGroupedTable`, respectively. `KTable` grouping was not covered in the stateless operations blog.
The `aggregate` function has two key components: an `Initializer` and an `Aggregator`. When the first record for a key is received, the `Initializer` is invoked and its result is used as the starting point for the `Aggregator`. For subsequent records, the `Aggregator` combines the current record with the aggregate computed so far. Conceptually, this is a stateful computation performed on an unbounded data set. It is stateful because computing the new aggregate takes into account the current record (the key-value pair) along with the latest state (the current aggregate). This can be used for scenarios such as moving averages, sums, counts, etc.
For example, you can use `aggregate` to calculate a count (i.e., the number of times a specific key was received). Code examples are available on GitHub.
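To make the `Initializer`/`Aggregator` pair concrete, here is a minimal sketch of a per-key count built with `aggregate`. It assumes Kafka Streams 3.x plus the `kafka-streams-test-utils` artifact on the classpath. The topic names, the `run` helper, and the use of `TopologyTestDriver` (so the sketch runs without a broker) are illustrative choices, not the article's GitHub code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class AggregateCountExample {
    // Hypothetical topic names used only in this sketch.
    static final String INPUT_TOPIC = "clicks";
    static final String OUTPUT_TOPIC = "click-counts";

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);
        stream.groupByKey()
              .aggregate(
                  () -> 0L,                             // Initializer: starting value for a new key
                  (key, value, current) -> current + 1, // Aggregator: current record + aggregate so far
                  Materialized.with(Serdes.String(), Serdes.Long())) // serdes for the Long aggregate
              .toStream()
              .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-count-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Pipes one "click" per key and returns the latest count emitted for each key.
    static Map<String, Long> run(List<String> keys) {
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                INPUT_TOPIC, new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer());
            for (String key : keys) {
                input.pipeInput(key, "click");
            }
            return output.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("user1", "user2", "user1")));
    }
}
```

Because the `Materialized` argument supplies a `Long` serde for the aggregate, the result type can differ from the `String` input values. Records with a `null` key never reach the `Aggregator`, so `run(Arrays.asList("user1", null))` reports only `user1`.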
`count` is such a commonly used form of aggregation that it is offered as a first-class operation. Once you have the stream records grouped by key (`KGroupedStream`), you can count the number of records for a specific key by using this operation. The `aggregate` way of doing things can be replaced by a single method call:
```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
// count() returns a KTable<String, Long> holding the latest count per key
stream.groupByKey().count();
```
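You can use `reduce` to combine the stream of values. The `aggregate` operation covered earlier is a generalized form of `reduce`: with `reduce` there is no `Initializer` (the first value received for a key becomes the initial result), and the result has the same type as the input values. You can implement functionality such as sum, min, max, etc.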
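Here is a minimal sketch of a per-key sum with `reduce`. It assumes Kafka Streams 3.x with `kafka-streams-test-utils`, input values that are `Long` amounts (so the result type equals the value type, as `reduce` requires), and hypothetical topic names and a hypothetical `run` helper; `TopologyTestDriver` is used only so the sketch runs without a broker.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

public class ReduceSumExample {
    static final String INPUT_TOPIC = "purchase-amounts"; // hypothetical: key = user, value = amount
    static final String OUTPUT_TOPIC = "total-per-user";   // hypothetical

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.Long()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
               // Reducer: the first value for a key is the initial result; later values are added to it.
               .reduce(Long::sum)
               .toStream()
               .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reduce-sum-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
        return props;
    }

    // Pipes (user, amount) pairs and returns the latest running total per user.
    static Map<String, Long> run(List<KeyValue<String, Long>> records) {
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config())) {
            TestInputTopic<String, Long> input = driver.createInputTopic(
                INPUT_TOPIC, new StringSerializer(), new LongSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer());
            input.pipeKeyValueList(records);
            return output.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(
            KeyValue.pair("user1", 5L), KeyValue.pair("user2", 3L), KeyValue.pair("user1", 7L))));
    }
}
```

Swapping `Long::sum` for `(a, b) -> Math.max(a, b)` turns the same pipeline into a running maximum per key.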
Note: All the aggregation operations ignore records with a `null` key, which is expected, since the very goal of these functions is to operate on records of a specific key.
Aggregation and state stores
Aggregated values can be pushed to an output topic, but this is not mandatory. It is also possible to store the aggregation results in local state stores. Here is an example:
```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
// Materialize the counts in a local state store named "count-store"
stream.groupByKey().count(Materialized.as("count-store"));
```
In this example, the call to `count` also creates a local state store named `count-store` that can then be queried using Interactive Queries.
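The following sketch shows two ways to read `count-store`. The `lookupCount` helper uses the Interactive Queries entry point `KafkaStreams#store` with `StoreQueryParameters` (available since Kafka 2.5) and assumes an already started `KafkaStreams` instance; `countFor` reads the same store through `TopologyTestDriver#getKeyValueStore` so it can run without a broker. Kafka Streams 3.x and `kafka-streams-test-utils` are assumed, and the topic and helper names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class CountStoreQueryExample {
    static final String INPUT_TOPIC = "clicks"; // hypothetical topic name

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);
        stream.groupByKey().count(Materialized.as("count-store"));
        return builder.build();
    }

    // Interactive Queries against a running KafkaStreams instance.
    // Only keys from partitions assigned to this instance are visible here.
    static Long lookupCount(KafkaStreams streams, String key) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType("count-store", QueryableStoreTypes.keyValueStore()));
        return store.get(key); // null if the key has not been seen
    }

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-store-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Broker-free variant: pipes one record per key, then reads "count-store" directly.
    static Long countFor(List<String> keys, String key) {
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                INPUT_TOPIC, new StringSerializer(), new StringSerializer());
            for (String k : keys) {
                input.pipeInput(k, "click");
            }
            KeyValueStore<String, Long> store = driver.getKeyValueStore("count-store");
            return store.get(key);
        }
    }

    public static void main(String[] args) {
        System.out.println(countFor(Arrays.asList("user1", "user2", "user1"), "user1"));
    }
}
```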
These state stores can either be kept in memory or stored on disk using RocksDB. Local state also allows for scalability: each state store lives in the Kafka Streams application instance that processes the corresponding partitions of the input topic, so the overall state is distributed across (potentially) multiple instances of your application (except in the case of `GlobalKTable`s, which are fully replicated on every instance). Another key property is high availability: the contents of these state stores are backed up to Kafka as compacted changelog topics (although this can be disabled). If an app instance crashes, its state store contents can be restored from Kafka itself.
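As a sketch of the in-memory versus RocksDB choice, a store supplier from `Stores.inMemoryKeyValueStore` or `Stores.persistentKeyValueStore` can be passed to `Materialized.as`, and `withLoggingDisabled()` turns off the compacted changelog topic. This assumes Kafka Streams 3.x with `kafka-streams-test-utils`; the class, topic, and helper names are hypothetical, and disabling logging for both store types is purely for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class StoreTypeExample {
    static final String INPUT_TOPIC = "clicks"; // hypothetical topic name

    static Topology buildTopology(boolean inMemory) {
        // Heap-based store vs. RocksDB-backed store on local disk.
        KeyValueBytesStoreSupplier supplier = inMemory
            ? Stores.inMemoryKeyValueStore("count-store")
            : Stores.persistentKeyValueStore("count-store");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);
        stream.groupByKey()
              .count(Materialized.<String, Long>as(supplier)
                  // No compacted changelog topic: less write overhead,
                  // but the store cannot be restored from Kafka after a crash.
                  .withLoggingDisabled());
        return builder.build();
    }

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "store-type-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Pipes one record per key and reads the resulting count for `key` from the store.
    static Long countFor(boolean inMemory, List<String> keys, String key) {
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(inMemory), config())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                INPUT_TOPIC, new StringSerializer(), new StringSerializer());
            for (String k : keys) {
                input.pipeInput(k, "click");
            }
            KeyValueStore<String, Long> store = driver.getKeyValueStore("count-store");
            return store.get(key);
        }
    }

    public static void main(String[] args) {
        System.out.println(countFor(true, Arrays.asList("a", "b", "a"), "a"));
    }
}
```

Disabling logging trades fault tolerance for lower overhead: the store can no longer be rebuilt from Kafka, and an in-memory store in particular starts empty after a restart.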
A `KGroupedTable` is obtained when `groupBy` is invoked on a `KTable`. Just like `KGroupedStream`, having a `KGroupedTable` is a prerequisite for applying aggregations on a `KTable`. The `aggregate`, `count`, and `reduce` operations work the same way on a `KGroupedTable` as they do on a `KGroupedStream`.
But there is an important difference that needs to be highlighted. A `KTable` is conceptually different from a `KStream`: it represents the latest value for each key at a point in time (very much like a database table). It is a mutable entity, as opposed to a `KStream`, which represents an immutable, unbounded sequence of records. To account for this difference, the `aggregate` and `reduce` functions on a `KGroupedTable` take an additional `Aggregator` (a `Reducer` in the case of `reduce`), known as the subtractor, which is invoked to remove the old value when a key is updated or a `null` value (tombstone) is received.
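Here is a minimal sketch of the subtractor at work. It assumes a hypothetical `user-regions` table (key = user id, value = region) and counts users per region with `KGroupedTable#aggregate`: when a user moves to another region or is deleted with a `null` value, the subtractor removes that user from the old region's count. Kafka Streams 3.x and `kafka-streams-test-utils` are assumed; topic and helper names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class RegionCountExample {
    // Hypothetical topics: key = user id, value = the user's current region.
    static final String USERS_TOPIC = "user-regions";
    static final String OUTPUT_TOPIC = "users-per-region";

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> userRegions = builder.table(USERS_TOPIC);
        userRegions
            // Re-key each (user, region) row by region, which yields a KGroupedTable.
            .groupBy((user, region) -> KeyValue.pair(region, user),
                     Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> 0L,                           // Initializer
                (region, user, total) -> total + 1, // adder: a row now maps to this region
                (region, user, total) -> total - 1, // subtractor: the row's old value is removed
                Materialized.with(Serdes.String(), Serdes.Long()))
            .toStream()
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "region-count-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Pipes (user, region) table updates; a null region is a tombstone (delete).
    static Map<String, Long> run(List<KeyValue<String, String>> rows) {
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                USERS_TOPIC, new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer());
            input.pipeKeyValueList(rows);
            return output.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(
            KeyValue.pair("alice", "US"), KeyValue.pair("bob", "US"),
            KeyValue.pair("alice", "EU"), KeyValue.pair("bob", (String) null))));
    }
}
```

Piping `alice=US`, `bob=US`, `alice=EU`, and then a tombstone for `bob` should leave `US` at 0 and `EU` at 1: the move and the delete each trigger the subtractor for the old region.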
Stateful Kafka Streams operations also support windowing. This allows you to scope your stream processing pipelines to a specific time window/range (e.g., track the number of link clicks per minute or unique page views per hour).
To perform windowed aggregations on a group of records, you first create a `KGroupedStream` (as explained above) using `groupBy` or `groupByKey` on a `KStream`, and then call the `windowedBy` operation, which is overloaded for the different window types. You can choose between traditional time windows (tumbling, hopping, or sliding) or session windows.
Calling `windowedBy(Windows<W> windows)` on a `KGroupedStream` returns a `TimeWindowedKStream`, on top of which you can invoke the aggregate operations mentioned above. For example, if you want the number of clicks over a specific time range (say, five minutes), choose a tumbling time window. This ensures that the records are cleanly segregated across the given time boundaries. In other words, clicks from user 1 between 10:00 and 10:05 a.m. will be aggregated (counted) separately, and a new window starts at 10:05 a.m. (window end times are exclusive), during which the click counter starts again from zero.
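A minimal sketch of the five-minute click count described above, assuming Kafka Streams 3.0+ (for `TimeWindows.ofSizeWithNoGrace`) and `kafka-streams-test-utils`. The topic names, the `run` helper, and the `user@windowStartMillis` output key format are hypothetical; putting the window start in the key makes the [10:00, 10:05) and [10:05, 10:10) buckets visible.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class TumblingWindowCountExample {
    static final String INPUT_TOPIC = "clicks";            // hypothetical
    static final String OUTPUT_TOPIC = "clicks-per-5-min"; // hypothetical

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream(INPUT_TOPIC);
        clicks.groupByKey()
              // Tumbling: size = advance = 5 minutes, so windows never overlap.
              .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
              .count()
              .toStream()
              // Flatten Windowed<String> into a plain "user@windowStartMillis" key.
              .map((windowedKey, count) -> KeyValue.pair(
                  windowedKey.key() + "@" + windowedKey.window().start(), count))
              .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tumbling-window-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Pipes one click per event timestamp; returns the latest count per (user, window).
    static Map<String, Long> run(String user, List<Instant> timestamps) {
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                INPUT_TOPIC, new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer());
            for (Instant ts : timestamps) {
                input.pipeInput(user, "click", ts);
            }
            return output.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("user1", Arrays.asList(
            Instant.parse("2024-01-01T10:00:30Z"),
            Instant.parse("2024-01-01T10:04:59Z"),
            Instant.parse("2024-01-01T10:05:00Z"))));
    }
}
```

With these inputs, the clicks at 10:00:30 and 10:04:59 land in the window starting at 10:00, while the click at exactly 10:05:00 opens the next window.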
The time window types are:
- Tumbling time windows, which never overlap. A record will only be part of one window.
- Hopping time windows, which can overlap, so a record can be present in one or more windows.
- Sliding time windows, which were originally meant for use with join operations; since Kafka 2.7, `SlidingWindows` can also be used for aggregations.
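Hopping windows are built from the same `TimeWindows` class by calling `advanceBy` with an advance smaller than the window size. In this sketch, five-minute windows advance every minute, so each click is counted in five overlapping windows. It assumes Kafka Streams 3.0+ (for `TimeWindows.ofSizeWithNoGrace`) and `kafka-streams-test-utils`; the topic names, the `run` helper, and the `user@windowStartMillis` key format are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class HoppingWindowCountExample {
    static final String INPUT_TOPIC = "clicks";                 // hypothetical
    static final String OUTPUT_TOPIC = "clicks-hopping-counts"; // hypothetical

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream(INPUT_TOPIC);
        clicks.groupByKey()
              // Hopping: 5-minute windows that start every minute, so they overlap.
              .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                                     .advanceBy(Duration.ofMinutes(1)))
              .count()
              .toStream()
              // Flatten Windowed<String> into a plain "user@windowStartMillis" key.
              .map((windowedKey, count) -> KeyValue.pair(
                  windowedKey.key() + "@" + windowedKey.window().start(), count))
              .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hopping-window-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Pipes a single click and returns the count for every window that contains it.
    static Map<String, Long> run(String user, Instant timestamp) {
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                INPUT_TOPIC, new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer());
            input.pipeInput(user, "click", timestamp);
            return output.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("user1", Instant.parse("2024-01-01T10:02:30Z")));
    }
}
```

A single click at 10:02:30 therefore appears in the windows starting at 09:58, 09:59, 10:00, 10:01, and 10:02.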
There is another type of stateful operation called joining. It is an extensive topic that deserves an entire article (or maybe another series) by itself.
If you want to take into account the “session” (a period of activity separated by a defined gap of inactivity), use `windowedBy(SessionWindows windows)`, which returns a `SessionWindowedKStream`.
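A minimal sketch of session-based counting, assuming Kafka Streams 3.0+ (for `SessionWindows.ofInactivityGapWithNoGrace`) and `kafka-streams-test-utils`; the topic names, the `run` helper, and the `user@start-end` output key format are hypothetical. With a five-minute inactivity gap, clicks that arrive within five minutes of each other are merged into one session.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;

public class SessionWindowCountExample {
    static final String INPUT_TOPIC = "clicks";                // hypothetical
    static final String OUTPUT_TOPIC = "clicks-per-session";   // hypothetical

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream(INPUT_TOPIC);
        clicks.groupByKey()
              // A session closes after 5 minutes without clicks for that key.
              .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
              .count()
              .toStream()
              // Key as "user@sessionStart-sessionEnd" (epoch millis; the end is the last click).
              // Merged-away sessions arrive here with a null (tombstone) count.
              .map((windowedKey, count) -> KeyValue.pair(
                  windowedKey.key() + "@" + windowedKey.window().start()
                      + "-" + windowedKey.window().end(), count))
              .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-window-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // not contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Pipes one click per timestamp; returns the latest value per session key.
    static Map<String, Long> run(String user, List<Instant> timestamps) {
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                INPUT_TOPIC, new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer());
            for (Instant ts : timestamps) {
                input.pipeInput(user, "click", ts);
            }
            return output.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("user1", Arrays.asList(
            Instant.parse("2024-01-01T10:00:00Z"),
            Instant.parse("2024-01-01T10:02:00Z"),
            Instant.parse("2024-01-01T10:20:00Z"))));
    }
}
```

Clicks at 10:00, 10:02, and 10:20 should yield two sessions: [10:00, 10:02] with a count of 2 and [10:20, 10:20] with a count of 1. When sessions merge, the superseded session key is emitted with a `null` value, so downstream consumers should treat `null` as a deletion.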
That’s all for this part of the Kafka Streams blog series. Stay tuned for the next part, which will demonstrate how to test Kafka Streams applications using the built-in test utilities.