Kafka Streams — Stateful Aggregation — Part 2 (How to retain/rebuild state on restarts)

M Ilyas
7 min readFeb 12, 2023

--

This is Part-2 of the series. It is best to read and try the example in the context of Part-1 before Part-2. (Kafka Streams — Stateful Aggregation — Part 1 (Example and Q&A).

In this article we will go a step further and try out changing the location of the state store and then re-use the state on restarts.

One question in Part-1 is best as starting point of this article.

Q5. Can we re-use the state built locally?

Yes. And that is very much needed to make sure the rebuilding the state is done quick enough. It checks the last checkpoint been flushed to changelog topic to load the remaining keys from topic. So we should design that our pods (or containers) are using some Persistent Volume so when the tasks (consumers) are restarted they are in-service ASAP.

How is your microservice deployed?

Well normally we think of docker containers and probably Kubernetes for container orchestration. But this is not the only way, we might have different orchestration or different containers.

Docker with persistent volume

To make sure we have some persistent state saved, we need to assign persistent volume to containers. The below image is taken from this link (Volumes).

The below paragraph is also copied from above link, in case you are like me who like shortcuts in finding information :).

Share data between machines

When building fault-tolerant applications, you may need to configure multiple replicas of the same service to have access to the same files.

There are several ways to achieve this when developing your applications. One is to add logic to your application to store files on a cloud object storage system like Amazon S3. Another is to create volumes with a driver that supports writing files to an external storage system like NFS or Amazon S3.

Volume drivers allow you to abstract the underlying storage system from the application logic. For example, if your services use a volume with an NFS driver, you can update the services to use a different driver, as an example to store data in the cloud, without changing the application logic.

Kafka setup in our example.

  • We have 3 partitions of sales-events-v1 topic.
  • We have 3 consumer threads (single instance) for 3 partitions. This is because we will be testing on local machine.
  • Every consumer will have state store copy for relevant partition on local disk.

We will start with setting up a volume so we can use that volume to spin up the docker container.

Docker volume setup

A. Create volume

$ docker volume create stateful-aggregation_example

# verify listing all volumes
$ docker volume list

B. Change docker-compose.yml to use that volume

version: '2'

volumes:
stateful-aggregation_example:

services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
container_name: local_kafka_docker
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_DOCKER://kafka:9092,PLAINTEXT://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
example:
image: docker.io/library/kafka-streams-statestores-example:latest
container_name: example
volumes:
- stateful-aggregation_example:/shared-volume
depends_on:
- kafka
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092

See “volumes” and “example” sections.

On production it depends where you want to persist your state. For example if you are deploying on AWS, you might want to create volume mounted to AWS EFS or EBS or any driver to persist somewhere else in cloud. Here I am chosing the quickest way to show how to use a volume.

View state store local copy

We are now all set to run our docker instances. Just to be 100% sure lets add Kafka Stream property to specifiy location of statestore.

logging.level.org.apache.kafka.streams.processor.internals.ProcessorStateManager=INFO
spring.kafka.streams.properties.state.dir=/shared-volume/state_store

Now start the whole network of service by magical command.

$ docker-compose up -d
$ docker exec -ti example /bin/bash
$ cd /shared-volume/state_store/spring-boot-streams
$ ls
0_0 0_1 0_2 1_0 1_1 1_2 kafka-streams-process-metadata

With this change let’s play a little. We have enabled log level to INFO for ProcessorStateManager to see what happens on startup.

Look at this below log when it was started first time.

# We have already started the service - Let's copy logs to view
$ docker-compose logs example > startup-1.txt
ProcessorStateManager  : task [1_0] State store PRODUCT_AGGREGATED_SALES did not find checkpoint offset, hence would default to the starting offset at changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-0
StreamTask : task [1_0] Initialized

ProcessorStateManager : task [1_1] State store PRODUCT_AGGREGATED_SALES did not find checkpoint offset, hence would default to the starting offset at changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-1
StreamTask : task [1_1] Initialized

ProcessorStateManager : task [1_2] State store PRODUCT_AGGREGATED_SALES did not find checkpoint offset, hence would default to the starting offset at changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-2
StreamTask : task [1_2] Initialized

We can see three tasks (as we have three partitions) and all are initialized and every task didn’t find any checkpoint locally as there were no sales events at all.

