Kafka Streams State Store at Scale

Managing Interactive Queries across multiple application instances: alternative solutions and their comparison

Gökhan Gürgeç
cloudnesil
6 min read · Aug 2, 2022


I previously wrote a story about using Apache Kafka like a database with the Kafka Streams State Store. That story walked through an example application that consumes data from Apache Kafka, saves it in a State Store, and serves queries from the State Store. It was based on one of our applications that runs in production, with the Kafka Streams State Store at the heart of the topic. You can have a look at the previous story here: https://medium.com/cloudnesil/using-apache-kafka-like-database-with-kafkastreams-processor-api-and-statestore-180bb4b5f9e5

State stores can be used to remember recently received input records, to track rolling aggregates, to de-duplicate input records, and more. Another feature of state stores is that they can be interactively queried from other applications. https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#state-stores

If you consume data from Apache Kafka and your application needs to serve the consumed data directly, or some data aggregated from it, using the Kafka State Store is a very logical choice. In our application, querying data from the State Store decreased response times and gave us better performance.

By default, State Store data is stored on local disk, managed by the high-performance RocksDB storage engine. You can also configure state stores to keep their data in memory.
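For example, with the Processor API you can pick the backing store when you build it. A minimal sketch, assuming a store named order-location-store and String serdes:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Default: a persistent store backed by RocksDB on local disk
StoreBuilder<KeyValueStore<String, String>> persistentStore =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("order-location-store"),
                Serdes.String(), Serdes.String());

// Alternative: keep the store entirely in memory
StoreBuilder<KeyValueStore<String, String>> inMemoryStore =
        Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("order-location-store"),
                Serdes.String(), Serdes.String());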

This works very well as long as you have only one application instance. But things change when you decide to scale your Kafka Streams application horizontally. Like every good thing, State Stores come at a cost, and you need to do extra work to keep your queries consistent.

The architecture of our application was like this.

Architecture of a single instance Kafka Streams application

The Problem:

The problem with this architecture is that, with multiple instances of the Kafka Streams application, each instance consumes only a subset of the partitions of the input orderLocation topic and stores only the data it consumes.

If a client asks for all the order locations, the API gateway will route the request to one instance of the application, and the client will not be able to get all the order locations. If Order-1 is consumed by instance-1 and Order-2 by instance-2, the client will never get both orders in a single query.

The problem with multiple instances

The Alternative Solutions:

Since good architecture comes from considering the characteristics of your application and environment, an architecture selected for one application may not fit another.

While deciding the architecture for the State Store in a scaled Kafka Streams application, I think you should consider the following questions.

i. What and how will you query from the State Store? (Do you need all the items in the State Store, or will you query by key?)

ii. How long will you keep records in the State Store?

1. Global State Store:

In the Streams DSL we have KTable and GlobalKTable, which are backed by the StateStore and GlobalStore of the Processor API. Since I chose the Processor API for the implementation, I will describe the architecture with GlobalStore. A GlobalStore has the same characteristics as a GlobalKTable.

The main feature of a GlobalStore is that all partitions of the input topic are consumed by every instance of the application, so every instance stores all the records of the topic.

When I learned this, I was so happy: the scaling problem would be solved by just changing the local state store to a global state store. It seemed very easy.

However, new issues arose when we changed the local State Store to a Global State Store. You need to be aware of the current implementation details of the Global Store and of what you want to store and query in the State Store.

As indicated in the architecture, a State Store has a changelog concept: the data stored in the State Store is backed up to a changelog topic to ensure the fault tolerance of the State Store. If the application fails and restarts, the State Store is recovered from the changelog topic. A GlobalStore, however, does not create a changelog topic; it stores the original message in the State Store and uses the original topic as its changelog. You can check the details in this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-7663

Therefore, if you store transformed or aggregated data in the State Store, a GlobalStore may not be a good solution for you.

However, there is a workaround: use another topic for the transformed/aggregated data.

Global State Store with Transformed Topic Architecture

The idea of this solution is:

1. consume the original topic
2. transform the original message
3. send the transformed message to another topic
4. store the transformed message in the State Store with a new stream processor
5. query the State Store

Here is how the topologies look in Kafka Streams Java code. The sketch below is illustrative: the topic names (order-location, order-location-transformed), the store name (order-location-store), and the JSON serde for the transformed DTO are assumptions standing in for our actual names.
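// orderLocationTransformedSerde: an assumed JSON serde for OrderLocationTransformedDTO
Topology topology = new Topology();

// Part 1: consume the original topic, transform, write to the new topic
topology.addSource("order-location-source", "order-location");
topology.addProcessor("order-location-processor",
        OrderLocationStreamProcessor::new, "order-location-source");
topology.addSink("order-location-transformed-sink",
        "order-location-transformed", "order-location-processor");

// Part 2: consume the transformed topic into a Global State Store.
// Global stores must have changelogging disabled: the source topic
// itself acts as the changelog.
topology.addGlobalStore(
        Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("order-location-store"),
                        Serdes.String(), orderLocationTransformedSerde)
                .withLoggingDisabled(),
        "order-location-transformed-source",
        Serdes.String().deserializer(),
        orderLocationTransformedSerde.deserializer(),
        "order-location-transformed",
        "order-location-transformed-processor",
        OrderLocationTransformedStreamProcessor::new);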

In OrderLocationStreamProcessor, we process the original message, transform it, and forward the transformed message to the new topic defined as the Sink in the topology:

processorContext.forward(orderLocationDTO.getOrderNumber(), orderLocationTransformedDTO);
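A minimal sketch of such a processor, using the classic Processor API to match the forward() call above; the DTO classes and the from() transformation helper are illustrative:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class OrderLocationStreamProcessor implements Processor<String, OrderLocationDTO> {

    private ProcessorContext processorContext;

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
    }

    @Override
    public void process(String key, OrderLocationDTO orderLocationDTO) {
        // Transform the original message (transformation details are illustrative)
        OrderLocationTransformedDTO orderLocationTransformedDTO =
                OrderLocationTransformedDTO.from(orderLocationDTO);
        // Forward to the sink node, keyed by order number
        processorContext.forward(orderLocationDTO.getOrderNumber(), orderLocationTransformedDTO);
    }

    @Override
    public void close() {
    }
}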

In OrderLocationTransformedStreamProcessor, we simply save the transformed message to our Global State Store:

stateStore.put(orderNumber, orderLocationTransformedDTO);
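A matching sketch for the global-store processor; it looks up the store registered in the topology and writes every record into it:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class OrderLocationTransformedStreamProcessor
        implements Processor<String, OrderLocationTransformedDTO> {

    private KeyValueStore<String, OrderLocationTransformedDTO> stateStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext processorContext) {
        // Look up the global store attached to this processor in the topology
        this.stateStore = (KeyValueStore<String, OrderLocationTransformedDTO>)
                processorContext.getStateStore("order-location-store");
    }

    @Override
    public void process(String orderNumber, OrderLocationTransformedDTO orderLocationTransformedDTO) {
        stateStore.put(orderNumber, orderLocationTransformedDTO);
    }

    @Override
    public void close() {
    }
}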

Let’s come back to our questions at the beginning.

I would say that if you need to query all the items in the State Store and the item lifetime in the State Store is relatively short, this solution is the best fit for you.

Every instance can respond with all the items directly, so you do not need to reach out to other instances. In our example this is the best fit, since an order's lifetime is limited: orders are discarded from the State Store after they are delivered, and we generally need to query all active orders.

2. Adding RPC layer and querying Remote State Stores:

This solution is based on querying the remote State Stores of other Kafka Streams application instances.

RPC Layer Architecture

In order to implement this solution, you need to add an RPC layer to your Kafka Streams application instances so that all instances can be aware of each other.

The official Confluent documentation on querying remote state stores: https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html

The following two articles are also great practical write-ups that show the implementation of this solution.

Let’s come back to our questions at the beginning again.

I would say that if you query items by key and the item lifetime in the State Store is long, this solution is the best fit for you.

If you query by key, the only thing you need to do is find the State Store instance that holds the queried item and get the item from that store. You do not need to deal with fetching all the items from all State Stores, merging them, and so on.
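A minimal sketch of that lookup, assuming the store is named order-location-store and httpClient is a hypothetical helper for the RPC hop; each instance advertises its own endpoint through the application.server config:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Each instance advertises itself, e.g.:
// props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "instance-1:8080");

public OrderLocationTransformedDTO findOrderLocation(KafkaStreams streams,
                                                     HostInfo thisInstance,
                                                     String orderNumber) {
    // Ask Kafka Streams which instance hosts the partition holding this key
    KeyQueryMetadata metadata = streams.queryMetadataForKey(
            "order-location-store", orderNumber, Serdes.String().serializer());

    if (thisInstance.equals(metadata.activeHost())) {
        // The key is local: query our own State Store
        ReadOnlyKeyValueStore<String, OrderLocationTransformedDTO> store =
                streams.store(StoreQueryParameters.fromNameAndType(
                        "order-location-store", QueryableStoreTypes.keyValueStore()));
        return store.get(orderNumber);
    }
    // The key lives on another instance: forward the request over the RPC layer
    return httpClient.get(metadata.activeHost().host(),
            metadata.activeHost().port(), orderNumber);
}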

3. Using distributed cache for store:

This solution is based on backing the State Store with a distributed cache instead of local storage.

There is an open-source project that implements this approach: https://github.com/andreas-schroeder/redisks

In this project, Redis is used instead of local RocksDB so that the State Store stays consistent across all instances.

I did not try this approach.

Another approach is to forgo the State Store entirely, store the items directly in a distributed cache, and query the cache.
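A minimal sketch of that alternative, assuming a Redis instance and the Jedis client (the connection details and the JSON value format are illustrative):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class OrderLocationRedisProcessor implements Processor<String, String> {

    private JedisPool jedisPool;

    @Override
    public void init(ProcessorContext processorContext) {
        this.jedisPool = new JedisPool("redis.example.com", 6379);
    }

    @Override
    public void process(String orderNumber, String orderLocationJson) {
        // Write straight to the shared cache instead of a local State Store,
        // so every application instance sees the same data
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.set(orderNumber, orderLocationJson);
        }
    }

    @Override
    public void close() {
        jedisPool.close();
    }
}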

Let's come back to our questions at the beginning one more time.

This approach is very similar to the first approach and can be used instead of it.

The only disadvantages of this approach compared to the first one are that you have to maintain a new component outside of your application, and its performance can be lower than querying data from local RocksDB.

