Introducing Azkarra Streams: The first micro-framework for Apache Kafka Streams
Kafka Streams is a powerful library for building complex streaming applications on top of Apache Kafka. Over time, and after multiple projects, we found ourselves writing the same code for running and interacting with Kafka Streams applications in production.
We are convinced that the development of a simple micro-service based on Kafka Streams, with a minimum of features to get into production safely, should be a matter of days and not weeks.
So we decided to build our own framework in an effort to ease the development and the operation of Kafka Streams applications.
Today, we are excited to announce Azkarra Streams, a new open-source Java micro-framework that lets you focus on writing Kafka Streams topologies rather than the boilerplate code needed to run them.
Key Features
Azkarra Streams provides a set of features to quickly debug and build production-ready Kafka Streams applications. This includes, among other things:
- Lifecycle management of Kafka Streams instances (no more KafkaStreams#start()).
- Easy externalization of Topology and Kafka Streams configurations (using Typesafe Config).
- Embedded HTTP server for querying state stores (Undertow).
- HTTP endpoint to monitor streams application metrics (e.g., JSON, Prometheus).
- Web UI for topology visualization.
- Encryption and Authentication with SSL or Basic Auth.
- Etc.
Why Azkarra?
Before writing your first Azkarra application, let's take some time to describe the different parts of a standard Kafka Streams application in order to get a better understanding of Azkarra's benefits.
To do this, we are going to use the well-known word-count example from the official Kafka Streams documentation.
First of all, we have to declare and build a Topology.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder
.stream("streams-plaintext-input");
textLines.flatMapValues(value ->
Arrays.asList(value.toLowerCase().split("\\W+"))
)
.groupBy((key, value) -> value)
.count(Materialized.as("WordCount"))
.toStream()
.to(
"streams-wordcount-output",
Produced.with(Serdes.String(), Serdes.Long())
);
Topology topology = builder.build();
After that, we need to define the application configuration.
Properties props = new Properties();
props.put(APPLICATION_ID_CONFIG, "streams-word-count");
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Then we create the KafkaStreams instance.
final KafkaStreams streams = new KafkaStreams(topology, props);
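Hard-coding these settings ties the binary to one environment. A common alternative, sketched below with only the JDK, is to load them from an external source. The class name `ExternalStreamsConfig` and the choice of required keys are illustrative assumptions; the property names are the string values behind the `StreamsConfig` constants used above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams or Azkarra.
class ExternalStreamsConfig {

    // Settings this sketch treats as mandatory (an assumption made for illustration).
    private static final String[] REQUIRED = {"application.id", "bootstrap.servers"};

    /** Reads settings in java.util.Properties format from a file, resource, or string. */
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Fail fast, before any KafkaStreams instance is created.
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                throw new IllegalArgumentException("Missing required setting: " + key);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        // A string stands in for an external file so the sketch runs on its own.
        String content = String.join("\n",
            "application.id=streams-word-count",
            "bootstrap.servers=localhost:9092",
            "default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde");
        Properties props = load(new StringReader(content));
        System.out.println(props.getProperty("application.id"));
    }
}
```

The returned `Properties` object could then be passed to the `KafkaStreams` constructor in place of the hard-coded `props`. This is yet another piece of plumbing that every application ends up carrying.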
Finally, we have to manage the runtime part of the Kafka Streams application. This means starting the KafkaStreams instance and managing the clean shutdown of the application using a shutdown hook.
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(
new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
N.B.: The complete code is available as a GitHub Gist.
Of course, a streaming application is rarely that simple. For example, you will need to add error handling and monitor the state of the Kafka Streams instance. In addition, you may wish to query internal stores using interactive queries, etc.
Last but not least, you sometimes have to work around known issues (e.g., https://issues.apache.org/jira/browse/KAFKA-7380).
But the reality is that, as a developer, you should focus your development efforts on defining and optimizing your topology. The reason is simple: this is the part that delivers value to your business.
First Steps with Azkarra
One of the first aspects we wished to resolve was the separation of concerns between building a Topology and executing it. Indeed, we thought that creating and starting a new KafkaStreams instance should not be managed directly by developers.
So, let’s rewrite the WordCount example using the Azkarra API.
First, we are going to use the Azkarra Streams Maven Archetype to create a simple project structure. You can run the following command:
$ mvn archetype:generate -DarchetypeGroupId=io.streamthoughts \
-DarchetypeArtifactId=azkarra-quickstart-java \
-DarchetypeVersion=0.3 \
-DgroupId=azkarra.streams \
-DartifactId=azkarra-getting-started \
-Dversion=1.0-SNAPSHOT \
-Dpackage=azkarra \
-DinteractiveMode=false
The pom.xml already contains the Azkarra Streams and Kafka Streams dependencies:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>io.streamthoughts</groupId>
<artifactId>azkarra-streams</artifactId>
<version>0.3</version>
</dependency>
</dependencies>
Using your favorite IDE or editor, open the Maven project. For this introduction, we will remove the bundled example and start our first app from scratch.
$ cd azkarra-getting-started
$ rm -rf src/main/java/azkarra/*.java
After that, let's create a new file SimpleStreamsApp.java under the directory src/main/java/azkarra, with a basic Java main(String[] args) method and our topology definition as follows:
package azkarra;
@AzkarraStreamsApplication
public class SimpleStreamsApp {
public static void main(String[] args) {
AzkarraApplication.run(SimpleStreamsApp.class, args);
}
@Component
public static class WordCountTopology implements TopologyProvider {
@Override
public Topology get() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines =
builder.stream("streams-plaintext-input");
textLines
.flatMapValues(value ->
Arrays.asList(value.toLowerCase().split("\\W+"))
)
.groupBy((key, value) -> value)
.count(Materialized.as("WordCount"))
.toStream()
.to(
"streams-wordcount-output",
Produced.with(Serdes.String(), Serdes.Long())
);
return builder.build();
}
@Override
public String version() {
return "1.0";
}
}
}
As you can see, we only had to implement an interface named TopologyProvider in order to build the Topology. Azkarra Streams is responsible for automatically configuring and starting the KafkaStreams instance.
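The division of labor described above can be pictured with a few lines of plain Java. This is only a conceptual sketch, not Azkarra's implementation: `Provider` and `Runner` are hypothetical names, and a string stands in for the topology.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Conceptual illustration only; none of these types exist in Azkarra.
class SeparationOfConcernsSketch {

    /** Written by the developer: describes what to run, never how to run it. */
    interface Provider<T> extends Supplier<T> {
        String version();
    }

    /** Owned by the framework: builds, tracks, and stops whatever providers describe. */
    static final class Runner {
        private final Map<String, Object> running = new LinkedHashMap<>();

        <T> void start(String name, Provider<T> provider) {
            // The build step is delegated to user code; the lifecycle stays here.
            running.put(name + "-" + provider.version(), provider.get());
        }

        List<String> running() {
            return new ArrayList<>(running.keySet());
        }

        void stopAll() {
            running.clear();
        }
    }

    public static void main(String[] args) {
        Runner runner = new Runner();
        runner.start("word-count", new Provider<String>() {
            @Override
            public String get() {
                return "source -> count -> sink";
            }

            @Override
            public String version() {
                return "1.0";
            }
        });
        System.out.println(runner.running());
        runner.stopAll();
    }
}
```

The point of the split is that the provider has no start, stop, or configuration logic of its own, so it stays small and easy to test.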
Next, we have to configure our application. We will create a file named application.conf under src/main/resources (you can also keep the file already present in the project as it is) with the following content:
azkarra {
context {
streams {
bootstrap.servers = "localhost:9092"
default.key.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde"
default.value.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde"
}
}
}
Congratulations! You have just written your first streaming application using Azkarra.
Running Your App on Docker
To run our application, we first have to start a Kafka cluster. For that purpose, we will use the official Kafka Docker images maintained by Confluent Inc.
To start a single-node Kafka cluster, use the docker-compose.yml file included in the project.
$ cd azkarra-getting-started
$ docker-compose up -d
Then, create the two topics (source and sink) used by the topology. For that, you can run the provided script:
$ chmod u+x quickstart-create-wordcount-topics.sh
$ ./quickstart-create-wordcount-topics.sh
As a last step, we will package and run the Maven project:
$ mvn clean package && java -jar target/azkarra-quickstart-java-0.3.jar
To verify that your streams application is running, check the health endpoint:
$ curl -sX GET 'http://localhost:8080/health' | grep 'UP'
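The application may need a few seconds before the endpoint answers, so a scripted check usually retries. The sketch below does this from Java with the JDK 11 HTTP client. `HealthPoller` is a hypothetical helper, and the only thing it assumes about the response is what the grep above relies on: the body contains "UP" once the application is healthy.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper for smoke tests; not part of Azkarra.
class HealthPoller {

    /** Calls the probe until its result contains "UP" or the attempts run out. */
    static boolean waitUntilUp(Supplier<String> probe, int maxAttempts, Duration pause) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (probe.get().contains("UP")) {
                    return true;
                }
            } catch (RuntimeException e) {
                // The server may not be listening yet; treat as "not up" and retry.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    /** Probe that performs GET {baseUrl}/health and returns the response body. */
    static Supplier<String> httpProbe(String baseUrl) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/health"))
            .timeout(Duration.ofSeconds(2))
            .GET()
            .build();
        return () -> {
            try {
                return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
            } catch (IOException e) {
                throw new IllegalStateException("Health endpoint unreachable", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while probing", e);
            }
        };
    }

    public static void main(String[] args) {
        boolean up = waitUntilUp(httpProbe("http://localhost:8080"), 10, Duration.ofSeconds(1));
        System.out.println(up ? "UP" : "NOT UP");
    }
}
```

Taking the probe as a `Supplier<String>` keeps the retry loop independent of HTTP, so it can be exercised without a running server.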
Finally, let's produce some messages to the Kafka topic streams-plaintext-input:
$ docker exec -it azkarra-cp-broker /usr/bin/kafka-console-producer --topic streams-plaintext-input --broker-list kafka:9092
Azkarra Streams
WordCount
I Heart Logs
Kafka Streams
Making Sense of Stream Processing
To consume the output topic streams-wordcount-output:
$ docker exec -it azkarra-cp-broker /usr/bin/kafka-console-consumer --from-beginning --property print.key=true --property key.separator="-" --topic streams-wordcount-output --bootstrap-server kafka:9092
Embedded HTTP Server
One of the key features of Azkarra is the embedded web server, which exposes endpoints for managing and monitoring local streams applications.
For example, you can list the streams instances running locally (i.e., the instances executed in the JVM).
$ curl -sX GET http://localhost:8080/api/v1/streams | jq .
[
"word-count-topology-1-0"
]
You can also get information about a specific local streams instance.
$ curl -sX GET http://localhost:8080/api/v1/streams/word-count-topology-1-0/ | jq .
{
"id": "word-count-topology-1-0",
"since": "2019-11-26T13:48:17.35+01:00[Europe/Paris]",
"name": "WordCountTopology",
"version": "1.0",
"state": {
"state": "RUNNING",
"since": "2019-11-26T13:48:18.772+01:00[Europe/Paris]"
}
}
Finally, you can export the streams application metrics in Prometheus format:
$ curl -sX GET 'http://localhost:8080/api/v1/streams/word-count-topology-1-0/metrics?format=prometheus'
# HELP streams_incoming_byte_rate The number of incoming bytes per second
# TYPE streams_incoming_byte_rate counter
streams_incoming_byte_rate{group="admin-client-node-metrics",id="word-count-topology-1-0",client_id="word-count-topology-1-0-e04c076a-7d46-4bdc-9bc2-14d181370762-admin",node_id="node--1",} 0.0
# HELP streams_incoming_byte_total The total number of incoming bytes
# TYPE streams_incoming_byte_total counter
streams_incoming_byte_total{group="admin-client-node-metrics",id="word-count-topology-1-0",client_id="word-count-topology-1-0-e04c076a-7d46-4bdc-9bc2-14d181370762-admin",node_id="node-1",} 1066.0
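Prometheus normally scrapes this endpoint itself, but the text format is simple enough to read directly, for instance in an integration test. The sketch below sums the samples that share a metric name. `PrometheusText` is a hypothetical helper that only handles the subset of the exposition format visible in the output above (comment lines, an optional label block, a numeric value); it is not a complete parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; handles only the subset of the format shown in the sample output.
class PrometheusText {

    /** Sums sample values per metric name, ignoring labels, comments, and blank lines. */
    static Map<String, Double> sumByMetric(String exposition) {
        Map<String, Double> totals = new LinkedHashMap<>();
        for (String line : exposition.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // HELP/TYPE comments and blank lines carry no sample
            }
            int labelsStart = trimmed.indexOf('{');
            int labelsEnd = trimmed.lastIndexOf('}');
            String name;
            String rest;
            if (labelsStart >= 0 && labelsEnd > labelsStart) {
                name = trimmed.substring(0, labelsStart);
                rest = trimmed.substring(labelsEnd + 1).trim();
            } else {
                String[] parts = trimmed.split("\\s+", 2);
                if (parts.length < 2) {
                    continue; // a name without a value is not a sample
                }
                name = parts[0];
                rest = parts[1].trim();
            }
            if (rest.isEmpty()) {
                continue;
            }
            // The value is the first token after the labels; a timestamp may follow it.
            totals.merge(name, parseValue(rest.split("\\s+")[0]), Double::sum);
        }
        return totals;
    }

    private static double parseValue(String token) {
        switch (token) {
            case "+Inf":
            case "Inf":
                return Double.POSITIVE_INFINITY;
            case "-Inf":
                return Double.NEGATIVE_INFINITY;
            default:
                return Double.parseDouble(token); // also accepts "NaN"
        }
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
            "# TYPE streams_incoming_byte_total counter",
            "streams_incoming_byte_total{node_id=\"node-1\",} 1066.0");
        System.out.println(sumByMetric(sample));
    }
}
```

Summing across label sets gives a per-application total for counters such as `streams_incoming_byte_total`; for rates or gauges, a different aggregation may be more appropriate.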
Azkarra Streams exposes additional endpoints. For more information, have a look at the REST API Reference.
Azkarra WebUI
Another cool feature of Azkarra Streams is the default embedded Web UI, available at http://localhost:8080/ui, which lets you manage your streams application.
Azkarra WebUI was first designed to facilitate development but has quickly evolved into a mini-administration interface.
For example, you can stop or restart a streams application using the “Available Actions” button, explore metrics and configuration, and so on.
Azkarra WebUI also ships with a simple DAG representation for the streams topology.
Interactive Queries
Finally, Kafka Streams has a great mechanism to query the state materialized by streams applications. Usually, as developers, we build HTTP endpoints to expose this state using the public Kafka Streams API.
Azkarra Streams provides a default endpoint for that purpose which is accessible directly through the Azkarra WebUI.
The diagram below shows an 'all' query on the 'WordCount' state store.
Under the hood, the Web UI runs the following HTTP request :
$ curl -sX POST 'http://localhost:8080/api/v1/applications/word-count-demo-1-0/stores/count' --data '{ "type":"key_value", "query" : {"all":{} } }' | jq .
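The same request can be issued from application code. The sketch below builds it with the JDK 11 HTTP client, reusing the path and JSON body from the curl command above. `StoreQuery` is a hypothetical class, and the `Content-Type` header is an assumption (the curl command does not set one explicitly).

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client-side helper; not part of Azkarra.
class StoreQuery {

    /** Builds the POST request for an 'all' query on a key-value store. */
    static HttpRequest queryAll(String baseUrl, String applicationId, String store) {
        // Same JSON body as in the curl command above.
        String body = "{\"type\":\"key_value\",\"query\":{\"all\":{}}}";
        URI uri = URI.create(
            baseUrl + "/api/v1/applications/" + applicationId + "/stores/" + store);
        return HttpRequest.newBuilder(uri)
            .header("Content-Type", "application/json") // assumption, see the note above
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = queryAll("http://localhost:8080", "word-count-demo-1-0", "count");
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```

Running `main` requires the application from the previous sections to be up; the application id and store name are the ones used in the curl example and may differ in your setup.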
Here is another example that queries only a specific key.
Going further
If you want to read more about using Azkarra Streams, the documentation is available on GitHub Pages. It contains a step-by-step getting-started guide that introduces the basic concepts of Azkarra.
The project also contains some examples.
Conclusion
Azkarra Streams is an initiative to enrich the Kafka Streams ecosystem and facilitate its adoption by developers through a lightweight micro-framework.
We hope this project will be well received by the open-source and Kafka community. Azkarra is still evolving and some features need improvements.
To support the Azkarra Streams project, please ⭐ the GitHub repository or tweet about it if this project helps you!
Thank you very much!
About Us:
StreamThoughts is an open source technology consulting company. Our mission is to inspire companies to create ever more innovative services that make the most of the opportunities offered by real-time data streaming.
We deliver high-quality professional services and training, in France, on the Apache Kafka ecosystem and the Confluent Inc. streaming platform.