Kafka Streams — Stateful Aggregation — Part 1 (Example and Q&A)

M Ilyas
7 min read · Jan 9, 2023


Example (Aggregated Sales using Kafka Streams)

In this series we will look at how we can use Kafka Streams' stateful capabilities to aggregate results from a stream of events. We will also address important questions about maintaining state in a containerized world, where containers (services/tasks) can be stopped and restarted due to rebalancing, new deployments, and so on.

This series consists of the following articles:

  • Kafka Streams — Stateful Aggregation — Part 1 (Example and Q&A)
  • Kafka Streams — Stateful Aggregation — Part 2 (How to retain/rebuild state on restarts)
  • Kafka Streams — Stateful Aggregation — Part 3 (Production ready deployment)
  • Kafka Streams — Stateful Aggregation — Part 4 (Standby Replicas)

In this article we will look at an example of aggregating information using Kafka Streams. It assumes that you already know the basics of event-driven design with Kafka and have some familiarity with the CQRS (Command and Query Responsibility Segregation) pattern.

When dealing with events and aggregating results, we need to query the previous state to calculate the new result after each event. Kafka Streams solves this problem with state stores. We will be looking at the challenges of using state stores in containerized microservices connected to Kafka topics.

We could save the state outside Kafka as well, but that would add another dependency and extra latency when querying the state. For example, we could use Redis backed by a NoSQL database and have the microservices query that state to calculate results. Kafka Streams instead promotes the use of its internal capabilities, giving us millisecond latency and built-in resilience.

Example (Aggregated Sales using Kafka Streams)

Sales events are published for each product, and a service aggregates each product’s sales value and publishes a notification event when the product’s sales exceed a configured threshold. You can find the source code here (https://github.com/ilyasjaan/kafkastreams_stateful-aggregation).

See the README to try this example in the context of Part 1. (Everything should work from the command line as long as you have the required versions of the tooling installed.)

Sales Event

For every sale of a product, an event like this is published to the sales-events-v1 topic.

{
  "product": "PRODUCT_123",
  "value": 100
}

Notification Event

For any product that hits the target sales value, a notification event like this is published to the notifications-events-v1 topic.

{
  "product": "PRODUCT_123",
  "value": 20000
}

Sales Aggregator (kafkastreams_stateful-aggregation)

This microservice is a Kafka Streams application that aggregates the sales value of every product and publishes a notification event when it crosses the target sales value. Here is the code snippet that does the job.

@Component
@AllArgsConstructor
public class Processor {

    private ObjectMapper objectMapper;

    @Autowired
    public void process(StreamsBuilder builder) {
        builder
            // Consume raw sale events keyed by product id
            .stream("sales-events-v1", Consumed.with(Serdes.String(), Serdes.String()))
            // Group by product so all sales of a product land in the same aggregate
            .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
            // Sum the sales value per product into the PRODUCT_AGGREGATED_SALES store
            .aggregate(
                () -> 0L,
                this::aggregate,
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("PRODUCT_AGGREGATED_SALES")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long())
                    .withCachingDisabled()
            )
            // Only products whose aggregated value crossed the threshold produce a notification
            .filter((product, salesValue) -> salesValue >= 2000)
            .mapValues((key, value) -> new NotificationEvent(key, value))
            .toStream()
            .to("notifications-events-v1", Produced.with(
                Serdes.String(),
                Serdes.serdeFrom(new JsonSerializer<NotificationEvent>(), new JsonDeserializer<NotificationEvent>()))
            );
    }

    private Long aggregate(String key, String value, Long aggregate) {
        try {
            SaleEvent saleEvent = objectMapper.readValue(value, SaleEvent.class);
            return aggregate + saleEvent.getValue();
        } catch (JsonProcessingException e) {
            // Ignore events that cannot be parsed
            return aggregate;
        }
    }

}
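
The SaleEvent and NotificationEvent payload classes are not shown in the snippet above. As a rough sketch (field names are assumed from the JSON examples earlier, not taken from the repository), they could look like this:

public class SaleEvent {

    private String product;
    private Long value;

    public SaleEvent() {
        // default constructor needed by Jackson for deserialization
    }

    public String getProduct() { return product; }
    public void setProduct(String product) { this.product = product; }
    public Long getValue() { return value; }
    public void setValue(Long value) { this.value = value; }
}

public class NotificationEvent {

    private final String product;
    private final Long value; // aggregated sales value at the time of notification

    public NotificationEvent(String product, Long value) {
        this.product = product;
        this.value = value;
    }

    public String getProduct() { return product; }
    public Long getValue() { return value; }
}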

The logic is tested by a simple component test.

Input events are:

[
  {
    "product": "product2",
    "value": 1000
  },
  {
    "product": "product1",
    "value": 1500
  },
  {
    "product": "product1",
    "value": 1000
  },
  {
    "product": "product2",
    "value": 1300
  },
  {
    "product": "product5",
    "value": 1000
  }
]

The sales threshold is 2000, so we expect product1 (1500 + 1000 = 2500) and product2 (1000 + 1300 = 2300) to be notified as having hit the target, while product5 (1000) stays below it.

package uk.co.streams

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.awaitility.Awaitility
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.core.io.ClassPathResource
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.TempDir

import java.nio.file.Path
import java.time.Duration
import java.util.concurrent.ConcurrentLinkedQueue

@SpringBootTest(classes = StreamsSpringBootApplication)
@EmbeddedKafka(
    partitions = 3,
    brokerProperties = ["listeners=PLAINTEXT://localhost:9092", "port=9092"]
)
class EndToEndComponentSpec extends Specification {

    @TempDir
    @Shared
    Path stateDir

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker

    KafkaMessageListenerContainer<String, String> container
    Queue<ConsumerRecord<String, String>> records
    Producer<String, String> producer

    def setupSpec() {
        System.out.println("Setting State Directory To=" + stateDir.toAbsolutePath().toString())
        System.setProperty("spring.kafka.streams.properties.state.dir", stateDir.toAbsolutePath().toString())
    }

    def setup() {
        startTestConsumerForNotificationEvents()
        initProducerTemplate()
    }

    def "should generate notification event for sale beyond given threshold for a product"() {
        given:
        def events = testSaleEvents()

        when:
        events.stream().forEach(input -> {
            def inputJson = new JsonSlurper().parseText(input)
            producer.send(new ProducerRecord("sales-events-v1", inputJson["product"], input))
        })

        then:
        Awaitility
            .await()
            .timeout(Duration.ofSeconds(30))
            .pollDelay(Duration.ofMillis(100))
            .until(() -> {
                return records.size() == 2
            })
        records.stream()
            .filter({ record -> ["product1", "product2"].contains(record.key()) })
            .map({ record -> new JsonSlurper().parseText(record.value()) })
            .filter({ json -> json["value"] >= 2000 })
            .findAll().size() == 2
    }

    List<String> testSaleEvents() throws IOException {
        List<Object> list = new JsonSlurper().parse(new File(new ClassPathResource("input.json").getURI()))
        list.collect({ input ->
            new JsonBuilder(input).toString()
        })
    }

    def initProducerTemplate() {
        // set up the Kafka producer properties
        Map<String, Object> senderProperties =
            KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString())
        senderProperties.put("key.serializer", StringSerializer)
        senderProperties.put("value.serializer", StringSerializer)

        // create a Kafka producer factory
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProperties)

        // create a Kafka producer
        producer = producerFactory.createProducer()
    }

    def startTestConsumerForNotificationEvents() {
        // set up the Kafka consumer properties
        Map<String, Object> consumerProperties =
            KafkaTestUtils.consumerProps("sender", "true", embeddedKafkaBroker)
        consumerProperties.put("key.deserializer", StringDeserializer)
        consumerProperties.put("value.deserializer", StringDeserializer)

        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
            new DefaultKafkaConsumerFactory<String, String>(consumerProperties)

        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties("notifications-events-v1")

        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)

        // create a thread-safe queue to store the received messages
        records = new ConcurrentLinkedQueue<>()

        // set up a Kafka message listener
        container.setupMessageListener(new MessageListener<String, String>() {
            void onMessage(ConsumerRecord<String, String> record) {
                System.out.println("Test Consumer Received Key=" + record.key())
                System.out.println("Test Consumer Received Value=" + record.value())
                records.add(record)
            }
        })

        // start the container and underlying message listener
        container.start()

        // wait until the container has the required number of assigned partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic())
    }

}

You can also test the application using docker-compose; please refer to the README.

State Stores — Q & A

These are some key questions we need to be aware of before making use of Kafka Streams state stores. Most of them address the main issue: “Can we reuse the state built locally when an instance is restarted?”

We will look at this in more detail in Part 2 (how to retain/rebuild state on restarts) and Part 4 (standby replicas).

Q1. How does the state store work?

Kafka Streams provides so-called state stores, which stream processing applications can use to store and query data; this is an important capability when implementing stateful operations. By default, Kafka Streams uses RocksDB to maintain its local stores.

More on state stores (https://docs.confluent.io/platform/current/streams/architecture.html#state)

RocksDB is a key-value store that persists data in local files. Here it acts as a persistent store that complements memory. (http://rocksdb.org/)

This means the majority of keys stay in memory, with everything else backed up in local files.
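
As a concrete example, the PRODUCT_AGGREGATED_SALES store materialized earlier can be queried from inside the application via Kafka Streams interactive queries. Below is a minimal sketch that assumes Spring for Apache Kafka’s StreamsBuilderFactoryBean is available; the class itself is illustrative and not part of the article’s repository.

import lombok.AllArgsConstructor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class SalesStoreQuery {

    private final StreamsBuilderFactoryBean factoryBean;

    // Returns the aggregated sales value for a product, or null if the key is not in the store
    public Long aggregatedSales(String product) {
        KafkaStreams streams = factoryBean.getKafkaStreams();
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        "PRODUCT_AGGREGATED_SALES",
                        QueryableStoreTypes.keyValueStore()));
        return store.get(product);
    }
}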

Q2. Do we lose state on restarts?

No, never. Kafka Streams backs up all the state held in memory (and in local files on disk) to a changelog topic. This means state stores get the same resilience as any other data in Kafka.
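
Concretely, each store’s changelog topic is named <application.id>-<store name>-changelog and is created as a compacted topic. The sketch below shows the underlying Streams properties involved; the application id sales-aggregator and the state directory are assumed values, not the article’s configuration.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class ChangelogConfigSketch {

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sales-aggregator"); // assumed id
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams"); // local RocksDB files
        // With these settings, the PRODUCT_AGGREGATED_SALES store is backed by the compacted
        // changelog topic "sales-aggregator-PRODUCT_AGGREGATED_SALES-changelog", which Kafka
        // Streams replays on restart if the local files are missing or incomplete.
        return props;
    }
}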

Hint: the in-memory state contains every key being aggregated, so when a container is restarted it builds the state back up from the changelog topic. This can be time consuming, depending on how many keys have to be consumed to rebuild the state fully.

When a consumer is restarted or a rebalance is triggered, the state is rebuilt from the changelog topic. Under the older “stop-the-world” rebalancing protocol, consumption from all topics is held until that state is built. In later Kafka versions the rebalancing protocol has been improved (incremental cooperative rebalancing) so that only the partitions that are actually reassigned are held.

Q3. Will it ever be the case that we have some state in memory that is not flushed to the changelog topic?

No. It is a two-step process: Kafka Streams first flushes pending writes to the changelog topic, and only after those writes are acknowledged by the broker does it commit the input offsets.
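
How often this flush-and-commit cycle runs is governed by the commit interval, and with exactly-once processing the changelog writes and the offset commit are additionally wrapped in a single transaction. A small sketch of the relevant properties (the values shown are examples, not the article’s configuration):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class CommitConfigSketch {

    static Properties commitProperties() {
        Properties props = new Properties();
        // Flush state stores/changelogs and commit input offsets every second (example value;
        // the default is 30000 ms, or 100 ms under exactly-once processing).
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Optionally make the changelog writes and the offset commit a single atomic transaction.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}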

Q4. How big can the state store grow?

Essentially, every key is saved in the state store; in the example above, every product will exist in the store. The backing changelog topic is log-compacted, so Kafka retains only the latest value per key. If we have 10 partitions and 10 consumers, Kafka Streams keeps a store for every consumer (task), so the overall size depends on how much state each task needs to hold.

Q5. Can we re-use the state built locally?

Yes, and that is very much needed to make sure rebuilding the state is quick enough. Kafka Streams checks the last checkpoint of how far the local store has been built from the changelog topic and only loads the remaining records. So we should design our pods (or containers) to use a persistent volume, so that when tasks (consumers) are restarted they are back in service as soon as possible.

Q6. Can I keep only relevant keys in the state store and delete the rest?

Absolutely! We are talking about event-driven processing here. In the example above, if a product is taken off the catalogue we no longer need that product in the state. Or, once it hits the target value, we can delete it and let it work towards the target again.

This means at some point there should be an event on a topic that we can process to delete the key from the state.

This is done by writing the key with a null value (in this case to the state store). Such messages are called tombstone messages: <KEY, NULL>.
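
As a rough sketch of what handling such a tombstone could look like with the Processor API (this is not the article’s code, and wiring the processor into the topology so it can access the store is omitted; Part 2 shows the actual approach):

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class TombstoneHandler implements Processor<String, Long, String, Long> {

    private ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        this.context = context;
        // The store materialized by the aggregation shown earlier
        this.store = context.getStateStore("PRODUCT_AGGREGATED_SALES");
    }

    @Override
    public void process(Record<String, Long> record) {
        if (record.value() == null) {
            // Tombstone <KEY, NULL>: drop the key from the local store (and its changelog)
            store.delete(record.key());
        } else {
            // Anything else flows through untouched
            context.forward(record);
        }
    }
}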

In the example above we are not clearing up the keys that have already been notified of hitting the sales target. We will cover this in Part 2 (how to retain/rebuild state on restarts).

Q7. What is a standby task?

Part 4 of this series is dedicated to trying this out. In short, with a simple configuration we keep a standby task for every active task. A task that was previously running on a failed instance is preferably restarted on an instance that already hosts its standby replica, so that restoring the local state store from its changelog is minimized. We will look at this in more detail in Part 4 (standby replicas).
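
The configuration in question is num.standby.replicas. A minimal sketch (the value 1 is an example, not the article’s setting):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class StandbyConfigSketch {

    static Properties standbyProperties() {
        Properties props = new Properties();
        // Keep one warm copy of each task's state store on another instance
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}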
