Queryable Kafka Topics with Kafka Streams
In today’s data processing architectures, Apache Kafka is often used at the ingress stage. At this stage, the incoming message data is usually enriched and filtered, but it typically cannot be queried interactively until a later stage in the data processing pipeline. This is because, although every message in a Kafka topic is persisted by default, Kafka offers no mechanism for quickly looking up a specific message in a topic.
Nevertheless, being able to query new data at this early stage in the pipeline would avoid the delays of traditional processing pipelines, which often include long-running batch preprocessing steps, and would give end users almost instant access to incoming data.
For building data processing applications with Kafka, the Kafka Streams library, which is maintained as part of the Kafka project, is commonly used to define data transformations and analyses. One important feature of Kafka Streams is its state stores, which provide the abstraction of a fast, local key-value store that can be read from and written to while processing messages. These key-value stores can be continuously filled with new messages from a Kafka topic by defining an appropriate stream processor, which makes it possible to quickly retrieve messages from the underlying topic by their key.
Building on top of this Kafka Streams functionality, we create a unified REST API that provides a single querying endpoint for a given Kafka topic.
In summary, combining Kafka Streams processors with state stores and an HTTP server can effectively turn any keyed Kafka topic into a fast, read-only key-value store.
Architecture of a Kafka Streams application with state stores
Kafka Streams is built as a library that can be embedded into a self-contained Java or Scala application. It allows developers to define stream processors that perform data transformations or aggregations on Kafka messages and can be configured (via the processing.guarantee setting) to process each input message exactly once. Using the Kafka Streams DSL, which is inspired by the Java Stream API, stream processors and state stores can be flexibly chained.
Furthermore, when starting multiple instances of a Kafka Streams-based application, the processes automatically form a load-balancing, highly available processing cluster without depending on external systems other than Kafka.
To illustrate the architecture of a Kafka Streams application that employs state stores, imagine the following scenario: As a railway operator, every time a customer books a trip on our website, we insert a new message consisting of the customer id and a timestamp into a Kafka topic. Specifically, one or more Kafka producers insert these messages into a topic with nine partitions. Because the customer id is chosen as the key for each message, data belonging to a given customer will always be inserted into the same partition of the topic. By default, Kafka producers use the DefaultPartitioner to assign messages to partitions, which derives the partition of a keyed message from a hash of its serialized key.
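For a keyed message, the partition is derived from a murmur2 hash of the serialized key, taken modulo the number of partitions. The following sketch re-implements that computation in plain Java so that it runs without the Kafka client library; it follows the logic of Kafka's `Utils.murmur2` and `Utils.toPositive` as we understand it, while the class `KeyPartitioner` is our own. It assumes String keys serialized as UTF-8, which is what Kafka's `StringSerializer` does by default.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: computes the partition Kafka's default partitioning would pick for a keyed message. */
final class KeyPartitioner {

    private KeyPartitioner() {}

    /** 32-bit murmur2 hash, following the algorithm used in Kafka's Utils.murmur2. */
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        final int length4 = length / 4;

        // Mix the input four bytes at a time (little-endian)
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix the remaining 1 to 3 bytes (intentional fall-through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        // Final avalanche
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Partition for a String key serialized as UTF-8. */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit (like Utils.toPositive) instead of Math.abs, which fails for Integer.MIN_VALUE
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Because the result depends only on the key bytes and the number of partitions, every booking of a given customer lands in the same partition, as long as the partition count of the topic does not change.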
Now, assume we have a Kafka Streams application that reads messages from this topic and persists them in a state store. Because the key of each message consists only of the customer id, each new booking overwrites the previous entry for that customer, so the value in the state store is always the timestamp of the customer’s latest booking. To achieve the maximum degree of parallel processing, we can start up to nine instances of the Kafka Streams application; additional instances would not be assigned an input partition. In this case, each application instance is assigned exactly one of the topic partitions. For each input partition, Kafka Streams creates a separate state store, which in turn only holds the data of the customers belonging to that partition.
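To make the one-store-per-partition layout concrete, the following plain-Java simulation keeps a separate map for each of the nine partitions and stores only the most recent value per customer. It is not Kafka Streams code: the class `PartitionedLatestStore` is hypothetical, and `Math.floorMod(key.hashCode(), n)` merely stands in for the producer's real partitioner. "Latest" here means the last message received for a key, which corresponds to the message order within a partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation: one key-value store per input partition, holding the latest value per key. */
final class PartitionedLatestStore {
    private final List<Map<String, String>> stores = new ArrayList<>();

    PartitionedLatestStore(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            stores.add(new HashMap<>());
        }
    }

    /** Stand-in partitioner; Kafka producers hash the serialized key with murmur2 instead. */
    int partitionOf(String customerId) {
        return Math.floorMod(customerId.hashCode(), stores.size());
    }

    /** Applies a booking message: a later message for the same key overwrites the earlier value. */
    void onMessage(String customerId, String timestamp) {
        stores.get(partitionOf(customerId)).put(customerId, timestamp);
    }

    /** Reads only the store that belongs to the customer's partition. */
    String latestBooking(String customerId) {
        return stores.get(partitionOf(customerId)).get(customerId);
    }

    int storeSize(int partition) {
        return stores.get(partition).size();
    }
}
```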
The resulting application architecture is illustrated in the diagram below.
It is possible to use either in-memory or persistent state stores in the application. Operations on in-memory state stores are even faster than on the persistent variant, which internally uses a RocksDB store. On the other hand, persistent state stores can be restored faster after a Kafka Streams application has failed and needs to restart, as long as its local state directory is still available. Moreover, the data volume per store is not limited by the amount of main memory when using persistent state stores.
In our scenario, it is not necessary to have a changelog topic that records write operations to the state stores: All data necessary to recover a state store can be obtained from the original input topic.
Adding a REST endpoint to stream processors
With the architecture presented so far, we have nine state stores that can be used to retrieve the latest booking date of the customers belonging to the respective input topic partition.
Now, in order to make this information accessible from outside of the Kafka Streams processors, we need to expose a service endpoint on each of the stream processor application instances and answer incoming requests from the internal state store that is managed by Kafka Streams.
As an additional requirement, we cannot expect the requesting application to know which Kafka Streams instance is currently responsible for processing the data of a given customer. Consequently, each service endpoint is responsible for redirecting the query to the correct application instance if the customer data is not locally available.
We chose to implement the service endpoint as a REST API, which has the benefit of being accessible from any client that supports HTTP and makes it easy to add a transparent load balancer.
The KafkaStreams object, which is available in every Kafka Streams application, provides read-only access to all local state stores and can also determine which application instance is responsible for a given customer id. When using this object to build our REST service, the architecture looks as follows:
To summarize, our proposed architecture uses Kafka topics to reliably store message data at rest and maintains a second representation of the data in state stores to support fast queries.
Ensuring scalability of the application
To obtain a scalable application, we need to ensure that the processing load is equally balanced over all instances of the Kafka Streams application. The load on an individual stream processor depends on the amount of data and queries it has to handle.
More specifically, it is important to choose a partitioning scheme for the Kafka topic such that the load of incoming messages and queries is well-balanced across all partitions and consequently also all stream processors.
If in-memory state stores are used, the number of partitions in the topic must be large enough that each stream processor can keep the data volume of one partition in main memory. Note that in case of a fail-over, a stream processor might even need to hold two partitions in main memory.
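As a rough back-of-the-envelope check: if every instance must be able to hold two partitions after a fail-over, a single partition may use at most half of an instance's memory budget. The sketch below turns that into a minimum partition count. It assumes the data is evenly spread across partitions and ignores JVM and store overhead; the class `PartitionSizing` and the numbers in the example are illustrative only.

```java
/** Hypothetical sizing helper for in-memory state stores. */
final class PartitionSizing {
    private PartitionSizing() {}

    /**
     * Smallest partition count such that partitionsPerInstance partitions of an evenly
     * distributed data set fit into the memory budget of one instance.
     */
    static int minPartitions(long totalStateBytes, long memoryPerInstanceBytes, int partitionsPerInstance) {
        if (totalStateBytes < 0 || partitionsPerInstance <= 0) {
            throw new IllegalArgumentException("invalid sizing input");
        }
        // Memory budget for a single partition, e.g. half the memory if two partitions must fit
        long bytesPerPartition = memoryPerInstanceBytes / partitionsPerInstance;
        if (bytesPerPartition <= 0) {
            throw new IllegalArgumentException("memory per instance too small");
        }
        // Ceiling division: how many partitions of at most bytesPerPartition are needed
        long partitions = (totalStateBytes + bytesPerPartition - 1) / bytesPerPartition;
        return (int) Math.max(1, partitions);
    }
}
```

For example, 90 GiB of state and 16 GiB of store memory per instance, with room for two partitions per instance, yield a minimum of 12 partitions.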
To account for stream processor failures, the number of standby replicas can be configured using the num.standby.replicas setting in Kafka Streams, which ensures that additional stream processor instances maintain a copy of the state store for a given input partition. In case of a failure, those instances can quickly take over answering queries, instead of starting to reconstruct the state store only after a failure has already occurred.
Finally, the desired number of stream processors has to match the available hardware. For each stream processor application instance, at least one CPU core should be reserved. On multicore systems, it is possible to increase the number of stream threads per application instance (num.stream.threads), which avoids the overhead of starting a separate Java application per CPU core.
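Both settings are ordinary Kafka Streams configuration properties. The sketch below only assembles them in a `java.util.Properties` object, using the literal property names so that it runs without the Kafka library; in a real application, these keys are also available as `StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG` and `StreamsConfig.NUM_STREAM_THREADS_CONFIG`, and the properties are passed to the `KafkaStreams` constructor. The application id, broker address, and chosen values are assumptions for illustration.

```java
import java.util.Properties;

/** Hypothetical helper that assembles the scaling-related Kafka Streams settings. */
final class ScalingConfig {
    private ScalingConfig() {}

    static Properties build(String bootstrapServers, int standbyReplicas, int streamThreads) {
        Properties props = new Properties();
        props.put("application.id", "booking-query-service"); // assumed application id
        props.put("bootstrap.servers", bootstrapServers);
        // Keep warm copies of each state store on other instances for faster fail-over
        props.put("num.standby.replicas", Integer.toString(standbyReplicas));
        // Process several partitions in parallel within one JVM, e.g. one thread per reserved core
        props.put("num.stream.threads", Integer.toString(streamThreads));
        return props;
    }
}
```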
Building the Kafka Streams application
The complete source code for this post is available on GitHub.
This walkthrough uses the Java API for Kafka Streams. Sticking to the example use case from above, we assume for simplicity that the customer ids and timestamps are strings, so both the keys and values of our Kafka messages can be (de-)serialized as Java Strings. Real applications can adopt other serialization formats in a straightforward manner.
Our Kafka Streams topology consists of a single KTable, which expresses that we are only interested in the latest value for each key in the input topic. This KTable is materialized as an in-memory state store, which facilitates fast lookups. As a consequence, each incoming message results in an update of the key-value store with exactly the key and value defined in the message; a message with a null value deletes the key from the store.
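The update semantics can be illustrated without Kafka: the sketch below replays a list of messages into a map the way a KTable-backed store is updated, including the deletion caused by a message with a null value (a tombstone). It is a simplified model with a hypothetical class name `KTableModel`, not the Kafka Streams implementation, and it ignores caching and record timestamps.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of how a KTable's state store reacts to incoming messages. */
final class KTableModel {
    private KTableModel() {}

    /** Replays messages in offset order: upsert on a non-null value, delete on a null value. */
    static Map<String, String> replay(List<? extends Map.Entry<String, String>> messages) {
        Map<String, String> store = new HashMap<>();
        for (Map.Entry<String, String> message : messages) {
            if (message.getValue() == null) {
                store.remove(message.getKey()); // tombstone: the key disappears from the table
            } else {
                store.put(message.getKey(), message.getValue()); // same key and value as the message
            }
        }
        return store;
    }
}
```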
The stream topology is defined in App.java:
The above definition of our KTable will allow us to retrieve exactly one value per customer id (key) later on. However, we can also make use of the range-querying capabilities offered by state stores to retrieve multiple values per customer. To achieve this, we need a secondary component for the keys that we insert into the state store.
In our example, this could be the message timestamp. Assuming that the combination of customer id and timestamp is unique, we would now create a new entry in the key-value store for each incoming message. The range query capability then allows us to retrieve all bookings of a customer in a given time period.
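A sketch of this composite-key scheme, using a `java.util.TreeMap` as a stand-in for the sorted state store: Kafka Streams key-value stores offer a `range(from, to)` lookup with both bounds inclusive, and `TreeMap#subMap` with inclusive bounds behaves analogously here. The class `BookingHistoryStore`, the `#` separator, and the ISO-8601 timestamp format are assumptions; fixed-width timestamps make lexicographic key order match chronological order. Kafka Streams stores order keys by their serialized bytes, which for ASCII strings matches the `String` order used below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of a state store keyed by "customerId#timestamp". */
final class BookingHistoryStore {
    private static final String SEPARATOR = "#"; // assumed not to occur in customer ids
    private final NavigableMap<String, String> store = new TreeMap<>();

    static String compositeKey(String customerId, String isoTimestamp) {
        return customerId + SEPARATOR + isoTimestamp;
    }

    /** One entry per booking, assuming (customerId, timestamp) is unique. */
    void onBooking(String customerId, String isoTimestamp, String value) {
        store.put(compositeKey(customerId, isoTimestamp), value);
    }

    /** All bookings of one customer between two timestamps, both bounds inclusive. */
    List<String> bookingsBetween(String customerId, String fromIso, String toIso) {
        return new ArrayList<>(store.subMap(
                compositeKey(customerId, fromIso), true,
                compositeKey(customerId, toIso), true).values());
    }
}
```

Because `#` sorts before the digits, a query for customer `c1` does not pick up entries of customer `c10`.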
Alternatively, we could also use a windowed state store, which implicitly adds the timestamp to the key. Range queries on a windowed state store include a time window and a key range.
Creating a REST service for the state store
Finally, we can define our REST API, which makes the fast lookups provided by the state stores accessible from outside the stream processors. This functionality is referred to as interactive queries in Kafka Streams, and the following code examples are inspired by Confluent’s example applications.
Our HTTP server is built with Jetty and uses Jackson to serialize the responses as JSON strings. In its constructor, it receives a reference to the KafkaStreams object and the name of our state store, as well as the host name and port that can later be used to make queries to this application instance.
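The post's service uses Jetty and Jackson; to keep the example dependency-free, the sketch below swaps in the JDK's built-in `com.sun.net.httpserver.HttpServer` and builds the JSON by hand. The class name `LocalLookupService`, the `/customers/{id}` path, and the JSON shape are assumptions, and the `Function` passed in stands in for a read-only view of the local state store. It only answers from local data; forwarding to other instances is not shown.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical, dependency-free REST service answering lookups from a local store. */
final class LocalLookupService {
    private static final String PREFIX = "/customers/";
    private final Function<String, String> localStore; // e.g. key -> store.get(key)
    private final String host;
    private final int port;
    private HttpServer server;

    LocalLookupService(Function<String, String> localStore, String host, int port) {
        this.localStore = localStore;
        this.host = host;
        this.port = port;
    }

    /** JSON body for a key, or null if the key is unknown (answered with HTTP 404). */
    String responseBody(String customerId) {
        String latest = localStore.apply(customerId);
        if (latest == null) {
            return null;
        }
        // Naive JSON without escaping: only acceptable for the simple string ids assumed here
        return "{\"customerId\":\"" + customerId + "\",\"latestBooking\":\"" + latest + "\"}";
    }

    void start() throws IOException {
        server = HttpServer.create(new InetSocketAddress(host, port), 0);
        server.createContext(PREFIX, this::handle);
        server.start();
    }

    void stop() {
        if (server != null) {
            server.stop(0);
        }
    }

    private void handle(HttpExchange exchange) throws IOException {
        String customerId = exchange.getRequestURI().getPath().substring(PREFIX.length());
        String body = responseBody(customerId);
        if (body == null) {
            exchange.sendResponseHeaders(404, -1); // -1: no response body
            exchange.close();
            return;
        }
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        exchange.getResponseHeaders().set("Content-Type", "application/json");
        exchange.sendResponseHeaders(200, bytes.length);
        try (OutputStream out = exchange.getResponseBody()) {
            out.write(bytes); // closing the body stream completes the exchange
        }
    }
}
```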
From our App.java, we can start the RestService just before calling the actual Kafka Streams start method.
As we will see in the next step, it is sometimes necessary to redirect incoming queries among the stream processor instances. Thus, we have to make sure that the stream processors know each other's REST endpoint addresses. Kafka Streams provides a built-in mechanism for sharing this information automatically. All that we have to do is provide the local processor's address, in the form host:port, in the application.server property on startup.
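A minimal sketch of that configuration step, again using the literal property name in a `java.util.Properties` object (in a real application, `StreamsConfig.APPLICATION_SERVER_CONFIG` refers to the same key). The helper `EndpointConfig`, the host name, and the port are placeholders; the value must be the `host:port` pair under which this instance's REST service is reachable by its peers.

```java
import java.util.Properties;

/** Hypothetical helper that advertises this instance's REST endpoint to the other instances. */
final class EndpointConfig {
    private EndpointConfig() {}

    static Properties withApplicationServer(Properties base, String restHost, int restPort) {
        Properties props = new Properties();
        props.putAll(base);
        // Kafka Streams shares this "host:port" value with all instances of the same application id
        props.put("application.server", restHost + ":" + restPort);
        return props;
    }
}
```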
Finally, we can define the REST endpoint that answers the query for a specific key, which is a customer id in our scenario. To answer it, we ask the KafkaStreams object for the metadata of the requested key in our state store, which tells us which instance hosts the data. If the data for the key is stored on a different host, we issue an HTTP request directly to the stream processor holding the requested information. Otherwise, we obtain it from the local store, or return an HTTP 404 response if the key does not exist.
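The decision logic can be sketched independently of Kafka and Jetty. In the sketch below, the hypothetical `KeyLocator` interface stands in for the key metadata lookup of the `KafkaStreams` object (in recent versions `queryMetadataForKey(storeName, key, keySerializer)`, whose `activeHost()` names the responsible instance), and the `Function` stands in for the local store. The class `QueryRouter` is our own, the actual HTTP forwarding is left out, and the "unavailable" case for missing metadata is an added assumption.

```java
import java.util.function.Function;

/** Hypothetical model of the query-routing decision of one stream processor instance. */
final class QueryRouter {

    /** Stand-in for Kafka Streams key metadata: "host:port" of the owning instance, or null if unknown. */
    interface KeyLocator {
        String activeHostFor(String key);
    }

    enum Kind { LOCAL, FORWARD, NOT_FOUND, UNAVAILABLE }

    static final class Decision {
        final Kind kind;
        final String detail; // value for LOCAL, "host:port" for FORWARD, otherwise null

        private Decision(Kind kind, String detail) {
            this.kind = kind;
            this.detail = detail;
        }
    }

    private final String self; // this instance's "host:port", as in application.server
    private final KeyLocator locator;
    private final Function<String, String> localStore;

    QueryRouter(String self, KeyLocator locator, Function<String, String> localStore) {
        this.self = self;
        this.locator = locator;
        this.localStore = localStore;
    }

    Decision route(String key) {
        String owner = locator.activeHostFor(key);
        if (owner == null) {
            // No metadata (e.g. while a rebalance is in progress): the client should retry later
            return new Decision(Kind.UNAVAILABLE, null);
        }
        if (!owner.equals(self)) {
            // Another instance holds the key: forward the HTTP request there
            return new Decision(Kind.FORWARD, owner);
        }
        // This instance is responsible: answer from the local state store, or 404 if absent
        String value = localStore.apply(key);
        return value == null ? new Decision(Kind.NOT_FOUND, null) : new Decision(Kind.LOCAL, value);
    }
}
```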
With that, our REST API is ready to use. Once one or more instances of the Kafka Streams application have been started, we can provide clients with a list of their REST endpoint addresses (i.e., DNS names or IP addresses with ports). The clients can then send requests to any of the stream processors (e.g., in a round-robin scheme), and the REST endpoints will return the results, internally forwarding the request to the correct processor if necessary.
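On the client side, round-robin selection over the known endpoints can be as simple as the following sketch; the class name `RoundRobinEndpoints` and the endpoint addresses are hypothetical. An `AtomicInteger` makes the selection safe to use from multiple threads.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical client-side helper that cycles through the REST endpoints of all stream processors. */
final class RoundRobinEndpoints {
    private final List<String> endpoints;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinEndpoints(List<String> endpoints) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException("at least one endpoint is required");
        }
        this.endpoints = List.copyOf(endpoints);
    }

    /** Any endpoint can answer, since processors forward requests internally when needed. */
    String nextEndpoint() {
        // floorMod keeps the index valid even after the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), endpoints.size());
        return endpoints.get(index);
    }
}
```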
To avoid having to implement the round-robin selection of stream processors in every consumer application, it is often useful to install a reverse proxy, such as nginx or HAProxy. As most of our customers deploy their Kafka Streams applications on Kubernetes, a Kubernetes Service of type LoadBalancer is also a good fit for this use case.
Selecting the right processor in the load balancer
With a central reverse proxy in place, we can make a more educated decision about which stream processor to consult for a given request. By making the internal lookup table of Kafka Streams available to the proxy, the additional hop from one stream processor to another can be avoided most of the time.
The internal forwarding mechanism is then only needed as a fallback when the lookup information on the reverse proxy is outdated, for example after partitions have been reassigned. For the vast majority of incoming queries, however, the HTTP server threads of our stream processors no longer wait for other stream processors to respond and can instead serve requests from their local state stores.
We can add another REST endpoint that provides the reverse proxy with the necessary information to choose the correct stream processor. The response contains a list of the available stream processors together with the input partition numbers that are assigned to each of them.
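One possible shape for this endpoint's response is sketched below. In a Kafka Streams application, the data would come from the metadata API of the `KafkaStreams` object (in recent versions `streamsMetadataForStore(storeName)`, in older ones `allMetadataForStore`), where each `StreamsMetadata` exposes the host, port, and assigned topic partitions. Here, a hypothetical `ProcessorInfo` class replaces it, and the JSON format is our own choice.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical endpoint payload: which processor (host and port) owns which input partitions. */
final class ProcessorInfo {
    final String host;
    final int port;
    final List<Integer> partitions;

    ProcessorInfo(String host, int port, List<Integer> partitions) {
        this.host = host;
        this.port = port;
        this.partitions = List.copyOf(partitions);
    }

    String toJson() {
        String partitionList = partitions.stream()
                .map(p -> Integer.toString(p))
                .collect(Collectors.joining(","));
        return "{\"host\":\"" + host + "\",\"port\":" + port + ",\"partitions\":[" + partitionList + "]}";
    }

    /** Serializes the list of all processors into the response body. */
    static String listToJson(List<ProcessorInfo> processors) {
        return processors.stream()
                .map(ProcessorInfo::toJson)
                .collect(Collectors.joining(",", "[", "]"));
    }
}
```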
With this information, the proxy is able to determine the correct REST endpoint for an incoming request in a two-step process: First, it computes the partition number for the requested key. In other words, the DefaultPartitioner used in our Kafka producers needs to be re-implemented in the reverse proxy, including the serialization of the key. Then, the proxy can look up the correct stream processor by using the information provided by our new REST endpoint.
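The proxy-side lookup could look like the following sketch, written in Java only for illustration; nginx or HAProxy would need equivalent logic in their own scripting extensions. The partition function is passed in and must reproduce the producers' partitioning exactly, i.e. murmur2 over the serialized key bytes modulo the partition count. The class `PartitionRoutingTable` and its empty-result fallback are assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.ToIntFunction;

/** Hypothetical proxy-side routing table built from the processors' partition assignments. */
final class PartitionRoutingTable {
    private final Map<Integer, String> endpointByPartition = new HashMap<>();
    private final ToIntFunction<String> partitioner; // must match the producers' partitioner

    PartitionRoutingTable(Map<String, List<Integer>> partitionsByEndpoint, ToIntFunction<String> partitioner) {
        this.partitioner = partitioner;
        // Invert "endpoint -> partitions" into "partition -> endpoint"
        partitionsByEndpoint.forEach((endpoint, partitions) ->
                partitions.forEach(p -> endpointByPartition.put(p, endpoint)));
    }

    /**
     * Step 1: compute the partition of the key; step 2: look up the processor owning it.
     * Empty if the table has no entry (e.g. it is stale); the proxy can then send the
     * request to any processor, which forwards it internally.
     */
    Optional<String> endpointFor(String key) {
        return Optional.ofNullable(endpointByPartition.get(partitioner.applyAsInt(key)));
    }
}
```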
We have seen that Kafka Streams state stores in combination with intelligent reverse proxies enable fast point queries into the messages of key-partitioned Kafka topics. Although the complexity of queries is limited, for example in comparison to KSQL, the proposed architecture can still provide a lightweight and easily scalable foundation for many types of applications. Kafka Streams has proven to be a flexible basis for such interactive querying applications, providing great freedom in the implementation of the service endpoints and the request protocol that is used.