Kafka Streams Interactive Queries

Abhishek Gupta
9 min read · Dec 12, 2018

This blog post explores the Interactive Queries feature in Kafka Streams with the help of a practical example. It covers the DSL API and how the state store information is exposed via a REST service

Everything is set up using Docker, including Kafka, Zookeeper, the stream processing services and the producer app

Here are the key components

  • Metrics producer service — pushes machine metrics (simulated) to Kafka
  • Average Processor service — calculates the average of the stream of generated metrics and exposes REST APIs to query them

Overview

Let’s start with a high-level overview. The intention is to introduce basic concepts, and most of this is from the Kafka documentation

Kafka Streams

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It’s designed as a simple and lightweight client library, which can be easily embedded in any Java application. It has no external dependencies on systems other than Kafka itself, and it uses Kafka’s partitioning model to horizontally scale processing while maintaining strong ordering guarantees. It supports fault-tolerant local state, employs one-record-at-a-time processing to achieve millisecond processing latency, and offers the necessary stream processing primitives along with a high-level Streams DSL and a low-level Processor API

Kafka Streams DSL API

The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.

Kafka Streams state stores

Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data. Every stream task in a Kafka Streams application may embed one or more local state stores that can be accessed via APIs to store and query data required for processing. Kafka Streams offers fault-tolerance and automatic recovery for such local state stores.

Kafka Streams interactive queries

Kafka Streams enables your applications to be queryable. Interactive queries allow you to leverage the state of your application from outside your application. The full state of your application is typically split across many distributed instances of your application, and across many state stores that are managed locally by these application instances.

To enable remote state store discovery in a distributed Kafka Streams application, you must set the application.server configuration property. It defines a unique host:port pair that points to the RPC endpoint of the respective instance of a Kafka Streams application

configurations.put("application.server", streamsAppServerConfig);
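As a rough illustration of where this property sits, the sketch below builds a configuration with plain java.util.Properties. The keys application.id, bootstrap.servers and application.server are standard Kafka Streams configuration names; the application ID, broker address and host/port values are placeholders assumed for this example, not the values used in the project.

```java
import java.util.Properties;

class StreamsConfigSketch {

    // Builds the configuration for one instance of the streams application.
    // host and port describe the endpoint that OTHER instances use to reach this one.
    static Properties buildConfig(String host, int port) {
        Properties configurations = new Properties();
        configurations.put("application.id", "metrics-average-app"); // placeholder value
        configurations.put("bootstrap.servers", "kafka:9092");       // placeholder value
        configurations.put("application.server", host + ":" + port);
        return configurations;
    }

    public static void main(String[] args) {
        Properties config = buildConfig("5691ab353dc4", 8080);
        System.out.println(config.getProperty("application.server"));
    }
}
```

Every instance must advertise a different host:port pair, otherwise the other instances cannot tell them apart when looking up who owns a key.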

To query the full state of your application, you should be able to combine the results from the local state store and reach out to the state stores of remote instances (via RPC etc.)

Here is a bird’s-eye view of the overall solution

High-level overview

Let’s skim through the implementation at a high level. The full code is available on GitHub

Metrics Producer service

It’s a simple producer which simulates random machine metric data and pushes it to a Kafka topic. It generates metrics for five machines, i.e. machine1 to machine5. The machine names (machine1 etc.) are used as keys and the actual metric readings (1, 2, 14 etc.) are used as the values

...
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps);
for (int i = 1; i <= 5; i++) {
    String key = KEY_PREFIX + i; //machine1 ... machine5
    String value = String.valueOf(rnd.nextInt(20)); //simulated reading in the range 0-19
    record = new ProducerRecord<>(TOPIC_NAME, key, value);
    //get() blocks until the send has completed
    RecordMetadata rm = kafkaProducer.send(record).get();
}
...

The process has been made synchronous (blocking call) on purpose for this example. This is done by invoking get on the Future<RecordMetadata> object returned by the send call to KafkaProducer. The producer is modelled as a Runnable and is scheduled using a ScheduledExecutorService: the first run happens after 5 seconds, and every later run starts 10 seconds after the previous one finishes

ScheduledExecutorService kafkaProducerScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

//first run after 5 seconds, then 10 seconds after each run completes
kafkaProducerScheduledExecutor.scheduleWithFixedDelay(new Producer(), 5, 10, TimeUnit.SECONDS);
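To make the shape of the generated data concrete, here is a Kafka-free sketch of what a single producer run yields. The class and method names are made up for illustration, and the "machine" prefix is assumed from the key names mentioned above; the real service sends each pair to Kafka instead of collecting it in a map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

class MetricsBatchSketch {

    static final String KEY_PREFIX = "machine"; // assumed from the key names in this post

    // One producer run: a reading in the range 0-19 for each of machine1..machine5.
    static Map<String, String> nextBatch(Random rnd) {
        Map<String, String> batch = new LinkedHashMap<>();
        for (int i = 1; i <= 5; i++) {
            batch.put(KEY_PREFIX + i, String.valueOf(rnd.nextInt(20)));
        }
        return batch;
    }

    public static void main(String[] args) {
        nextBatch(new Random())
                .forEach((machine, reading) -> System.out.println(machine + " " + reading));
    }
}
```

Because the machine name is the record key, all readings for one machine land in the same partition, which is what later makes per-machine aggregation possible.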

Metrics Average Processor service

This is a Kafka Streams application which uses the DSL API to compute the average metric reading of each machine. In addition to the Kafka Streams components, it also exposes a REST API using JAX-RS (with Jersey as the implementation) to query the average metrics (details to follow)

Kafka Streams DSL implementation for metrics average

The implementation depends on aggregation to get the job done

I thought of implementing this as a cumulative moving average using the Kafka Streams Processor API, but got a hint thanks to this — https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns

It starts by reading the raw metrics from the cpu-metrics-topic

KStream<String, String> metricsStream = builder.stream(SOURCE_TOPIC);

Then it groups the records by key i.e. machine name — metricsStream.groupByKey()

This is followed by the aggregation where a MetricsCountAndSum (POJO) object is used to represent the count and sum

Note that it uses a custom Serde (serializer and de-serializer) to handle how the MetricsCountAndSum object is persisted to and read from Kafka

The aggregated value is then converted to an average. It uses mapValues to convert MetricsCountAndSum into the average reading using sum/count. Each machine and its average are saved to the state store average-metrics-store. Note the explicit value Serde (Serdes.Double(), one of the built-in Serdes), since the average is of type java.lang.Double
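The group, aggregate and map steps can be pictured with a small plain-Java simulation. This is not the topology from the project and it uses no Kafka classes; it only mirrors the logic (fold each reading into a per-key count and sum, then divide). All names in it are invented for this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AverageAggregationSketch {

    // Running count and sum for one machine (a stand-in for MetricsCountAndSum).
    static final class CountAndSum {
        final long count;
        final long sum;
        CountAndSum(long count, long sum) { this.count = count; this.sum = sum; }
        CountAndSum add(long reading) { return new CountAndSum(count + 1, sum + reading); }
        double average() { return (double) sum / count; }
    }

    // records: each entry is {machine, reading}, in arrival order.
    static Map<String, Double> averages(List<String[]> records) {
        Map<String, CountAndSum> aggregated = new LinkedHashMap<>();
        for (String[] record : records) {
            long reading = Long.parseLong(record[1]);
            // "group by key" + "aggregate": fold the reading into that key's running total
            aggregated.merge(record[0], new CountAndSum(1, reading),
                    (current, ignored) -> current.add(reading));
        }
        Map<String, Double> result = new LinkedHashMap<>();
        // "mapValues": turn each (count, sum) pair into sum / count
        aggregated.forEach((machine, cs) -> result.put(machine, cs.average()));
        return result;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[]{"machine1", "0"},
                new String[]{"machine2", "5"},
                new String[]{"machine1", "8"},
                new String[]{"machine2", "3"});
        System.out.println(averages(records)); // {machine1=4.0, machine2=4.0}
    }
}
```

Keeping count and sum (rather than the average itself) as the aggregate is what allows the average to be updated one record at a time.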

As for the rest of the pieces, here is the POJO

static class MetricsCountAndSum {
    private final Long count;
    private final Long sum;

    public MetricsCountAndSum(Long count, Long sum) {
        this.count = count;
        this.sum = sum;
    }
    //omitted for brevity...

and the Serde implementation
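The Serde itself is not reproduced here. As a hedged sketch of what such a Serde has to do, the stdlib-only class below turns a (count, sum) pair into 16 bytes and back using ByteBuffer. The byte layout and the class name are assumptions made for this example, not necessarily the format used in the project; in a real Serde this logic would sit inside Kafka's Serializer and Deserializer implementations.

```java
import java.nio.ByteBuffer;

class CountAndSumCodecSketch {

    // Assumed layout: 8 bytes for count followed by 8 bytes for sum (big-endian).
    static byte[] serialize(long count, long sum) {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(count).putLong(sum).array();
    }

    // Returns {count, sum}.
    static long[] deserialize(byte[] data) {
        if (data == null || data.length != 2 * Long.BYTES) {
            throw new IllegalArgumentException("expected 16 bytes");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        return new long[]{buffer.getLong(), buffer.getLong()};
    }

    public static void main(String[] args) {
        long[] roundTrip = deserialize(serialize(64, 557));
        System.out.println(roundTrip[0] + " readings, sum " + roundTrip[1]); // 64 readings, sum 557
    }
}
```

Whatever format is chosen, serialization and deserialization have to agree exactly, because the same bytes are written to the state store's changelog and read back during recovery.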

Finally, everything is tied together and bootstrapped in the KafkaStreamsAppBootstrap class, which starts the JAX-RS application (in a Grizzly container) and the Kafka Streams application

...
String port = "8080";

//Start Grizzly container
URI baseUri = UriBuilder.fromUri("http://" + HOSTNAME + "/").port(Integer.parseInt(port)).build();
ResourceConfig config = new ResourceConfig(MetricsResource.class)
        .register(MoxyJsonFeature.class); //to-from JSON (using JAXB)
HttpServer server = GrizzlyHttpServerFactory.createHttpServer(baseUri, config);
server.start();
GlobalAppState.getInstance()
        .hostPortInfo(Utils.getHostIPForDiscovery(), port);

//Start Kafka Streams app
KafkaStreams theStream = AverageMetricsService.startStream();
GlobalAppState.getInstance().streams(theStream);
...

Metrics Query Service

The information in the state stores is available only within the JVM in which the streams app is running. The average processing service exposes REST APIs for external clients to be able to query the metrics e.g. to query metrics of machine1, you can execute curl http://host:port/metrics/machine1 or use curl http://host:port/metrics to get metrics for all the machines

The MetricsResource class is where the bulk of the implementation resides. To query the metric for a specific machine, the metadataForKey method is used to find out which instance hosts the key. It takes the name of the KV store (a.k.a. state store) along with the key, and the value itself is then fetched with a simple key-value lookup

The interesting thing to note is that this makes it possible to query for the metric details irrespective of whether the key (machine name) exists in the state store in that JVM. If it does not, a call is made to a remote (REST) endpoint. This is possible because the StreamsMetadata object provides the host and port information associated with the key, which is then used to make the call to the remote HTTP endpoint of another stream processing app instance

...
@GET
@Path("remote/{machine}")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Metrics remote(@PathParam("machine") String machine) {
....
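The local-versus-remote decision can be sketched without any Kafka classes. In the real service the owning host and port come from the StreamsMetadata object; here they are passed in as plain arguments. The /metrics/remote/{machine} URL shape is inferred from the endpoint annotations in this post and should be treated as an assumption, as should all class and method names.

```java
class QueryRoutingSketch {

    // True when the instance that owns the key is the one handling the request.
    static boolean isLocal(String ownerHost, int ownerPort, String thisHost, int thisPort) {
        return ownerHost.equals(thisHost) && ownerPort == thisPort;
    }

    // URL of the remote endpoint on the instance that owns the key (assumed path).
    static String remoteUrl(String ownerHost, int ownerPort, String machine) {
        return "http://" + ownerHost + ":" + ownerPort + "/metrics/remote/" + machine;
    }

    // Describes where the reading for the given machine would be fetched from.
    static String resolve(String machine, String ownerHost, int ownerPort,
                          String thisHost, int thisPort) {
        return isLocal(ownerHost, ownerPort, thisHost, thisPort)
                ? "local state store"
                : remoteUrl(ownerHost, ownerPort, machine);
    }

    public static void main(String[] args) {
        // Key owned by this instance: answer from the local store
        System.out.println(resolve("machine1", "5691ab353dc4", 8080, "5691ab353dc4", 8080));
        // Key owned by another instance: forward the query over HTTP
        System.out.println(resolve("machine4", "815e0c107ef1", 8080, "5691ab353dc4", 8080));
    }
}
```

Having a separate remote endpoint that only reads the local store keeps the forwarding to a single hop: the instance that receives the forwarded call never forwards it again.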

Similarly, you also get endpoints to query average metrics for all the machines

...
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response all_metrics() throws Exception {
Response response = null;
...

Here is the remote counterpart of the above

...
@GET
@Path("remote")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Metrics remote() {
...
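One way to picture how an "all machines" response can be assembled is to start with the readings held locally and append whatever each remote instance returns. The Reading type, the method names and the sample values below are invented for this sketch; the actual implementation lives in the MetricsResource class and may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergeMetricsSketch {

    // One row of the response: machine name, average reading, and the instance it came from.
    static final class Reading {
        final String machine;
        final double cpu;
        final String source;
        Reading(String machine, double cpu, String source) {
            this.machine = machine;
            this.cpu = cpu;
            this.source = source;
        }
    }

    // Local readings first, followed by the readings returned by each remote instance.
    static List<Reading> merge(List<Reading> local, List<List<Reading>> remotes) {
        List<Reading> all = new ArrayList<>(local);
        for (List<Reading> remote : remotes) {
            all.addAll(remote);
        }
        return all;
    }

    public static void main(String[] args) {
        List<Reading> local = Arrays.asList(new Reading("machine1", 8.5, "5691ab353dc4:8080"));
        List<Reading> remote = Arrays.asList(new Reading("machine4", 9.5, "815e0c107ef1:8080"));
        for (Reading r : merge(local, Arrays.asList(remote))) {
            System.out.println(r.machine + " " + r.cpu + " " + r.source);
        }
    }
}
```

Since each key lives in exactly one partition, the local and remote lists do not overlap and a plain concatenation is enough.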

The workhorse methods which are invoked by the above mentioned endpoint implementations are as follows

getLocalMetrics(String machine) — for a single machine’s metric

getLocalMetrics — for all metrics

Finally, the objects which are returned by the endpoints are simple POJOs which are sent back as JSON (or XML if needed)

List of metrics

But how do the Docker containers communicate?

Each Kafka Streams app instance runs in its own Docker container. For Interactive Queries to work, there are two key requirements

  • An external HTTP client (curl, browser etc.) should be able to reach the Docker container. This is achieved by publishing a random port (e.g. 32769) with -P in docker run and using the host IP (localhost, unless you’re running Docker in VirtualBox or similar, in which case it might be something like 192.168.99.100). This means that the URL will look something like http://localhost:32769/metrics. Alright, this is simple enough!
  • The Kafka Streams Docker containers should be able to talk to each other, since they need to access remote state stores data via HTTP calls (as explained above, the REST API implementation provides dedicated remote endpoints for this) — how this is made possible is explained below

Each Kafka Streams app accepts the RPC host and port as part of its configuration, e.g.

configurations.put("application.server", streamsAppServerConfig);

In this case, streamsAppServerConfig is derived from the HOSTNAME of the Docker container in which it is running. This is part of the HostDiscovery class

public interface HostDiscovery {
    public String getHost();

    public static final HostDiscovery localHostDiscovery = () -> "localhost";

    public static final HostDiscovery dockerDiscovery = () -> {
        System.out.println("Docker based host discovery..");
        String dockerHostName = System.getenv("HOSTNAME");

        System.out.println("Docker container host name - " + dockerHostName);
        return dockerHostName;
    };
}

Notice the dockerDiscovery implementation, which reads the HOSTNAME environment variable inside the Docker container. As far as the port is concerned, it can be 8080 since that’s what is defined in the code as well as the Dockerfile (using EXPOSE). This is not a problem since the port is internal to the container.

To put this all together, the Kafka Streams app config has a reachable endpoint e.g. 5691ab353dc4:8080 which the other instance(s) can invoke over HTTP to query for remote state store data.
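A minimal sketch of how the advertised endpoint could be composed from the environment is shown below. The fallback to localhost when HOSTNAME is missing is an assumption added for this example (the project keeps local and Docker discovery as two separate implementations), and the class and method names are hypothetical.

```java
import java.util.Map;

class EndpointSketch {

    // Uses the container hostname when present, otherwise falls back to localhost (assumed behaviour).
    static String endpoint(Map<String, String> env, int port) {
        String host = env.get("HOSTNAME");
        if (host == null || host.isEmpty()) {
            host = "localhost";
        }
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // Inside a Docker container HOSTNAME is normally the short container ID
        System.out.println(endpoint(System.getenv(), 8080));
    }
}
```

Passing the environment in as a map keeps the function easy to try out with different host names, without having to start a container.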

Don’t worry if all this does not sink in at once — the next section, where you will test the end to end scenario, will help clarify things. So, let’s move on and run the service….

Run the app…

Clone the repo first

git clone https://github.com/abhirockzz/kafka-streams-interactive-queries

Start by…

pulling Confluent Zookeeper and Kafka Docker images

docker pull confluentinc/cp-zookeeper
docker pull confluentinc/cp-kafka

Start the core infra..

i.e. Kafka cluster (along with Zookeeper of course)

./kafka-start.sh

//output
Error: No such network: confluent
Error: No such container: kafka
Error: No such container: zookeeper
4e4388574e9401a271fa1f06b6c9500c85317d590aab71a8c0dc8ca939f0834d
48137112e1a85983b65eadb7228755ded2dcc0ce497aad8af333f8ec4d55be63
88c489dbbc13dbc58303530e3b210f2d4ac09222cb34b924f74446fc9c3b5c61
Created topic "cpu-metrics-topic".

You can safely ignore the Error: messages

This will start Zookeeper and Kafka, and create the topic cpu-metrics-topic with 2 partitions

Metrics producer service

Start this service to begin pushing simulated metric data to the cpu-metrics-topic topic

./metrics-producer-service-start.sh

Sanity test

Use a Kafka Console Consumer to test if the producer is working as expected

./consume-data.sh

//output must be similar to...
machine1 0
machine2 5
machine3 15
machine4 7
machine5 10
machine1 8
machine2 3
machine3 18
machine4 4
machine5 5
.....
Press Ctrl+C to stop...

OK, the first half is set up. Let’s move on to the …

Kafka Streams application …

… which will crunch these metrics

Start a single instance of average processor service

./average-processing-service-start.sh

and extract the port it’s running on

docker ps|grep 'averageprocessorservice-*'|sed 's/.*0.0.0.0://g'|sed 's/->.*//g'

This will give back the random port which Docker allocated for the service, e.g. 32769

Query time!

Now you can use the HTTP endpoint exposed by the service to check the average metric reading from machines

To get the reading for a specific machine

curl http://localhost:[port]/metrics/[machine]

e.g. curl http://localhost:32769/metrics/machine1

//you should get back a JSON
{
"metrics": [
{
"machine": "machine1",
"cpu": "8.703125",
"source": "5691ab353dc4:8080"
}
]
}

Although it’s obvious, here is what the JSON attributes represent

  • machine — the name of machine
  • cpu — the current average metric reading of the machine
  • source — this is the hostname of the Docker container from where this reading was obtained

Before we proceed, start another instance of the average processing service — since the cpu-metrics-topic topic has two partitions, the processing will be divided amongst the two instances

./average-processing-service-start.sh

The metrics producer service is churning along.. so just wait for a while (15–30 seconds) for data to get re-distributed

Now, query the average readings for all machines

curl http://localhost:[port]/metrics/

e.g. curl http://localhost:32769/metrics/

//this too gives a JSON - this time with ALL machine metric readings
{
"metrics": [
{
"machine": "machine4",
"cpu": "9.688311688311689",
"source": "815e0c107ef1:8080"
},
{
"machine": "machine1",
"cpu": "8.878048780487806",
"source": "5691ab353dc4:8080"
},
{
"machine": "machine2",
"cpu": "8.39240506329114",
"source": "5691ab353dc4:8080"
},
{
"machine": "machine3",
"cpu": "9.871794871794872",
"source": "5691ab353dc4:8080"
},
{
"machine": "machine5",
"cpu": "9.291139240506329",
"source": "5691ab353dc4:8080"
}
]
}

Notice the reading for machine4: its source is different from that of the other machines. This is because

  • the data for machine4 was sent to a different partition (as compared to machine1 etc.) and,
  • it was processed by a different instance of the average processing service

Hence the different source value represented by the docker container hostname of the stream processing service instance.
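Why a given machine always shows up with the same source can be sketched with a toy partitioner. Kafka's default partitioner hashes the serialized key with murmur2; the version below uses String.hashCode() purely as a stand-in, so the partition numbers it prints will not match a real cluster. What it does show is that the key-to-partition mapping is deterministic, so every reading for one machine goes to one partition and therefore to one processing instance.

```java
class ToyPartitionerSketch {

    // Simplified stand-in for key-based partitioning (NOT Kafka's murmur2-based hash).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 2; // cpu-metrics-topic has two partitions
        for (int i = 1; i <= 5; i++) {
            String machine = "machine" + i;
            System.out.println(machine + " -> partition " + partitionFor(machine, numPartitions));
        }
    }
}
```

With two partitions and two running instances, each instance ends up owning one partition, which is why the keys split into two groups with two different source values.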

You can also try the port number for the second service instance — you should see the same result

As you saw, Kafka Streams Interactive Queries is a flexible and scalable way to expose your application state/data. Although this was a relatively simple example, this is really valuable in situations where you need to tap into your service data when working with multiple systems e.g. joining information from your purchases and users

That’s it for this post. Stay tuned for more….

Cheers!
