Schema Registry mock for Kafka Streams Tests
For the impatient, here is the link to our Github repository https://github.com/bakdata/fluent-kafka-streams-tests/tree/master/schema-registry-mock
The Need for a Transparent Schema Registry
While Confluent recommends unit tests for testing Kafka Streams topologies, it lacks Avro support for tests. As there is no mock for a schema registry, the only way to test Avro serde is to inject a mock client for the schema registry to the topology under test, which requires significantly more non-productive code, as we will show later.
We recently introduced a fluent Kafka Streams testing framework, which provides an easy-to-use, fluent DSL abstracting from the Kafka Streams testutils
provided by the Kafka project. As part of the framework, we implemented out-of-the-box Avro support with a transparent HTTP mock for the schema registry. The schema registry mock can be used independently of the main testing framework and can be used with little to no code change.
Running Example
For this article, we want to count the number of people living in a particular city. The input for this application is stored in Kafka with the following Avro schema.
The application will produce a new topic with cities and the respective number of inhabitants counted so far. The output is also stored in Avro with the following schema.
The schemas are used to generate Java classes extending Avro’s SpecificRecord
, which are (de)serialized in Kafka Streams with the SpecificAvroSerde
provided by Confluent. The following CountInhabitantsWithAvro
application sets this serde as the default value serializer with the respective URL for the schema registry (Line 13–14) and thus, the topology does not need to define any specific serde.
When consistently using SpecificAvroRecord
for input and output, all the hassle with serde becomes manageable quickly. The topology creation focuses entirely on the actual operators, and it is much easier to understand and maintain when the serde clutter disappears. For that reason, we usually also use Avro for keys.
Testing the Topology
Now, we will first highlight how we test the code with our fluent Kafka Streams test library and then compare that to the previous way.
The test first adds three people with two distinct cities to the input and then expects the respective counts. Coincidentally, it is the same structure as in the word count example described in the article about the fluent Kafka Streams testing framework, except that now the values are Avro records. Thus, for the user, there is no difference between primitive types and Avro records anymore, which is in stark contrast to how Kafka’s built-in test driver feels, as you can see below.
Using the Test Driver
For comparison, the next snippet shows how the test looks like without our testing framework. We added some comments for the non-trivial parts that can easily cost 1–2 hours to get right.
The worst part of the above test is not immediately visible: We need to change the CountInhabitantsWithAvro
application to pass the schema registry mock to the used serdes (Line 5). Using default serde does not allow us to explicitly pass the schema registry client, which results in hard-to-understand exceptions during test execution.
The code is more cluttered with the explicit serde that only needs to be added so that we can pass the mock schema registry client. Often, a refactoring for better testability also yields better code quality. However, in this case, we argue that the readability and maintainability worsen without any structural benefit. Instead, a proper dependency injection solution would work much better. However, to avoid code changes in the provided Avro serde, we went another way.
How It Works
We chose to actually mock the schema registry protocol with Wiremock, such that the topology under test uses the schema registry transparently. In particular, new schemas can be registered on-the-fly, when the respective Avro serde uses the auto_register_schemas flag and (pre-)registered schemas can be retrieved.
Looking at the test execution, the test driver initializes a new mock schema registry for each test to enforce a strong separation of tests. The newly created mock uses a random port for each test case and is automatically registered before any test case is executed (=overridden in the properties).
The test topology also exposes the schema registry client with .getSchemaRegistryClient()
to programmatically interact with the schema registry. This client can be used to pre-register schemas when auto_register_schemas
is not desired or to check that expected schemas have been registered successfully during the test.
The schema registry mock can be used separately from the remaining framework. The implementation is very light-weight: Next to the dependencies to Avro and Avro Serde, it has only an additional dependency to Wiremock. The runtime overhead of the wiremock is not noticeable.
Conclusion
Testing topologies with Avro was quite painful before. Passing the mock schema registry client to the topology required a large amount of code changes that decreased readability and maintainability.
With our transparent schema registry mock, the code under test works as expected with proper code structure. We have shown that the test and code structure is the same independent of whether we aggregate words or Avro records.