Fluent Kafka Streams Tests

Photo by Jérôme Prax on Unsplash
For the impatient, here is the link to our Github repository https://github.com/bakdata/fluent-kafka-streams-tests

Unit Tests in Kafka Streams

As with most kind of applications, streaming applications also need to covered with tests to increase maintainability. Confluent mainly recommends unit tests for Kafka Streams applications. Accordingly, complex business logic should be encapsulated in separate domain classes and unit tested as if it was a traditional application. To test topologies, Kafka Streams even offers a test driver that mocks distributed components and thus facilitates unit tests* in Java.

However, we found the test driver too low level and more suited for testing operator-like topologies built with the Processor API. For more complex topologies using the Streams DSL, we rather wished for a more concise and fluent API, such as mockedstreams for Scala. Furthermore, the test driver supports topologies with Avro serde in a rather complicated way.

* The term unit tests is probably not correct as we are testing multiple units embedded in a larger framework. We have heard people referring calling them integrated tests, but there doesn’t seem to be a large consent, so we just stick with unit tests.

Fluent Kafka Streams Tests

We, at bakdata, built the fluent Kafka Streams Test, which is a wrapper around the existing Kafka Streams Testdriver, to simplify testing our Kafka Streams business logic. Our aim is to combine the bests of all worlds:

  • Concise and expressive tests revolving around data,
  • The full power of the test driver, and
  • A transparent schema registry mock for all Avro serializers as described in a separate article.

We will first demonstrate our fluent test library with a simple word count application and later by testing more complex topologies. The following code snippet shows a simple JUnit test that feeds 3 strings into a word count topology.

Simple Word Count Test

The 3 strings are added to the only input of the topology, which produces 3 outputs that are validated with expectations. Because we are testing a streaming word count, the first word is immediately output with a count of 1, even though it is later repeated with respective updated counter.

Let’s walk through the code in more detail: In Line 1, we get a new instance of the class that we want to test. In Line 4, the testTopology is defined for the app. It is the the class with which the user interacts to actually test the code. We will explain it in more detail later on. In Line 9, we get the only test input topic that the application uses via .input() and can then add single values to the stream with .add(T value) (Lines 10–12). We can disregard the key here because it is not relevant to the application. In Line 14, analogously to the input, we get the only output topic that the application writes to with .streamOutput(). To tell the framework how to interpret the stream entries, we also specify the deserializer via .withSerde(Serde keySerde, Serde valueSerde). To check the content of the output stream, we provide two main methods, .expectNextRecord() and .expectNoMoreRecord(). As we expect three records in the output stream, we have three .expectNextRecord() calls. The .hasKey() and .hasValue() calls are optional, but highly recommended to check the content of the records. After those three entries, we do not expect any further records, so we call .expectNoMoreRecord().

Setting Up the Test Topology

To setup the test topology, we use the JUnit 5 extension mechanism. For the word count example, we initialize the testTopology as a non-private instance field of the test class.

Registering the TestTopology as an Extension

The constructor of TestTopology expects a topology factory that creates the topology under test. To ensure proper isolation, we want to start with a fresh topology for different test cases inside the same test class, hence the factory. Additionally, the properties of the KafkaStreams client need to be specified. Broker and application-id must be present (Kafka testutil limitation), but are actually ignored. Most importantly, if the application expects default serde for key and value, these must be present in the properties or explicitly specified with withDefaultKeySerde(Serde serde) or withDefaultValueSerde(Serde serde). The schema registry URL can be omitted, as it is overridden by the test framework before executing a test case, as described later.

As a general hint for an easily testable streaming application, consider the extended snippet in our test class.

Registering the TestTopology as an Extension with a certain App

Our Streaming application is encapsulated in a class WordCount, which exposes the .getTopology() method and the .getKafkaProperties(). Both methods are used in the main(String[] args) to set up the KafkaStreams and can be easily called in the test.

Sample Word Count Kafka Streams Application

Table Output

In Kafka Streams, we can also interpret data streams as tables via KTables in a similar way as Kafka treats log-compacted topics. If a topology writes in such a KTable, we also want to formulate our expectations accordingly.

Simple Word Count Test with KTable-Semantics

This test is basically the same as the test above, but here we check Kafka Stream’s table semantic by calling .tableOutput() instead of .streamOutput(). Thus, we expect only the newest count for each word, in the order that they are written into the stream.

Output as Iterable for More Complex Assertions

While we can formulate basic expectations with our framework, we did not want to provide an exhaustive set of assertions over the stream output. From our experience with other testing frameworks, non-trivial tests quickly reach the point where certain checks cannot be expressed and require workarounds, and thus have limited maintainability.

Instead, outputs implement the Iterable interface over ProducerRecords, so that developers can use their favorite testing framework to express non-trivial assertions. The following snippet uses AssertJ to test the output of a click count application.

Simple Word Count Test with AssertJ on iterable Output

