Using Apache Kafka like a database with the Kafka Streams Processor API and StateStore

Gökhan Gürgeç
Published in cloudnesil
Feb 28, 2021

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It lets us process Apache Kafka data as streams, and it provides state stores that Kafka Streams applications can use to store and query that data. Through state stores we can interactively query data that came from Apache Kafka.

Kafka Streams offers two APIs for processing streaming data.

  1. Kafka Streams DSL: a high-level API built on top of the Processor API, with built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable.
  2. Processor API: a low-level API that lets developers define and connect custom stream processors and interact with state stores.

In this story we implement a Kafka Streams application with the Processor API and, using Spring Boot, query the processed data via state stores.

I selected a popular use case for the implementation. In the pandemic era e-commerce has grown rapidly and people want to track their orders instantly. Apache Kafka is a good candidate for managing order locations effectively and in real time. We will build a simple application that consumes an orderLocation topic from Apache Kafka and exposes a REST controller for querying the location of an order, without persisting the location to a database.

The implementation is available on GitHub.

Architecture

This is the architecture of the application.

We do not deal with the producer and producing messages for the orderLocation topic in this story; there are lots of articles about producing messages to Apache Kafka. We do recommend having a look at log compaction: we create the topic in our Apache Kafka cluster as log compacted, since we want to retain the last value for each key in the topic.
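To make the effect of log compaction concrete, here is a toy model in plain Java. It is not how Kafka implements compaction (the broker cleans log segments in the background, and older records for a key can remain visible for a while); it only illustrates the end state we rely on, where the latest value per key survives. The class name, keys and values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of what a compacted topic converges to: one (latest) value per key.
final class CompactionSketch {

    // Replays records in offset order; a later value overwrites an earlier one for the same key.
    static Map<String, String> compact(List<Map.Entry<String, String>> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            latest.put(record.getKey(), record.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = List.of(
                Map.entry("order-1", "warehouse"),
                Map.entry("order-2", "warehouse"),
                Map.entry("order-1", "in transit"),
                Map.entry("order-1", "delivered"));
        // prints {order-1=delivered, order-2=warehouse}
        System.out.println(compact(log));
    }
}
```

This is why a compacted topic suits the order location case: a consumer that reads the topic from the beginning still ends up with the current location of every order.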

Let’s dive into implementation.

1. Prerequisites & Dependencies:

Naturally we need an Apache Kafka Cluster.

Topic Creation
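One way to create the log-compacted topic from Java is the Kafka admin client that ships with kafka-clients. This is a minimal sketch rather than the setup used for the article: the broker address, partition count and replication factor are assumptions for a single-node development cluster, and the class name is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

final class OrderLocationTopicCreator {

    // Describes a topic whose cleanup policy is "compact" instead of the default "delete".
    static NewTopic compactedTopic(String name, int partitions, short replicationFactor) {
        return new NewTopic(name, partitions, replicationFactor)
                .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            // Blocks until the broker answers; fails if the topic already exists.
            admin.createTopics(List.of(compactedTopic("orderLocation", 1, (short) 1))).all().get();
        }
    }
}
```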
  • For development we use Java 11 and Spring Boot 2.4.3.
  • We use the spring-kafka library for Spring support of Apache Kafka; Spring Boot 2.4.3 uses version 2.6.0 of the Kafka client.

Here are the libraries we need.

2. Kafka Streams Configuration:

a. First of all, we add the following configuration to application.yml

We need to define the bootstrap server address, the topic name, and the streams applicationId and stateStoreName. The applicationId identifies our streaming application and must be unique within the Kafka cluster.
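As a rough illustration of where these values end up, the sketch below turns a bootstrap address and an applicationId into the properties Kafka Streams expects. The keys "bootstrap.servers" and "application.id" are the real Kafka Streams setting names (also exposed as constants on StreamsConfig); the class name, method name and sample values are hypothetical.

```java
import java.util.Properties;

final class StreamsPropertiesSketch {

    // Builds the two settings every Kafka Streams application must provide.
    static Properties streamsProperties(String bootstrapServers, String applicationId) {
        if (applicationId == null || applicationId.isBlank()) {
            throw new IllegalArgumentException("applicationId must not be empty");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Also used as the consumer group id and as the prefix of internal topics.
        props.put("application.id", applicationId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties("localhost:9092", "order-location-app");
        System.out.println(props.getProperty("application.id")); // prints order-location-app
    }
}
```

Because the applicationId doubles as the consumer group id, two different applications sharing one id would split the topic's partitions between them, which is why it has to be unique.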

b. KafkaStreamsConfiguration.java

We add a Spring configuration class that configures our Kafka Streams application and builds its topology.

Spring for Apache Kafka offers StreamsBuilderFactoryBean for managing Kafka Streams in the Spring context, but we do not use it here.

We create a KafkaStreams instance as a Spring-managed bean to use within the application context.

The KafkaStreams instance is the entry point of our Kafka Streams application; once it is started, the stream begins processing.

Since we use the Kafka Streams Processor API, we create the KafkaStreams instance from a topology that we define ourselves.

A topology means the same thing in most streaming libraries: a graph of sources, processing steps and sinks.

The figure is taken from a slide deck describing Akka Streams: https://www.slideshare.net/Lightbend/understanding-akka-streams-back-pressure-and-asynchronous-architectures

First we define a Source, which is where our stream originates: the orderLocation topic in our Kafka cluster.

Second we define a Processor (the equivalent of a Flow in Akka Streams). For this we write an OrderLocationStreamsProcessor class that implements Processor.

Lastly we define the Sink of the stream. Rather than writing to an output topic, we add a state store to our Kafka Streams application and let the processor write into it.

The state store is the core of our application: it is the structure that lets us use Apache Kafka like a database for querying data.

“To implement a stateful Processor or Transformer, you must provide one or more state stores to the processor or transformer (stateless processors or transformers do not need state stores). State stores can be used to remember recently received input records, to track rolling aggregates, to de-duplicate input records, and more. Another feature of state stores is that they can be interactively queried from other applications, such as a NodeJS-based dashboard or a microservice implemented in Scala or Go.”

State stores come in in-memory and persistent variants; the persistent store, which is backed by RocksDB, is the default.
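The source, processor and state store wiring described above can be sketched with the Topology class as follows. This is an illustration under assumptions, not the article's configuration class: keys and values are plain strings, the node names are invented, and the processor is passed in as a parameter. It is written against the Processor API of kafka-streams 2.6, the version named in the prerequisites; later releases add a newer typed interface under org.apache.kafka.streams.processor.api, so adjust the imports if you are on a recent version.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

final class OrderLocationTopologySketch {

    // Node names are hypothetical; they only need to be unique within the topology.
    private static final String SOURCE = "order-location-source";
    private static final String PROCESSOR = "order-location-processor";

    static Topology build(String topic, String storeName, ProcessorSupplier<String, String> processor) {
        // Persistent (RocksDB) store; Stores.inMemoryKeyValueStore(storeName) is the in-memory variant.
        StoreBuilder<KeyValueStore<String, String>> store = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String());

        Topology topology = new Topology();
        // Source: reads the topic and deserializes keys and values as strings.
        topology.addSource(SOURCE, new StringDeserializer(), new StringDeserializer(), topic);
        // Processor: receives every record emitted by the source.
        topology.addProcessor(PROCESSOR, processor, SOURCE);
        // State store: attached to the processor so it can be looked up in init().
        topology.addStateStore(store, PROCESSOR);
        return topology;
    }
}
```

A topology built this way, together with the streams properties, can be handed to the KafkaStreams constructor, and calling start() on that instance begins processing.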

3. OrderLocationStreamsProcessor.java

This is the stream processor class that processes the incoming messages from the Kafka topic.

We implement the init, process and close methods defined in the Processor interface.

Messages are processed in process, as the name suggests. This is where processing logic goes, such as saving to an SQL or NoSQL database, indexing into a search engine like Elasticsearch, or computing aggregations. In our case we simply put the message into the state store.
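A processor along these lines might look like the sketch below. It is not the article's OrderLocationStreamsProcessor: the class name is different on purpose, values are treated as plain strings, and the handling of records without a key or value is an added assumption. It targets the Processor interface of kafka-streams 2.6; newer releases provide a replacement under org.apache.kafka.streams.processor.api.

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

final class OrderLocationProcessorSketch implements Processor<String, String> {

    private final String storeName;
    private KeyValueStore<String, String> store;

    OrderLocationProcessorSketch(String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Works only if the store was attached to this processor in the topology.
        store = (KeyValueStore<String, String>) context.getStateStore(storeName);
    }

    @Override
    public void process(String orderNumber, String location) {
        if (orderNumber == null) {
            return; // a record without a key could never be looked up by order number
        }
        if (location == null) {
            store.delete(orderNumber); // treat a null value as a tombstone for the order
        } else {
            store.put(orderNumber, location); // keep only the latest location per order
        }
    }

    @Override
    public void close() {
        // Nothing to release: Kafka Streams manages the lifecycle of the state store.
    }
}
```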

4. OrderLocationController.java

This is a REST controller class. We define a GET endpoint that accepts an orderNumber and calls OrderLocationService to get the OrderLocation value.

5. OrderLocationService.java

In this service we inject our KafkaStreams instance and, in the getOrderLocation method, query our state store.
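Querying the state store from a running KafkaStreams instance can be sketched as below. This is a plain class rather than the article's Spring service, so that it stays self-contained; the class name, the String value type and the choice to return an empty Optional while the store is unavailable are assumptions.

```java
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

final class OrderLocationQuerySketch {

    private final KafkaStreams kafkaStreams;
    private final String storeName;

    OrderLocationQuerySketch(KafkaStreams kafkaStreams, String storeName) {
        this.kafkaStreams = Objects.requireNonNull(kafkaStreams, "kafkaStreams");
        this.storeName = Objects.requireNonNull(storeName, "storeName");
    }

    // Returns the last known location, or empty if the order is unknown or the store is not ready.
    Optional<String> getOrderLocation(String orderNumber) {
        try {
            // Read-only view over the local state store; it cannot modify the store.
            ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(
                    StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
            return Optional.ofNullable(store.get(orderNumber));
        } catch (InvalidStateStoreException e) {
            // Thrown, for example, while the store is being restored or moved during a rebalance.
            return Optional.empty();
        }
    }
}
```

Note that such a lookup only sees the partitions hosted by the local application instance, so with several instances a key may live on another one.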

Running the Application

We try the application out in three steps:

  • run it with ./gradlew clean bootRun
  • send a location to the Apache Kafka topic with KafkaTool
  • call the endpoint with Postman

And that’s it. Thank you for reading. Please do not hesitate to send comments and ideas or to suggest corrections.

Resources

  1. https://kafka.apache.org/27/documentation/streams/developer-guide/
  2. https://docs.confluent.io/platform/current/streams/developer-guide/write-streams.html

Gökhan Gürgeç
cloudnesil

An IT professional who has worked in various software development roles (test engineer, developer, project manager) and is passionate about good-quality software development.