Kafka StateStore walkthrough

Abhishek Giri
Published in DataPebbles
3 min read · Dec 4, 2023

In this blog, we give a brief introduction to state stores in Kafka Streams.

Stateful processing in Kafka Streams makes it possible to group related events that arrive at different times by capturing and storing them. This state lives in state stores, which can be persistent (RocksDB by default) or in-memory. The stores are themselves backed by Kafka changelog topics, which makes them fully fault tolerant.
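To make the changelog idea concrete, here is a toy model in plain Java. It does not use the Kafka Streams API, and every name in it is hypothetical. Each write goes to a local map and is also appended to a list that stands in for the changelog topic; after a simulated crash, a new store rebuilds the same state by replaying that list in order. In Kafka Streams the changelog is a compacted Kafka topic and the library performs this restoration for you.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a state store backed by a changelog (hypothetical names, not Kafka APIs).
public class ChangelogBackedStore {
    // One changelog entry: a key and its new value (null means "deleted").
    record Change(String key, Long value) {}

    private final Map<String, Long> local = new HashMap<>(); // the local store
    private final List<Change> changelog;                     // stands in for the changelog topic

    public ChangelogBackedStore(List<Change> changelog) {
        this.changelog = changelog;
    }

    private void apply(Change change) {
        if (change.value() == null) {
            local.remove(change.key()); // a null value acts as a delete ("tombstone")
        } else {
            local.put(change.key(), change.value());
        }
    }

    // Every write updates the local store and is also appended to the changelog.
    public void put(String key, Long value) {
        Change change = new Change(key, value);
        apply(change);
        changelog.add(change);
    }

    public Long get(String key) {
        return local.get(key);
    }

    // A replacement instance rebuilds its local store by replaying the changelog in order.
    public static ChangelogBackedStore restore(List<Change> changelog) {
        ChangelogBackedStore restored = new ChangelogBackedStore(changelog);
        changelog.forEach(restored::apply);
        return restored;
    }

    public static void main(String[] args) {
        List<Change> topic = new ArrayList<>();
        ChangelogBackedStore store = new ChangelogBackedStore(topic);
        store.put("ASML", 1L);
        store.put("ASML", 2L);
        store.put("NVDA", 1L);
        // Simulate a crash: the local map is lost, only the changelog survives.
        ChangelogBackedStore recovered = ChangelogBackedStore.restore(topic);
        System.out.println(recovered.get("ASML") + " " + recovered.get("NVDA")); // 2 1
    }
}
```

A null value plays the role of a tombstone here, mirroring how a delete is recorded in a compacted changelog.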

To manage state, Kafka stream processing applications typically use the Kafka Streams library, which provides several abstractions for working with state. The most common is the key-value store, which an application can use to maintain arbitrary state, such as a running count or the latest value per key.
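The key-value store abstraction can be pictured as the small interface below: point lookups, writes, and range scans over sorted keys, used here to keep a running count per trade symbol. This is a simplified, hypothetical model loosely shaped like Kafka Streams' key-value stores, not the library's actual interface; a TreeMap stands in for the sorted storage engine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified key-value store abstraction (hypothetical, not the Kafka Streams interface).
public class KeyValueStoreSketch {
    interface SimpleKeyValueStore<K, V> {
        V get(K key);
        void put(K key, V value);
        List<Map.Entry<K, V>> range(K from, K to); // inclusive on both ends in this model
    }

    // Sorted in-memory implementation, so range scans follow key order.
    static final class InMemorySortedStore<K extends Comparable<K>, V> implements SimpleKeyValueStore<K, V> {
        private final NavigableMap<K, V> data = new TreeMap<>();

        public V get(K key) { return data.get(key); }

        public void put(K key, V value) { data.put(key, value); }

        public List<Map.Entry<K, V>> range(K from, K to) {
            return new ArrayList<>(data.subMap(from, true, to, true).entrySet());
        }
    }

    // Arbitrary application state: a running count per trade symbol.
    static SimpleKeyValueStore<String, Long> countTrades(List<String> symbols) {
        SimpleKeyValueStore<String, Long> store = new InMemorySortedStore<>();
        for (String symbol : symbols) {
            Long current = store.get(symbol);
            store.put(symbol, current == null ? 1L : current + 1);
        }
        return store;
    }

    public static void main(String[] args) {
        SimpleKeyValueStore<String, Long> counts = countTrades(List.of("ASML", "NVDA", "ASML", "AAPL"));
        System.out.println(counts.get("ASML"));           // 2
        System.out.println(counts.range("AAPL", "ASML")); // [AAPL=1, ASML=2]
    }
}
```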

An application's state store is split across its distributed instances: each instance manages its own portion of the state locally, and the full state exists only across all instances together. Accordingly, there are two ways to query the state of a Kafka Streams application:

  • Local State
  • Remote State

A Kafka Streams application typically runs on multiple instances, and each instance holds only part of the application's state. Querying the local stores on an instance therefore returns only the data that is locally available on that particular instance.

Source: Confluent
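Why does a local query only see part of the data? Each record's key determines its partition, each partition is processed by one task, and the store shard belonging to that task lives on the instance running it. The toy model below illustrates the effect in plain Java. It simplifies two things: it uses String.hashCode where Kafka's default partitioner applies murmur2 to the serialized key, and it assigns partitions to instances by simple modulo, whereas Kafka Streams' real assignment is more involved. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: keys -> partitions -> instances, each instance holding only its own shard.
public class LocalStateSketch {
    static int partitionFor(String key, int numPartitions) {
        // Simplification: Kafka's default partitioner uses murmur2 on the serialized key.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Modulo assignment of partitions to instances (the real assignor is more involved).
    static int instanceFor(int partition, int numInstances) {
        return partition % numInstances;
    }

    // Builds one local store per instance by routing each counted key to its owner.
    static List<Map<String, Long>> shard(List<String> keys, int numPartitions, int numInstances) {
        List<Map<String, Long>> localStores = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            localStores.add(new HashMap<>());
        }
        for (String key : keys) {
            int owner = instanceFor(partitionFor(key, numPartitions), numInstances);
            localStores.get(owner).merge(key, 1L, Long::sum);
        }
        return localStores;
    }

    public static void main(String[] args) {
        List<String> trades = List.of("ASML", "NVDA", "AAPL", "ASML", "MSFT");
        List<Map<String, Long>> stores = shard(trades, 6, 2);
        for (int i = 0; i < stores.size(); i++) {
            // A local query on instance i only sees the keys in stores.get(i).
            System.out.println("instance " + i + " holds " + stores.get(i));
        }
    }
}
```

Whichever instance you ask, a lookup for a key it does not own simply comes back empty, which is exactly the limitation of querying local state.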

An application can query its state in two ways:

Local State

An application instance can directly query the portion of the state it manages locally, that is, its own local state stores.

Remote State

To query the full state of your application, you must bring together the fragments of the state held by every instance. This involves:

  • query local state stores
  • discover all running instances of your application in the network and their state stores
  • communicate with these instances over the network
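The three steps above can be sketched as a small router: work out which instance owns a key, answer from the local store if that is this instance, and otherwise forward the request to the owner. In this hypothetical, plain-Java sketch, discovery is a key-to-host:port function and the network call is simulated by one function per remote host. A real application would take ownership from Kafka Streams' metadata (for example KafkaStreams#queryMetadataForKey) and make an HTTP or gRPC call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical router combining local lookups with forwarding to the owning instance.
public class RemoteQueryRouter {
    private final String self;                                  // this instance's host:port
    private final Map<String, Long> localStore;                 // this instance's shard
    private final Function<String, String> ownerOf;             // key -> owning host:port (discovery)
    private final Map<String, Function<String, Long>> remotes;  // host:port -> simulated RPC client

    RemoteQueryRouter(String self, Map<String, Long> localStore,
                      Function<String, String> ownerOf, Map<String, Function<String, Long>> remotes) {
        this.self = self;
        this.localStore = localStore;
        this.ownerOf = ownerOf;
        this.remotes = remotes;
    }

    // Answers locally when this instance owns the key, otherwise forwards to the owner.
    Optional<Long> query(String key) {
        String owner = ownerOf.apply(key);
        if (self.equals(owner)) {
            return Optional.ofNullable(localStore.get(key));
        }
        Function<String, Long> client = remotes.get(owner);
        return client == null ? Optional.empty() : Optional.ofNullable(client.apply(key));
    }

    public static void main(String[] args) {
        Map<String, Long> storeA = Map.of("ASML", 2L);
        Map<String, Long> storeB = Map.of("NVDA", 7L);
        Map<String, String> owners = Map.of("ASML", "hostA:4460", "NVDA", "hostB:4460");
        Map<String, Function<String, Long>> remotes = new HashMap<>();
        remotes.put("hostB:4460", storeB::get); // stands in for an HTTP/gRPC call to hostB

        RemoteQueryRouter routerOnA = new RemoteQueryRouter("hostA:4460", storeA, owners::get, remotes);
        System.out.println(routerOnA.query("ASML")); // Optional[2], served locally
        System.out.println(routerOnA.query("NVDA")); // Optional[7], forwarded to hostB
    }
}
```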

Querying Local Key-Value Store

To query a local key-value store, we first need to create a topology with a key-value store. This example creates a key-value store named “TradeKeyValueStore”, which holds the count of trades read from the topic “trade-count-input”.

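import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "trade-count");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example broker address
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> trades = builder.stream("trade-count-input", Consumed.with(Serdes.String(), Serdes.String()));

// Keep only stock trades, group them by value and count each group into "TradeKeyValueStore"
KGroupedStream<String, String> groupedByTrades = trades
    .filter((key, trade) -> trade.contains("stocks"))
    .groupBy((key, trade) -> trade, Grouped.with(Serdes.String(), Serdes.String()));
groupedByTrades.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("TradeKeyValueStore"));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Wait until the instance is RUNNING, then get a read-only handle to its local portion of the store
while (streams.state() != KafkaStreams.State.RUNNING) Thread.sleep(100);
ReadOnlyKeyValueStore<String, Long> store = streams.store(StoreQueryParameters.fromNameAndType("TradeKeyValueStore", QueryableStoreTypes.keyValueStore()));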
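For intuition about what ends up in “TradeKeyValueStore”, the counting logic can be replayed over an in-memory list with Java streams: keep values that contain “stocks”, group by the value, and count each group. This is a batch approximation for illustration only, not Kafka Streams: the real topology updates the counts incrementally as records arrive and spreads them across instances. The sample values are made up.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Batch approximation of filter -> groupBy(value) -> count, for intuition only.
public class TradeCountPreview {
    static Map<String, Long> expectedStoreContents(List<String> tradeValues) {
        return tradeValues.stream()
                .filter(trade -> trade.contains("stocks"))            // keep stock trades
                .collect(Collectors.groupingBy(Function.identity(),   // group by the record value
                        TreeMap::new, Collectors.counting()));        // count per group, sorted by key
    }

    public static void main(String[] args) {
        // Made-up sample values; only the ones containing "stocks" are counted.
        List<String> values = List.of("stocks:ASML", "bonds:DE10Y", "stocks:ASML", "stocks:NVDA");
        System.out.println(expectedStoreContents(values)); // {stocks:ASML=2, stocks:NVDA=1}
    }
}
```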

Querying Remote State Stores for the Entire App

To query the state of the entire app, you must expose the application’s full state to other applications, including those running on different machines.

For example, suppose a Kafka Streams application processes user events in a Trade Application, and you want to retrieve the latest status of each trade and display it in a User Application. These are the steps required to make the full state of your application queryable:

  • Add an RPC layer to the application
  • Expose an RPC endpoint on each instance (advertised through the application.server setting)
  • In the RPC layer, discover the remote application instances and their state stores, and query the locally available state stores, so that the full state of your application becomes queryable

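import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;

Properties props = new Properties();
// host:port where this instance's RPC layer listens; advertised to the other instances
String rpcEndpoint = "host:4460";
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, rpcEndpoint);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "trade-count");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example broker address
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");    // example state directory
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> trades = builder.stream("trade-count-input", Consumed.with(Serdes.String(), Serdes.String()));

// Assumes each record value is a trade symbol: keep ASML trades and count them per symbol
final KGroupedStream<String, String> groupedByTrade = trades
    .filter((key, trade) -> trade.contains("ASML"))
    .groupBy((key, trade) -> trade, Grouped.with(Serdes.String(), Serdes.String()));

groupedByTrade.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("trade-count"));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Once the instance is RUNNING, find which instance hosts the key "ASML" in the "trade-count" store
KeyQueryMetadata metadata = streams.queryMetadataForKey("trade-count", "ASML", Serdes.String().serializer());
HostInfo owner = metadata.activeHost(); // compare with rpcEndpoint: local lookup or remote call

// Start the RESTful proxy for servicing remote access to state stores.
// TradeRPCService is the application's own RPC layer, not a Kafka Streams class.
TradeRPCService rpcService = new TradeRPCService(streams, rpcEndpoint);
rpcService.start();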
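TradeRPCService itself is not shown above. One possible shape for such an RPC layer, assuming plain HTTP, is sketched below using only the JDK's built-in com.sun.net.httpserver server and java.net.http client. The class name, the /trade-count/{key} path and the plain-text responses are hypothetical, and a Map stands in for the local store; in a real service the handler would read from the store returned by streams.store(...) and forward requests for keys owned by other instances.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical minimal RPC endpoint that exposes a local store over HTTP (JDK built-ins only).
public class TradeRpcSketch {
    private static final String PREFIX = "/trade-count/";

    // Starts a server; GET /trade-count/{key} returns the count (200) or "not found" (404).
    static HttpServer start(int port, Map<String, Long> localStore) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
            server.createContext(PREFIX, exchange -> {
                String key = exchange.getRequestURI().getPath().substring(PREFIX.length());
                Long count = localStore.get(key);
                byte[] body = (count == null ? "not found" : count.toString()).getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(count == null ? 404 : 200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Client side of the RPC: what another instance would call to read this instance's shard.
    static HttpResponse<String> fetch(int port, String key) {
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:" + port + PREFIX + key)).build();
        try {
            return client.send(request, HttpResponse.BodyHandlers.ofString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // In a real service this map would be replaced by the store from streams.store(...).
        HttpServer server = start(0, Map.of("ASML", 2L)); // port 0 picks any free port
        int port = server.getAddress().getPort();
        HttpResponse<String> found = fetch(port, "ASML");
        HttpResponse<String> missing = fetch(port, "MSFT");
        System.out.println(found.statusCode() + " " + found.body());     // 200 2
        System.out.println(missing.statusCode() + " " + missing.body()); // 404 not found
        server.stop(0);
    }
}
```

Keeping the transport this thin leaves routing decisions (local lookup versus forwarding to the owner) to the RPC layer's own logic, which is where the metadata lookup from the example above would plug in.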

Conclusion

In this article, we looked at two ways to query a state store: local state and remote state. A local query returns only the data held by the instance being queried, whereas a remote query can reach the state held by all application instances.

Kafka Streams state stores are valuable in real-time applications because they are event-driven and fault tolerant. A state store can be thought of as a savepoint from which data processing can resume, and it can also hold information needed for further processing. State stores let applications efficiently store and retrieve intermediate results, perform complex computations, and maintain context while processing real-time data streams.

For any questions, please reach out to me at agiri@datapebbles.com.

Till then, Happy Streaming :)