In this test again, we add three words to our input topic. We can now use AssertJ’s assertThat() to test out output like any regular Iterable, as the TestOutput class implements the Iterable interface. We extract the key of each ProducerRecord in our result-iterable and check if all three expected words are present. This is just a simple example of how a test could look like, the options here are as endless as your favorite testing framework’s testing tool’s capabilities.

We can use the full expressiveness of the testing frameworks to formulate our assertions by treating outputs as Iterables. However, please note that currently, streamOutputs are one-time Iterables; that is, their elements are used up after the first iteration. We might change that in the future, but to be on the safe side, outputs can be added to a collection to allow repeated traversal.

Explicit Event Time

Another important feature is the support of event time for each record to test topologies that use any kind of windowing operation. In that case, tests need to specify the record time explicitly to check the correctness of windowing operation and to guarantee reproducible results. The next snippet tests a topology that aggregates click events over a window of 1 minute.

Simple Windowed Aggregation Test for User Click-Events

For windowed tests, we introduce the concept of explicit event time. We can add events at a certain time to simulate the normal flow of incoming messages. For this, we provide two .at() methods, in which the user can specify the time at which this record should be “seen”. In the above test, we add three ClickEvents in the first one-minute-window (Lines 8–10) and then a fourth event just after that window ends (Line 11). This results in an increasing count in the first window (Lines 14–16) and a restart after the window is over (Line 17). The values contain either firstMinute or secondMinute as the records always store the begin timestamp of their window.

The preferred way is to use the .at(long ms) or .at(long time, TimeUnit unit) methods, to specify the timestamp of the following added records. Alternatively, it is possible to specify the timestamp explicitly per record with .add(key, value, timeInMs). However, the latter way makes test code harder to read when key or value is an integral type.

Multiple Inputs and Outputs

So far, we have seen only trivial topologies with exactly one input and one output. For more complex topologies, we have to specify which records are added to which inputs and from which outputs we expect the results. In the following, we left-join a click stream with a status code stream and perform a filter on the error output stream to produce an alert output stream.

Time-Windowed Test with Multiple Inputs and Outputs

This test is more complex than the previous tests, as it contains timed events plus two inputs and two outputs. Here, we demonstrate the use of .input() (and .output() , respectively) with more than one topics. We need to explicitly pass the name of the requested topic, as we cannot rely on the implicitness of a single topic being present. In Lines 3–5 we add some information regarding the status code 500, which represents an internal server error, to a static input stream against which we join in the application. Note, that we call .input(app.getStatusInputTopic()) instead of just .input(). Then, we add multiple events to the input topic of our click events, which mostly contain a 500-error. To check our output, we call .getOutput(app.getErrorOutputTopic()) and then expect our record to be present (Lines 17–22). The second output contains all “alerts” for errors with more than 5 occurrences in the last minute and as this is the case for the code 500 in our example, we expect it to be in the alertTopic (Lines 24–29). We can see that the code remains nearly identical compared to all previous tests, with the simple addition of explicitly getting the input and output topics.

The fluent API provides overloaded input and output methods that allow the user to specify the topic name. Calling the methods without the topic name results in an error if multiple inputs or outputs exist, respectively.

Under the Hood

Our Fluent test library is a thin wrapper around the test framework that Kafka Streams already provides. At the time of writing, the code base is just 500 LOC for the TestTopology and the input and output abstractions. In this way, you can see our test framework as syntactic sugar on top of the existing test framework.

However, we rather see it as a different abstraction over the existing framework in a similar way as the Streams DSL abstract over the Processor API. Sure, you can express everything in the Processor API, but it is much more readable and maintainable in the Streams DSL. In our experience, data-intensive tests are hard to maintain as is, so that any additional clutter code is just too much.

For comparison, we show the fluent test and the corresponding test using the TestTopologyDriver directly.

Example Word Count Test with Fluent Kafka Streams Tests

The test case using the TestTopologyDriver is less clear.

Example Word Count Test without Fluent Kafka Streams Tests

Most of the gained concision comes from treating input as first class objects that accept chained method invocations. The gain is even larger for multiple inputs. Similarly, outputs are also first-class citizens where serializers need to be specified only once. OutputVerifier and Expectations are roughly equivalent in expressing the assertions, but we also offer the Iterable abstractions for more advanced assertions.

For more complicated tests, we also offer access to the wrapped test driver, such that you may use combine methods in the same fashion as you can fall back to Processor API if the Streams DSL is not sufficient.

Conclusion

We have shown that our test frameworks helps to write more concise and maintainable tests without sacrificing any flexibility. The framework supports multiple inputs and outputs, has table semantics, offers out-of-the-box Avro support, and can be used to formulate much more complex assertions in conjunction with other matching frameworks.

We deliberately kept the implementation simple for now and plan on adding new features only on demand. Currently, we achieve ease of use with JUnit5, but we could support other testing frameworks when requested. Similarly, we could directly support state stores and punctuations, although the abstractions of the test driver feels good enough. Just open a ticket on Github for ideas. We also encourage contributions or any other kind of feedback.

Find us on Github

Authors: Arvid Heise and Lawrence Benson