How to Test Kafka Streams Applications

Validating topologies through testing

Abhishek Gupta
Mar 16 · 6 min read
Photo by JR Korpa on Unsplash

The previous blog posts in the “Kafka Streams” series covered stateless and stateful operations in the DSL API. In this post, we will explore a few examples that demonstrate how to use the testing utilities to validate topologies built with the Kafka Streams DSL API.

Kafka Streams provides testing utilities to execute unit tests for your stream processing pipelines without having to rely on an external or embedded Kafka cluster. In addition to testing, these utilities also serve as a great learning tool to grok various API features.

Let’s start with a high-level overview of the testing-related APIs.

The code is available on GitHub, and you can run the tests by cloning the repo and running mvn test.

Key Concepts

Initially, the org.apache.kafka.streams.test package offered a few helper classes (ConsumerRecordFactory and OutputVerifier). As of Kafka 2.4 they are deprecated in favor of the following classes:

An instance of TestInputTopic represents an input topic, and you send records to it using the pipeInput method (and its overloaded versions). You create TestInputTopic instances using TopologyTestDriver (explained below), passing the key and value serializers (custom ones if needed). You can then send key-value pairs or just values, either one at a time or in a batch (using a List).
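The piping variants described above can be seen side by side in a small sketch. It assumes a hypothetical pass-through topology (the topic names in and out and the class name are made up, not taken from the post's repo) and a minimal config; pipeInput, pipeKeyValueList and pipeValueList are TestInputTopic methods from kafka-streams-test-utils 2.4.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class PipeInputSketch {

    // Pipes records into a hypothetical pass-through topology ("in" -> "out")
    // and returns everything that reached the output topic, in order.
    public static List<KeyValue<String, String>> pipeAndRead() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "pipe-input-sketch");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted

        TopologyTestDriver td = new TopologyTestDriver(builder.build(), config);
        try {
            TestInputTopic<String, String> in = td.createInputTopic(
                    "in", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> out = td.createOutputTopic(
                    "out", Serdes.String().deserializer(), Serdes.String().deserializer());

            in.pipeInput("k1", "v1");                      // a single key-value pair
            in.pipeInput("v2");                            // a value only (the key is null)
            in.pipeKeyValueList(Arrays.asList(             // a batch of key-value pairs
                    new KeyValue<>("k3", "v3"), new KeyValue<>("k4", "v4")));
            in.pipeValueList(Arrays.asList("v5", "v6"));   // a batch of values

            return out.readKeyValuesToList();
        } finally {
            td.close(); // releases the processors and their state
        }
    }
}
```

Every variant ends up as ordinary records on the input topic, so the six records come out of the pass-through topology in the order they were piped.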

TestOutputTopic is the other half of the send-receive equation and complements TestInputTopic. You use it to read the records that your topology writes to an output topic. Its methods include reading records (key-value pairs) or only their values, and querying the queue size (the number of records that have not been consumed yet) or whether the topic is empty.
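The reading side can be sketched the same way. This example assumes the same kind of hypothetical pass-through topology and simply reports what readValue, getQueueSize, readValuesToList and isEmpty return as records are consumed; the class name and topic names are illustrative only.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class OutputTopicSketch {

    // Returns a one-line summary of what the TestOutputTopic methods report
    // while records are consumed from a hypothetical pass-through topology.
    public static String summarize() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "output-topic-sketch");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        TopologyTestDriver td = new TopologyTestDriver(builder.build(), config);
        try {
            TestInputTopic<String, String> in = td.createInputTopic(
                    "in", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> out = td.createOutputTopic(
                    "out", Serdes.String().deserializer(), Serdes.String().deserializer());

            in.pipeInput("k1", "a");
            in.pipeInput("k1", "b");
            in.pipeInput("k2", "c");

            long sizeBefore = out.getQueueSize();        // 3 records not yet consumed
            String first = out.readValue();              // consumes one record, returns its value
            long sizeAfter = out.getQueueSize();         // 2 records left
            List<String> rest = out.readValuesToList();  // drains the remaining values
            boolean empty = out.isEmpty();               // nothing left to read

            return sizeBefore + " " + first + " " + sizeAfter + " " + rest + " " + empty;
        } finally {
            td.close();
        }
    }
}
```

The key point is that every read consumes records, which is why the queue size drops after readValue and the topic is empty once the remaining values have been drained.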

TopologyTestDriver holds a reference to the Topology as well as the configuration of your Kafka Streams application. As mentioned earlier, it is used to create TestInputTopic and TestOutputTopic instances, provide access to state stores, and so on.
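The tests in this post pass a config object to TopologyTestDriver without showing it. A plausible minimal version is sketched below; the class name, the application id, the dummy bootstrap address and the choice of String as the default serdes are assumptions. The String defaults fit the topologies shown later, which call builder.stream(INPUT_TOPIC) without explicit serdes. No broker is ever contacted, but the driver still validates the config, so application.id and bootstrap.servers are set.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class TestConfig {

    // Hypothetical helper that builds the Properties handed to TopologyTestDriver.
    public static Properties testConfig() {
        Properties config = new Properties();
        // Any non-empty id works for the test driver.
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test");
        // Dummy address: the test driver never connects to it.
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        // Default serdes, used wherever the topology does not specify Consumed/Produced.
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return config;
    }
}
```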

High-Level Flow

If you’re using Maven, you can include the testing utility as a dependency.

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>

You will most likely also use JUnit, along with Hamcrest for writing matchers.

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>

Here is how a test class might be structured (much as you would unit test any other Java code with JUnit):

  • Set up global state (if any) in a method annotated with @BeforeClass.
  • Set up state for each test run in a method annotated with @Before - this is where you would typically create the TopologyTestDriver and related objects.
  • Write @Test methods that validate the Topology.
  • Tear down any state (global or per-test) in @After (and/or @AfterClass) methods.

Please make sure you call TopologyTestDriver.close() to clean up the processors in the topology and their associated state. Failing to do so might cause test failures due to inconsistent state.
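The lifecycle above, including the close() call, can be sketched as a JUnit 4 test class. The uppercase topology, the topic names and the class name are hypothetical and only serve to keep the example self-contained.

```java
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class UppercaseTopologyTest {

    private TopologyTestDriver td;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;

    // Hypothetical topology under test: upper-cases every value.
    static Topology uppercase() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("output", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Runs before every test: a fresh driver, so no state leaks between tests.
    @Before
    public void setUp() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        td = new TopologyTestDriver(uppercase(), config);
        inputTopic = td.createInputTopic("input", Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic("output", Serdes.String().deserializer(), Serdes.String().deserializer());
    }

    // Runs after every test: closes the driver and cleans up processors and state.
    @After
    public void tearDown() {
        td.close();
    }

    @Test
    public void shouldUppercaseValues() {
        inputTopic.pipeInput("key1", "hello");
        assertThat(outputTopic.readValue(), equalTo("HELLO"));
        assertThat(outputTopic.isEmpty(), is(true));
    }
}
```

Because the driver is created in @Before and closed in @After, every test method starts from a fresh topology with empty state.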

Now that you have an understanding of the concepts and basic setup, let’s look at a few concrete examples. We’ll start off with stateless operations.

Testing Stateless Operations

Here is the Topology which uses the filter method to only allow values that have a length greater than five.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

stream.filter((k, v) -> v.length() > 5).to(OUTPUT_TOPIC);

And here is the corresponding test:

@Test
public void shouldIncludeValueWithLengthGreaterThanFive() {
    topology = App.retainWordsLongerThan5Letters();
    td = new TopologyTestDriver(topology, config);
    inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
    outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());

    assertThat(outputTopic.isEmpty(), is(true));

    inputTopic.pipeInput("key1", "barrrrr");
    assertThat(outputTopic.readValue(), equalTo("barrrrr"));
    assertThat(outputTopic.isEmpty(), is(true));

    inputTopic.pipeInput("key2", "bar");
    assertThat(outputTopic.isEmpty(), is(true));
}

We start by choosing the Topology we want to test, creating the TopologyTestDriver instance along with the TestInputTopic and TestOutputTopic objects.

Next, we confirm that the output topic is empty before sending any data: assertThat(outputTopic.isEmpty(), is(true));

Now records can be sent to the input topic using inputTopic.pipeInput("key1", "barrrrr");. This is synchronous: it triggers the Topology, which in this case executes the filter operation and, since the value is longer than five characters, writes the record to the output topic. We confirm this using assertThat(outputTopic.readValue(), equalTo("barrrrr")); and then check that the output topic is empty again, since readValue consumes the record.

Finally, we send the value bar and confirm that it was not sent to the output topic, because its length is not greater than five.
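Since pipeValueList and readValuesToList work in batches, the same filter can also be checked against several values at once, including the boundary case of exactly five characters. This sketch rebuilds the filter with explicit String serdes and made-up topic names, because App.retainWordsLongerThan5Letters() itself is not shown in full; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class FilterBatchSketch {

    // Hypothetical stand-in for App.retainWordsLongerThan5Letters().
    static Topology retainWordsLongerThanFive() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .filter((k, v) -> v.length() > 5)
               .to("output", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Pipes all values in one batch and returns the ones that passed the filter.
    public static List<String> survivors(List<String> values) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-batch-sketch");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        TopologyTestDriver td = new TopologyTestDriver(retainWordsLongerThanFive(), config);
        try {
            TestInputTopic<String, String> in = td.createInputTopic(
                    "input", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> out = td.createOutputTopic(
                    "output", Serdes.String().deserializer(), Serdes.String().deserializer());

            in.pipeValueList(values);       // values are sent with null keys
            return out.readValuesToList();  // only values longer than five characters
        } finally {
            td.close();
        }
    }
}
```

For example, survivors(Arrays.asList("barrrrr", "bar", "fives", "sixsix")) keeps barrrrr and sixsix, while fives (exactly five characters) is dropped along with bar.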

Here is a flatMap operation, as explained in part one of this series (stateless operations):

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
        String[] values = csv.split(",");
        return Arrays.asList(values)
                .stream()
                .map(value -> new KeyValue<>(k, value))
                .collect(Collectors.toList());
    }
}).to(OUTPUT_TOPIC);

In the above example, each record in the stream is flatMapped: its CSV (comma-separated) value is split into its constituents, and a KeyValue pair with the original key is created for each part of the CSV string.
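Because this flatMap never changes the key, the same result can be sketched with flatMapValues and a lambda. This is a swapped-in alternative, not the code from the post's repo: flatMapValues only touches the value, so Kafka Streams does not mark the stream for repartitioning the way it does after a potentially key-changing operation like flatMap. The topic names, class name and explicit serdes are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class FlatMapValuesSketch {

    // Hypothetical equivalent of App.flatMap(): one output record per CSV element,
    // keeping the original key.
    static Topology splitCsv() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .flatMapValues(csv -> Arrays.asList(csv.split(",")))
               .to("output", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Pipes the given records through splitCsv() and returns the output records in order.
    public static List<KeyValue<String, String>> run(List<KeyValue<String, String>> input) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "flatmapvalues-sketch");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        TopologyTestDriver td = new TopologyTestDriver(splitCsv(), config);
        try {
            TestInputTopic<String, String> in = td.createInputTopic(
                    "input", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> out = td.createOutputTopic(
                    "output", Serdes.String().deserializer(), Serdes.String().deserializer());

            in.pipeKeyValueList(input);
            return out.readKeyValuesToList();
        } finally {
            td.close();
        }
    }
}
```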

To test this:

topology = App.flatMap();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());

inputTopic.pipeInput("random", "foo,bar,baz");
inputTopic.pipeInput("hello", "world,universe");
inputTopic.pipeInput("hi", "there");

assertThat(outputTopic.getQueueSize(), equalTo(6L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "foo")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "bar")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "baz")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "world")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "universe")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hi", "there")));
assertThat(outputTopic.isEmpty(), is(true));

As usual, we set up the required test utility classes and push input records to the input topic. For example, the comma-separated values foo,bar,baz for the key random are split into individual key-value pairs, which results in three records being written to the output topic. The same applies to the other input records.

We confirm the number of records in the output topic using assertThat(outputTopic.getQueueSize(), equalTo(6L)); and then validate each key-value pair to confirm the flatMap behavior.

What About Stateful Operations?

Here is an example of a Topology that uses groupByKey followed by count and writes the results to an output topic:

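StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey()
        .count()
        .toStream()
        // count() produces Long values, so the sink needs a Long value serde
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));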

Testing a stateful operation is not very different from testing a stateless one.

topology = App.count();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
TestOutputTopic<String, Long> ot = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.Long().deserializer());
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key1", "value2");
inputTopic.pipeInput("key2", "value3");
inputTopic.pipeInput("key3", "value4");
inputTopic.pipeInput("key2", "value5");
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 2L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key3", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 2L)));

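Individual records are sent to the input topic, and the counts written to the output topic are then validated. Every input record produces an updated count for its key, so the output topic receives five records; the final counts for key1, key2 and key3 are 2, 2 and 1 respectively.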
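When only the final count per key matters, readKeyValuesToMap keeps the latest value for each key, so the assertion no longer depends on how many intermediate updates were emitted. The sketch below rebuilds the count topology with explicit serdes (Grouped and Produced) and made-up topic and class names; it is an illustration rather than the repo's App.count().

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

public class CountSketch {

    // Hypothetical counterpart of App.count(): counts records per key.
    static Topology count() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .count()
               .toStream()
               .to("output", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    // Sends the same five records as the test above and returns the latest count per key.
    public static Map<String, Long> latestCounts() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-sketch");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        TopologyTestDriver td = new TopologyTestDriver(count(), config);
        try {
            TestInputTopic<String, String> in = td.createInputTopic(
                    "input", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, Long> out = td.createOutputTopic(
                    "output", Serdes.String().deserializer(), Serdes.Long().deserializer());

            in.pipeInput("key1", "value1");
            in.pipeInput("key1", "value2");
            in.pipeInput("key2", "value3");
            in.pipeInput("key3", "value4");
            in.pipeInput("key2", "value5");

            // Later updates for a key overwrite earlier ones in the returned map.
            return out.readKeyValuesToMap();
        } finally {
            td.close();
        }
    }
}
```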

Things get interesting when the Topology includes a state store. In this example, instead of sending the counts to an output topic, the counts are kept in a named state store (which can be queried via Interactive Queries):

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey().count(Materialized.as("count-store"));

TopologyTestDriver provides access to the state store (as a KeyValueStore) via getKeyValueStore. The count in the state store is validated after each record is sent to the input topic, e.g. assertThat(countStore.get("key1"), equalTo(1L));

topology = App.countWithStateStore();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());

KeyValueStore<String, Long> countStore = td.getKeyValueStore("count-store");

inputTopic.pipeInput("key1", "value1");
assertThat(countStore.get("key1"), equalTo(1L));
inputTopic.pipeInput("key1", "value2");
assertThat(countStore.get("key1"), equalTo(2L));
inputTopic.pipeInput("key2", "value3");
assertThat(countStore.get("key2"), equalTo(1L));
inputTopic.pipeInput("key3", "value4");
assertThat(countStore.get("key3"), equalTo(1L));
inputTopic.pipeInput("key2", "value5");
assertThat(countStore.get("key2"), equalTo(2L));
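Besides point lookups with get, the store returned by getKeyValueStore can be scanned with all(). The sketch below copies the whole store into a sorted map after piping the same five records; the iterator is closed with try-with-resources because KeyValueIterator holds resources. The topic name, class name and explicit serdes are assumptions; the store name count-store matches the topology above.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class StoreScanSketch {

    // Hypothetical counterpart of App.countWithStateStore(): counts per key into "count-store".
    static Topology countWithStateStore() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store"));
        return builder.build();
    }

    // Pipes five records, then copies every entry of the store into a sorted map.
    public static Map<String, Long> storeContents() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "store-scan-sketch");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        TopologyTestDriver td = new TopologyTestDriver(countWithStateStore(), config);
        try {
            TestInputTopic<String, String> in = td.createInputTopic(
                    "input", Serdes.String().serializer(), Serdes.String().serializer());
            in.pipeInput("key1", "value1");
            in.pipeInput("key1", "value2");
            in.pipeInput("key2", "value3");
            in.pipeInput("key3", "value4");
            in.pipeInput("key2", "value5");

            KeyValueStore<String, Long> store = td.getKeyValueStore("count-store");
            Map<String, Long> contents = new TreeMap<>();
            // KeyValueIterator is Closeable, so close it once the scan is done.
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    contents.put(entry.key, entry.value);
                }
            }
            return contents;
        } finally {
            td.close();
        }
    }
}
```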

Note that in our tests, we created the Topology, TopologyTestDriver, TestInputTopic and TestOutputTopic in each test method. This was simply because each test exercised a different topology. If you were testing a single Topology with several test cases in one JUnit class, you could easily move this setup into a method annotated with @Before so that it runs automatically before each test case, and close the driver in an @After method.

That’s all for now! This was a short but hopefully useful introduction to testing your Kafka Streams-based processing pipelines.

Thanks for reading!

Better Programming
