Kafka Streams Processor API in Spring Boot

Ceyhun Uzunoglu
6 min readMay 3, 2020

--

Hello folks. I hope you’re well in this pandemic era. In this post I’m going to talk about Kafka Streams Processor Api which in Spring Boot. Processor API is a life saver for use cases like including scheduled jobs and needing more effective use of state stores.

Surfers on the river, Munich.

Before continuing, I assume that you have most basic knowledge of Kafka Streams and state stores.

As an introduction, here are some definitions:

Apache Kafka® is a distributed streaming platform. https://kafka.apache.org/intro

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. https://kafka.apache.org/documentation/streams/

Kafka is a great tool for streaming and developed by great minds. If you want to know the design of Kafka and be amazed 🤩: https://kafka.apache.org/documentation/#design

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process every received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic. https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html

When we are creating our streaming pipeline with Kafka Streams for data manipulation, we generally use; KTable, KStream, GloblaKTable and aggregation of input via map, mapValues, filters, filterNot, flatMap, flatMapValues, branch, selectKey, aggregation, reduce, etc. Other ingredients of our recipe are input topic(s), state-store(s) for aggregations or reduce operations, intermediary topic(s), joins and output topic(s).

In this post, I will create my Kafka Streams pipeline via Topology building.

Topology: A logical representation of a ProcessorTopology. A topology is an acyclic graph of sources, processors, and sinks. A source is a node in the graph that consumes one or more Kafka topics and forwards them to its successor nodes. A processor is a node in the graph that receives input records from upstream nodes, processes the records, and optionally forwarding new records to one or all of its downstream nodes. Finally, a sink is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. A Topology allows you to construct an acyclic graph of these nodes, and then passed into a new KafkaStreams instance that will then begin consuming, processing, and producing records. https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/Topology.html

Talk is cheap, show me the code

You can find the whole code in:
https://github.com/mrceyhun/kafka-producer-api-example

About development environment:

  • Java 14
  • Spring Boot 2.2.6
  • Spring Kafka 2.4.5
  • Apache Kafka Streams 2.4.1

KafkaProducerApiExample Project

It is a dummy project, don’t judge for silliness of inputs😇

Project Structure:

  • Create CustomInfrastructureCustomizer in Spring configuration.
  • CustomInfrastructureCustomizer creates topology which consist of source(input topic), processor (CustomProcessor), state store and sink(output topic).
  • CustomProcessor process each log and also create schedule. Schedule runs CustomPunctuator every 60 seconds.
  • CustomPunctuator deletes older logs from state store.
  • A Spring service (StateStoreQueryService) query the state store and gets some data without effecting the running streaming processes.

TOPIC

Input topic:
Key: listener Id<Long>
Value: date and list of favorite singers of listener<FavoriteSinger>that consist of favorite singers of a listeners with listenerId key.
ie:
10002:{“date”: “2020–05–03 02:01:00”, “singers”: [“Athena”, “Shaggy”]}

  • FavoriteSingers java object. (getters and setters omitted.)

Spring Configuration

Create our Spring configuration for Kafka Stream.

There are some StreamsConfig for Kafka producer, Kafka consumer and for Kafka Streams internal topics. You can find detailed explanation of these configs at the References part.

We created our StreamsBuilderFactoryBean and sets its InfrastructureCustomizer as our new CustomInfrastructureCustomizer.

KafkaConfig.java

Infrastructure Customizer

CustomInfrastructureCustomizer implements KafkaStreamsInfrastructureCustomizer which has two override methods: configureBuilder and configureTopology. configureBuilder provides the StreamsBuilder object of the StreamsBuilderFactoryBean bean. StreamsBuilder’s build method builds our topology.

Our topology is so simple:

CustomInfrastructureCustomizer also implement StateStore builder from Stores object’s keyValueStoreBuilder method.

There are mainly two types of store:

- InMemoryKeyValueStore

- PersistentKeyValueStore

InMemoryKeyValueStore does not write to disk, all data is on the memory, however PersistentKeyValueStore uses RockSB and data is written to disk. There is no need for installation of RocksDB, it is a part of Kafka Streams application.

We use persistentKeyValueStore, because it persists data in disk in any unexpected fail scenarios.

CustomInfrastructureCustomizer.java:

Processor

CustomInfrastructureCustomizer implements the topology by adding CustomProcessor processor. CustomProcessor implements Processor<Long, FavoriteSingers>.

Processor interface has three methods to override:
- init
- process
- close
- There was also punctuator method, but it is depreciated.

  • init(ProcessorContext context) method defines initial states of Processor. In our project. init method gets the state store object that we defined in CustomInfrastructureCustomizer -topology building with StreamsBuilder- from ProcessorContext with state store name.
    Also init method has scheduling feature. You can define a schedule and do some operations on your state store in scheduled manner. Scheduler will be explained more detailed in punctuator part.
  • process(K key, V value) method processes the every new record that come to source of processor, input topic in this project, with the given key and value.

In CustomProcessor, we uppercased all singers, manipulated a singer name and then put into state store.

In process method, you can do any manipulation on any incoming record before putting into the state store.

CustomProcessor.java

Punctutator

Why scheduling is important for us. Assume that you are listening a topic with endless logs. What if your endless topic logs consist of mostly unique keys and your state store capacity (Kafka Streams application disk size) is limited. Probably you want to delete older logs from state store. There exist retention time for topics for Kafka, however retention time for state stores have not implemented yet. One of the solutions is manual deletion as recommended in the post of Alex Brekken : https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/ .
In this project, I used scheduler in CustomProcessor for deleting old logs.

As we mentioned above, we implemented a schedule in init method of CustomProcessor. Our schedule uses WALL_CLOCK_TIME which depends on the actual time. There is also a STREAM_TIME which depends on the incoming record. Detailed explanation: https://kafka.apache.org/23/documentation/streams/core-concepts#streams_time

We scheduled a CustomPunctuator object that runs every 60 seconds(for test purpose). CustomPunctuator implements Punctuator object and it has one override method punctuate. Our CustomPunctuator deletes the records that are older than 5 days in every 60 seconds. And we are writing some listener information in this schedule.

CustomPunctuator.java

Queryable Store

So now, our project listens the topic, makes some modifications, schedule a punctuator and deletes older logs from state store and it is running perfectly. Furthermore, we know that we have a key value store (in our project it is RocksDB), so the question comes to our minds:

Can we query records in the state store?

Answer: yes. Kafka Streams provides querying feature for state stores 🥳

In our project, we created a Spring service: StateStoreQueryService. We need the StreamsBuilderFactoryBean bean that we defined in configuration and name of the state store that we want to query. Thats all.

ReadOnlyKeyValueStore<Long, FavoriteSingers> queryableStateStore = streamsBuilderFactoryBean.getKafkaStreams().store(stateStoreName, QueryableStoreTypes.keyValueStore())

It was great feature: we can query the state store, which consist of the topic we are listening and it keeps records at least 5 days. Also it does not affect any running streaming processes.

However, we have limited query options for state store. We can get a value with key, get values from range of keys and get all state store records. I need to say that it is so fast.

StateStoreQueryService.java

THANKS FOR READING

Special thanks to Hasan Eken for his help.

Any feedback and comment is appreciated.

--

--