How to Use Stateful Operations in Kafka Streams

Explore stateful operations in the Kafka Streams DSL API

Abhishek Gupta
Mar 12 · 5 min read
Photo by Nathan Dumlao on Unsplash.

The first part of the Kafka Streams API blog series covered stateless functions such as filter, map, etc.

In this part, we will explore stateful operations in the Kafka Streams DSL API. We will focus on aggregation operations such as aggregate, count, and reduce along with a discussion of related concepts.

Aggregation

The aggregation operation is applied to records of the same key. Kafka Streams supports the following aggregations: aggregate, count, and reduce. As mentioned in the previous article, grouping is a prerequisite for aggregation. You can run groupBy (or its variations) on a KStream or a KTable, which results in a KGroupedStream and KGroupedTable, respectively.

Note: KTable grouping was not covered in the stateless operations blog.

aggregate

The aggregate function has two key components: Initializer and Aggregator. When the first record for a key is received, the Initializer is invoked and its result is used as the starting point for the Aggregator. For subsequent records, the Aggregator uses the current record along with the aggregate computed so far for its calculation. Conceptually, this is a stateful computation being performed on an infinite data set. It is stateful because calculating the new aggregate takes into account both the incoming key-value record and the existing state (the current aggregate). This can be used for scenarios such as moving average, sum, count, etc.

For example, aggregate can be used to calculate the count (i.e. the number of times a specific key was received): the Initializer returns zero and the Aggregator adds one for every record. Code examples are available on GitHub.
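To make the Initializer/Aggregator flow concrete, here is a minimal plain-Java sketch of counting by key. It does not use the Kafka Streams library: the Initializer and Aggregator interfaces below are local stand-ins that mirror the shape of the real ones, and the aggregate driver method and the AggregateCountSketch class are hypothetical names invented for this illustration. In the DSL, the same two lambdas would be passed to aggregate on a KGroupedStream.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AggregateCountSketch {

    // Local stand-ins (assumption): same shape as the Kafka Streams
    // Initializer and Aggregator interfaces, so no library is needed.
    interface Initializer<VA> { VA apply(); }
    interface Aggregator<K, V, VA> { VA apply(K key, V value, VA aggregate); }

    // Hypothetical driver: keeps one aggregate per key in a map, which
    // plays the role that a state store plays in a real application.
    static <K, V, VA> Map<K, VA> aggregate(List<Map.Entry<K, V>> records,
                                           Initializer<VA> initializer,
                                           Aggregator<K, V, VA> aggregator) {
        Map<K, VA> state = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            if (key == null) {
                continue; // records with a null key are ignored
            }
            // First record for this key: start from the Initializer's value.
            VA current = state.containsKey(key) ? state.get(key) : initializer.apply();
            state.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return state;
    }

    static Map<String, Long> countByKey(List<Map.Entry<String, String>> records) {
        return aggregate(records,
                () -> 0L,                          // Initializer: start at zero
                (key, value, count) -> count + 1); // Aggregator: add one per record
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("user1", "click"),
                Map.entry("user2", "click"),
                Map.entry("user1", "click"));
        System.out.println(countByKey(records)); // prints {user1=2, user2=1}
    }
}
```

The value of each record is not used here because a count only cares that a record arrived. A sum or moving average would read the value inside the Aggregator instead.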

count

count is such a commonly used form of aggregation that it is offered as a first-class operation. Once you have the stream records grouped by key (KGroupedStream), you can count the number of records of a specific key by using this operation.

The aggregate way of doing things can be replaced by a single method call!

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey().count();

reduce

You can use reduce to combine the stream of values. The aggregate operation that was covered earlier is a generalized form of reduce. You can implement functionality such as sum, min, max, etc. For example, max keeps the larger of the current aggregate and the incoming value for each key.
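The max case can be sketched in plain Java as follows. As before, this is only an illustration under stated assumptions: Reducer is a local stand-in with the same shape as the Kafka Streams interface, while the reduce driver and the ReduceMaxSketch class are hypothetical. Unlike aggregate, there is no Initializer, so the first value seen for a key simply becomes the initial aggregate. In the DSL, the reducer would be passed to reduce on a KGroupedStream.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReduceMaxSketch {

    // Local stand-in (assumption): same shape as the Kafka Streams Reducer.
    interface Reducer<V> { V apply(V value1, V value2); }

    // Hypothetical driver: combines each incoming value with the value
    // stored so far for the same key.
    static <K, V> Map<K, V> reduce(List<Map.Entry<K, V>> records, Reducer<V> reducer) {
        Map<K, V> state = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            V value = record.getValue();
            if (key == null || value == null) {
                continue; // nothing to combine for this record
            }
            V current = state.get(key);
            // No Initializer here: the first value is stored as-is.
            state.put(key, current == null ? value : reducer.apply(current, value));
        }
        return state;
    }

    static Map<String, Long> maxByKey(List<Map.Entry<String, Long>> records) {
        return reduce(records, Long::max); // keep the larger of the two values
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> readings = List.of(
                Map.entry("sensor1", 5L),
                Map.entry("sensor1", 9L),
                Map.entry("sensor2", 3L),
                Map.entry("sensor1", 7L));
        System.out.println(maxByKey(readings)); // prints {sensor1=9, sensor2=3}
    }
}
```

Because the input and output types of a Reducer are the same, reduce fits sum, min, and max well. When the result type differs from the value type, aggregate is the better fit.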

Note: All the aggregation operations ignore records with the null key, which is obvious since the very goal of these sets of functions is to operate on records of a specific key.

Aggregation and state stores

In the examples above, the aggregated values were pushed to an output topic. This is not mandatory, though. It is possible to store the aggregation results in local state stores. Here is an example:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey().count(Materialized.as("count-store"));

In this example, the call to count also creates a local state store named count-store that can then be introspected using Interactive Queries.

These state stores can either be in memory or stored on disk using RocksDB. This allows for scalability since each state store is local to the Kafka Streams application instance that processes the corresponding partitions of a topic. Thus, the overall state is distributed across (potentially) multiple instances of your application (except in the case of GlobalKTables). Another key property is high availability: the contents of these state stores are backed up into Kafka as changelog topics, which are compacted (although this backup can be disabled). If an app instance crashes, its state store contents can be restored from Kafka itself.

KGroupedTable

A KGroupedTable is obtained when groupBy* operations are invoked on a KTable. Just like KGroupedStream, having a KGroupedTable is a prerequisite for applying aggregation on a KTable. aggregate, count, and reduce work the same way in KGroupedTable as they do with a KGroupedStream.

But there is an important difference that needs to be highlighted. A KTable is conceptually different from a KStream in the sense that it represents a snapshot of the data at a point in time (very much like a database table). It is a mutable entity as opposed to a KStream, which represents an immutable and infinite sequence of records. To account for this difference, the aggregate and reduce functions in a KGroupedTable take an additional function (often known as a subtractor) that is invoked when a key is updated or a null value is received, so that the old value can be removed from the aggregate before the new one is added.
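The role of the subtractor is easiest to see with a small model. The sketch below is plain Java and makes several assumptions: the table maps a user to a region, the aggregate is the number of users per region, Aggregator is a local stand-in for the Kafka Streams interface, and the TableAggregateSketch class with its update method is hypothetical. When a user moves to another region, the old region is decremented through the subtractor and the new region is incremented through the adder. A null value (a delete) only triggers the subtractor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class TableAggregateSketch {

    // Local stand-in (assumption): same shape as the Kafka Streams Aggregator.
    interface Aggregator<K, V, VA> { VA apply(K key, V value, VA aggregate); }

    static final Aggregator<String, String, Long> ADDER =
            (region, user, count) -> count + 1;
    static final Aggregator<String, String, Long> SUBTRACTOR =
            (region, user, count) -> count - 1;

    // user -> region: models the latest value per key held by a table
    private final Map<String, String> table = new HashMap<>();
    // region -> number of users: models the aggregate of the grouped table
    private final Map<String, Long> countByRegion = new TreeMap<>();

    // Applies an upsert, or a delete when newRegion is null.
    void update(String user, String newRegion) {
        String oldRegion = (newRegion == null)
                ? table.remove(user)
                : table.put(user, newRegion);
        if (oldRegion != null) {
            // The previous value no longer counts towards its old group.
            countByRegion.put(oldRegion,
                    SUBTRACTOR.apply(oldRegion, user, countByRegion.get(oldRegion)));
        }
        if (newRegion != null) {
            countByRegion.put(newRegion,
                    ADDER.apply(newRegion, user, countByRegion.getOrDefault(newRegion, 0L)));
        }
    }

    Map<String, Long> counts() {
        return countByRegion;
    }

    public static void main(String[] args) {
        TableAggregateSketch sketch = new TableAggregateSketch();
        sketch.update("alice", "eu");
        sketch.update("bob", "eu");
        sketch.update("alice", "us"); // alice moves: eu is decremented, us incremented
        sketch.update("bob", null);   // bob is deleted: eu is decremented
        System.out.println(sketch.counts()); // prints {eu=0, us=1}
    }
}
```

Without the subtractor, alice would be counted in both regions after her move, which is exactly the double counting that the extra function is there to prevent.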

Windowing

Stateful Kafka Streams operations also support Windowing. This allows you to scope your stream processing pipelines to a specific time window/range (e.g. track the number of link clicks per minute or unique page views per hour).

To perform windowed aggregations on a group of records, you first create a KGroupedStream (as explained above) using groupBy on a KStream and then apply the windowedBy operation (available in two overloaded forms). You can choose between traditional time windows (tumbling, hopping, or sliding) or session-based time windows.

Using windowedBy(Windows<W> windows) on a KGroupedStream returns a TimeWindowedKStream on top of which you can invoke the above-mentioned aggregate operations. For example, if you want the number of clicks over a specific time range (say five minutes), choose a tumbling time window. This will ensure that the records are clearly segregated across the given time boundaries. In other words, clicks from user 1 between 10:00 and 10:05 a.m. are aggregated (counted) together, and a new time block (window) starts at 10:05 a.m., in which the click counter starts again from zero. The start of a window is inclusive and its end is exclusive, so a click at exactly 10:05 a.m. belongs to the second window.
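The way records are assigned to tumbling windows can be sketched in a few lines of plain Java. This is a model of the idea rather than the library's code: it assumes non-negative timestamps in milliseconds and windows aligned to the epoch, and the TumblingWindowSketch class and its methods are hypothetical names. In the DSL, the window definition would be passed to windowedBy as a TimeWindows instance.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowSketch {

    // Start of the window [start, start + sizeMs) that contains the timestamp.
    // Assumes timestampMs >= 0 and windows aligned to the epoch.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts records per window; the map key is the window start time.
    static Map<Long, Long> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long timestampMs : timestampsMs) {
            counts.merge(windowStart(timestampMs, sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        // Clicks at 0:00, 1:00, 4:59.999, 5:00 and 7:00 (offsets in ms).
        long[] clicks = {0L, 60_000L, 299_999L, 300_000L, 420_000L};
        System.out.println(countPerWindow(clicks, fiveMinutes)); // prints {0=3, 300000=2}
    }
}
```

The click at exactly five minutes lands in the second window, which matches the inclusive start and exclusive end described above.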

The available time window types are:

  • Tumbling time windows, which never overlap. A record will only be part of one window.
  • Hopping time windows, where records can be present in one or more time ranges/windows.
  • Sliding time windows, which are meant for use with join operations (later Kafka Streams releases also offer sliding windows for aggregations).
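The difference between tumbling and hopping windows comes down to how many windows contain a given record. The plain-Java sketch below computes those windows for a window size and an advance interval (the "hop"). It assumes non-negative millisecond timestamps and epoch-aligned windows, and the HoppingWindowSketch class and its method are hypothetical names used only for this illustration. A tumbling window is simply the special case where the advance equals the size.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindowSketch {

    // Returns the start times of every window [start, start + sizeMs) that
    // contains the timestamp, where windows begin at multiples of advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window start that can still contain the timestamp.
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L; // five-minute windows
        long hop = 60 * 1000L;      // a new window starts every minute
        long ts = 7 * 60 * 1000L;   // record at minute 7

        // Hopping: the record belongs to the windows starting at minutes 3 to 7.
        System.out.println(windowStartsFor(ts, size, hop));
        // prints [180000, 240000, 300000, 360000, 420000]

        // Tumbling (advance == size): exactly one window, starting at minute 5.
        System.out.println(windowStartsFor(ts, size, size)); // prints [300000]
    }
}
```

With hopping windows, each record therefore updates several aggregates at once, which is worth keeping in mind when sizing state stores.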

There is another type of stateful operation called Joining. It is an extensive topic that deserves an entire article (or maybe another series) by itself.

If you want to take into account the “session” (the period of activity separated by a defined gap of inactivity), use windowedBy(SessionWindows windows), which returns a SessionWindowedKStream.
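Session boundaries depend on the data rather than on the clock, which a small plain-Java model can illustrate. The sketch below groups the timestamps of a single key into sessions. It makes a few assumptions: timestamps are in milliseconds, a difference exactly equal to the inactivity gap still counts as the same session, and the input is sorted up front instead of merging sessions as out-of-order records arrive. The SessionWindowSketch class and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SessionWindowSketch {

    // Groups the timestamps of one key into sessions. Each session is
    // returned as a two-element array: {firstTimestamp, lastTimestamp}.
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted); // simplification: process records in time order

        List<long[]> result = new ArrayList<>();
        for (long timestampMs : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && timestampMs - last[1] <= gapMs) {
                last[1] = timestampMs; // still active: extend the current session
            } else {
                result.add(new long[] {timestampMs, timestampMs}); // gap exceeded: new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        // Activity at minutes 0, 2, 10, 11 and 25.
        long[] clicks = {0L, 120_000L, 600_000L, 660_000L, 1_500_000L};
        for (long[] session : sessions(clicks, fiveMinutes)) {
            System.out.println(session[0] + " -> " + session[1]);
        }
        // prints:
        // 0 -> 120000
        // 600000 -> 660000
        // 1500000 -> 1500000
    }
}
```

Unlike tumbling or hopping windows, the number and length of sessions are not known in advance. They grow for as long as the key stays active.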

That’s all for this part of the Kafka Streams blog series. Stay tuned for the next part, which will demonstrate how to test Kafka Streams applications using the built-in test utilities.