State store PRODUCT_AGGREGATED_SALES did not find checkpoint offset, hence would default to the starting offset at changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-0

Lets populate the state store. Please follow the git repository for this series (https://github.com/ilyasjaan/kafkastreams_stateful-aggregation). “sample_events.txt” is also there in repository to be used for this step.

The below script (step-by-step) pushes 100 sales events for different products just to put some data in the changelog topic and aggregate some information in KTABLE. Just execute the below script step by step.

# 1
$ docker cp sample_events.txt local_kafka_docker:/home/appuser/sample_events.txt

# 2
$ docker exec -ti local_kafka_docker /bin/bash

# 3
$ tail -n +1 sample_events.txt | /usr/bin/kafka-console-producer \
--bootstrap-server localhost:29092 \
--topic sales-events-v1 \
--property "parse.key=true" \
--property "key.separator=:"

sample_events.txt” is provided in repository that contains 100 sales events. You can modify and make it bigger or small if you want to.

Now stop your service and restart and check logs again. This will show us the starting checkpoint

$ docker-compose stop example
$ docker-compose rm example
$ docker-compose up -d example
$ docker-compose logs example > startup-2.txt
StoreChangelogReader   : Finished restoring changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-2 to store PRODUCT_AGGREGATED_SALES with a total number of 0 records
StoreChangelogReader : Finished restoring changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-0 to store PRODUCT_AGGREGATED_SALES with a total number of 0 records
StoreChangelogReader : Finished restoring changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-1 to store PRODUCT_AGGREGATED_SALES with a total number of 0 records

StreamTask : task [1_0] Restored and ready to run
StreamTask : task [1_1] Restored and ready to run
StreamTask : task [1_2] Restored and ready to run

Mainly we see below two lines. changelog and local state are already in-sync so nothing to be loaded at startup.

Finished restoring changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-1 to store PRODUCT_AGGREGATED_SALES with a total number of 0 records

task [1_0] Restored and ready to run

Now delete the local copy and restart, here the changelog topic still have the records. This action should build up the local cache on startup. This is same as deploying kafka streams application containers without persistent volume. That way we lose local copy on restarts.

$ docker exec -ti example /bin/bash
$ cd /shared-volume/state_store
$ e
$ exit

$ docker-compose stop example
$ docker-compose rm example
$ docker-compose up example -d
$ docker-compose logs example > startup-3.txt
StoreChangelogReader   : Finished restoring changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-2 to store PRODUCT_AGGREGATED_SALES with a total number of 0 records
StoreChangelogReader : Finished restoring changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-0 to store PRODUCT_AGGREGATED_SALES with a total number of 20 records
StoreChangelogReader : Finished restoring changelog spring-boot-streams-PRODUCT_AGGREGATED_SALES-changelog-1 to store PRODUCT_AGGREGATED_SALES with a total number of 80 records

StreamThread : Restoration took 2766 ms for all tasks [0_0, 0_1, 0_2, 1_0, 1_1, 1_2]

We see 20 and 80 records being fetched from changelog topic.

I only stopped and restarted “example” service — not the kafka docker. Just a note because I wanted to keep the data in the kafka topics and only play around losing state and rebuilding it.

Try killing the “example” service and restart multiple times, if volume has the state the restore will not fetch any records from changelog topic.

When you will start your container, keep looking at logs it takes bit of time to starts loading data in local store. Imagine if you have tens of thousands of records being aggregated and you lost local state store files.

This local store build up from changelog topic can halt all the consumers depending on the Kafka Version (This is known as “stop-the-world” protocol). Latest Kafka versions can start listening to messages on partitions that didn’t lose the local state store OR the ones that caught up sooner than others.

We just tried restarts, now imagine in microservices environment, pods (containers) can be moved to different hosts without your intentional restarts. This is very common scenario, mostly microservices are stateless because they can be stopped and started, scaled up and down and end user experience is not affected. Kafka Streams aggregation makes microservices stateful so we need to link containers to a persistent state mechanism so we don’t lose the basic benefits of containers.

Just taken snapshot for imagining state store on filesystem

Production deployment

In this article we have only tried one instance consuming all three partitions. On production obviously it is going to be multiple instances for more resilliance. That will open up more debate of scale up and down and rebalancing as it is very usual to move pods (containers) around to different hosts.

We will see how in real life we deploy Kafka Streams application in Part 3 of this series.

Further Reading

--

--