Integration Tests for Kafka Consumer

Using EmbeddedKafka and JUnit5

billydharmawan
Nov 24, 2020 · 8 min read
Photo by AbsolutVision on Unsplash

Introduction

Recently, I wrote a comprehensive tutorial on how to build an event-driven application with Kafka and DynamoDB. The tutorial covers, end to end, how to build a working application that exposes two endpoints:

  • a POST endpoint to receive a request to create a new user
  • a GET endpoint to return a user's information

The POST endpoint publishes a message containing a user creation request from the client to a Kafka topic. There’s a Kafka listener pipeline that subscribes to this topic, consumes the messages, and, finally, persists the new user information in a DynamoDB table. Check out the tutorial for more information.

In this tutorial, we will look at how to write an integration test for the consuming side of this flow: the Kafka listener component. We will be using EmbeddedKafka and JUnit5.

TL;DR — GitHub repo here.

Please note the dependencies in build.gradle.kts and the Gradle wrapper version have been upgraded as per this pull request.

Prerequisites

To be able to follow along, please clone this GitHub repo to your local machine. Though not compulsory, it is highly recommended that you first follow the tutorial on building the application with Kafka and DynamoDB, since that application is what we will be writing the integration test for.

Integration Test Dependencies

We need to add the following library to build.gradle.kts to support our Kafka integration test: org.springframework.kafka:spring-kafka-test. This library provides EmbeddedKafka, an in-memory Kafka broker that we will use in our integration test. It also provides the KafkaTestUtils class, which we'll use to construct our Kafka producer properties in the test class.

(The org.springframework.kafka:spring-kafka dependency, which the application itself already relies on, supplies the ReactiveKafkaProducerTemplate we'll use to publish test messages.)

We will also need the com.nhaarman.mockitokotlin2:mockito-kotlin library to help with mocking methods. It is essentially a Kotlin-friendly wrapper around Mockito.

The JUnit5 dependency is already covered by org.springframework.boot:spring-boot-starter-test, which is in our build.gradle.kts file.

The dependencies section of our Gradle build file then becomes the following.

build.gradle.kts — with Kafka test dependencies
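
The additions boil down to something like this (a sketch: the application's own dependencies are elided, and the mockito-kotlin version shown is an assumption, so check the repo for the exact versions):

dependencies {
    // existing application dependencies (WebFlux, spring-kafka, Avro, DynamoDB, ...) elided

    testImplementation("org.springframework.boot:spring-boot-starter-test") // brings in JUnit5
    testImplementation("org.springframework.kafka:spring-kafka-test")       // EmbeddedKafka, KafkaTestUtils
    testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0")  // Kotlin wrapper around Mockito (version assumed)
}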

Kafka Listener Test

Let’s go ahead and create a new file to contain our integration test in the test directory. Name it KafkaListenerTest.kt.

Side note: if you develop in IntelliJ, you can generate the test class in the right location like this: place your cursor on the class name, right-click, and select Show Context Actions, or press its keyboard shortcut.

Once you’ve done this, you will see the KafkaListenerTest class is created as below.

Write the basic skeleton

We are going to write a Spring Boot-based test since our Kafka listener is a Spring component. Our initial test class looks like this.

KafkaListenerTest.kt — initial test class structure
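
A minimal sketch of that initial structure (the package name is taken from the demo repo):

package io.codebrews.kotlinkafkadynamodemo

import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.annotation.DirtiesContext

@DirtiesContext
@EmbeddedKafka
@SpringBootTest
class KafkaListenerTest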

Let’s go through briefly what those annotations are for.

@DirtiesContext is used to tell Spring that the application context created as part of this test is dirty and should be cleared after the test is finished. We use this because the EmbeddedKafka might be “dirty” if we have multiple test classes that rely on it.

@EmbeddedKafka is used to provide an in-memory Kafka instance in Spring-based tests. It also provides an instance of EmbeddedKafkaBroker bean that we can inject and use in our test.

@SpringBootTest is used to run a Spring Boot-based test. This means that Spring will manage the lifecycle of the beans, dependency injection, and so on.

Mock the handler bean

Because we are only interested in testing the Kafka consumer part, we are going to mock the CreateUserRequestHandler bean using yet another Spring annotation for testing: @MockBean.

This annotation replaces the bean of the same type found in the application context with a mock. This means we can control the behaviour quite easily for our testing purposes.

@MockBean
private lateinit var mockCreateUserRequestHandler: CreateUserRequestHandler

Create application properties file for test

Just as the main Spring Boot application loads and uses the configurations defined in application.yml, our Spring Boot test classes also do the same. The difference is the test classes will look for the file in the resources folder within the test folder and not the main folder. We need to provide this for our test classes.

For now, let’s go ahead and just copy-paste the content from application.yml in the main folder.

application.yml — for test classes
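
The copied file will look roughly like this (the property names follow the demo repo's KafkaConfigProperties; the topic name and schema registry URL here are inferred from the error output we'll see later in this article):

application:
  kafka:
    broker: "localhost:9092"
    schema-registry-url: "http://localhost:8081"
    create-user-request-topic: "create-user-request"
    serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer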

Prepare a Kafka producer

Since we are testing our Kafka consumer, we need a Kafka producer that publishes the message to the same topic so that our consumer will react and consume it from the topic. This Kafka producer depends on the auto-wired EmbeddedKafkaBroker instance as it needs the broker address. Let’s define it as a late initialised variable.

@Autowired
private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker
private lateinit var producer: ReactiveKafkaProducerTemplate<String, CreateUserRequest>

Additionally, we want the Kafka producer to use the correct serialiser so that our Kafka consumer can deserialise the message. To do this, we can use the same value as the one defined in the KafkaConfigProperties bean. Let's auto-wire it into the test class.

@Autowired
private lateinit var kafkaConfigProperties: KafkaConfigProperties
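
For reference, KafkaConfigProperties is the demo repo's configuration bean bound to the application.kafka properties. It looks roughly like this (a sketch; the actual class lives in the repo):

@ConstructorBinding
@ConfigurationProperties(prefix = "application.kafka")
data class KafkaConfigProperties(
    val broker: String,
    val serializer: String,
    val deserializer: String,
    val schemaRegistryUrl: String,
    val createUserRequestTopic: String
)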

Finally, we need to initialise this Kafka producer before all the test cases run. To do this, we can annotate a method in the test class with JUnit5's @BeforeAll, which specifies that the method runs exactly once, before all the test cases in the class.

NOTE: The test class needs to be annotated with @TestInstance(TestInstance.Lifecycle.PER_CLASS) for the @BeforeAll-annotated method to work; otherwise, you will get an exception like this.

JUnitException
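
With the per-class lifecycle in place, the class annotations become:

@DirtiesContext
@EmbeddedKafka
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaListenerTest {
    // ...
}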

Here’s our setup() method that is annotated with @BeforeAll.

@BeforeAll
fun setup() {
    val producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.brokersAsString)
        .apply {
            this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = kafkaConfigProperties.serializer
            this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = kafkaConfigProperties.serializer
            this[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = kafkaConfigProperties.schemaRegistryUrl
        }

    producer = ReactiveKafkaProducerTemplate(SenderOptions.create(producerProps))
}

Great! Now our Kafka producer is ready to be utilised to send messages to the topic that the KafkaListener bean is listening to.

Use dynamic Kafka broker address

Notice that the KafkaListener is configured to talk to a Kafka broker at the address specified by the broker property of KafkaConfigProperties, which is currently set to localhost:9092 in the application.yml file.

EmbeddedKafka, on the other hand, runs on a random port every time, so we cannot predetermine the broker's address. To resolve this, we are going to modify the application.kafka.broker property in the application.yml file in our test folder.

application:
  ...
  kafka:
    broker: "${spring.embedded.kafka.brokers:#{localhost:9092}}"

The above is Spring's property placeholder syntax with a SpEL default: it resolves to the value of spring.embedded.kafka.brokers if that property exists, and otherwise falls back to localhost:9092. It is EmbeddedKafka that sets the spring.embedded.kafka.brokers property, so our KafkaListener will pick up the correct broker address.

Write the test case

We are pretty much ready to write the test case to see if our Kafka consumer does what it is supposed to do. The test case is simple:

  • Given a CreateUserRequest message is published to the create user request topic,
  • the KafkaListener bean should consume that message, and
  • then call the handleCreateUserRequest method of the CreateUserRequestHandler class.

Let’s first write the part of the test case that publishes the message to the topic.

@Test
fun `should consume CreateUserRequest message and call the handler method`() {
    val createUserRequest = CreateUserRequest("email@some.com", "Joe", "Jones")

    val producerRecord = ProducerRecord(
        kafkaConfigProperties.createUserRequestTopic,
        UUID.randomUUID().toString(),
        createUserRequest
    )

    StepVerifier.create(producer.send(producerRecord))
        .expectNextMatches { it is SenderResult<*> }
        .verifyComplete()
}

Here we have created the producer record with the CreateUserRequest message containing the user details to be created, and we’ve published the message to the topic as defined by kafkaConfigProperties.createUserRequestTopic with a random UUID for the message key.

We wrap the producer.send(producerRecord) call with StepVerifier because it lets us verify an asynchronous publisher and the events we expect it to emit once it is subscribed to. If we look at the method signature of send(), its return type is Mono<SenderResult<Void>>.

Mono is an asynchronous publisher type provided by Project Reactor. From this, we know that the emitted item we expect is of type SenderResult. Once that expectation is verified, we know the message has been published successfully; this is essentially what the StepVerifier is doing for us.

Now that the message has been published, we expect the KafkaListener bean to consume it. But how do we make sure of this? By verifying that there was an interaction with mockCreateUserRequestHandler: specifically, that its handleCreateUserRequest method was invoked exactly once, with the published CreateUserRequest as its input.

Before writing the verification, we also need to stub the handleCreateUserRequest method to return Mono<Unit>. If we don't, we will end up with a null pointer exception when the listener invokes handleCreateUserRequest().

given(mockCreateUserRequestHandler.handleCreateUserRequest(createUserRequest))
    .willReturn(Mono.just(Unit))

verify(mockCreateUserRequestHandler, timeout(5000L).times(1))
    .handleCreateUserRequest(createUserRequest)

This is how the test case looks after we have incorporated all the above steps.

KafkaListenerTest.kt
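
Assembled from the pieces above, the full test class looks roughly like this (a sketch with one workable arrangement of imports; CreateUserRequest, CreateUserRequestHandler, and KafkaConfigProperties come from the demo repo, and the stub is placed before the publish so the listener never races an unstubbed mock):

package io.codebrews.kotlinkafkadynamodemo

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.mockito.BDDMockito.given
import org.mockito.Mockito.timeout
import org.mockito.Mockito.verify
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.KafkaTestUtils
import org.springframework.test.annotation.DirtiesContext
import reactor.core.publisher.Mono
import reactor.kafka.sender.SenderOptions
import reactor.kafka.sender.SenderResult
import reactor.test.StepVerifier
import java.util.UUID

@DirtiesContext
@EmbeddedKafka
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaListenerTest {

    @Autowired
    private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker

    @Autowired
    private lateinit var kafkaConfigProperties: KafkaConfigProperties

    @MockBean
    private lateinit var mockCreateUserRequestHandler: CreateUserRequestHandler

    private lateinit var producer: ReactiveKafkaProducerTemplate<String, CreateUserRequest>

    @BeforeAll
    fun setup() {
        // Build producer properties pointing at the embedded broker's address
        val producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.brokersAsString)
            .apply {
                this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = kafkaConfigProperties.serializer
                this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = kafkaConfigProperties.serializer
                this[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = kafkaConfigProperties.schemaRegistryUrl
            }

        producer = ReactiveKafkaProducerTemplate(SenderOptions.create(producerProps))
    }

    @Test
    fun `should consume CreateUserRequest message and call the handler method`() {
        val createUserRequest = CreateUserRequest("email@some.com", "Joe", "Jones")

        // Stub the handler so the listener doesn't hit a null pointer exception
        given(mockCreateUserRequestHandler.handleCreateUserRequest(createUserRequest))
            .willReturn(Mono.just(Unit))

        val producerRecord = ProducerRecord(
            kafkaConfigProperties.createUserRequestTopic,
            UUID.randomUUID().toString(),
            createUserRequest
        )

        // Publish the message and verify the send completes
        StepVerifier.create(producer.send(producerRecord))
            .expectNextMatches { it is SenderResult<*> }
            .verifyComplete()

        // The listener consumes asynchronously, hence the timeout-based verification
        verify(mockCreateUserRequestHandler, timeout(5000L).times(1))
            .handleCreateUserRequest(createUserRequest)
    }
}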

Now, go ahead and run the test case.

Executing…

Waiting…

Ouch! We got an exception!

ConnectException

Let’s bring our attention to this line here.

Failed to send HTTP request to endpoint: http://localhost:8081/subjects/create-user-request-key/versions

That looks like our schema registry URL. The problem is that we don't have a real schema registry running, and our Kafka producer is trying to connect to it while sending the message.

How do we fix this? By using the MockSchemaRegistryClient from Confluent's Kafka library.

Mock Kafka Avro Serializer and Deserializer

We have to define two new classes in our test folder. One class will extend KafkaAvroSerializer and the other KafkaAvroDeserializer. Both will use MockSchemaRegistryClient as their schema registry client.

Important: Both mock classes have to use the same MockSchemaRegistryClient instance; otherwise, when the Kafka consumer tries to deserialise a message from the topic, it will throw a deserialisation error like so.

SerializationException

So we need a singleton of the MockSchemaRegistryClient instance to be shared by the mock serialiser and deserialiser. Here are the three new files we will have in our test folder.

MockSchemaRegistry.kt
MockKafkaAvroSerializer.kt
MockKafkaAvroDeserializer.kt
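
Sketched in one listing for brevity (in the repo each class sits in its own file; the object and property names here are assumptions, but both KafkaAvroSerializer and KafkaAvroDeserializer do accept a SchemaRegistryClient constructor argument):

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroSerializer

// MockSchemaRegistry.kt — the singleton client shared by both mocks
object MockSchemaRegistry {
    val registryClient: SchemaRegistryClient = MockSchemaRegistryClient()
}

// MockKafkaAvroSerializer.kt — registers schemas against the in-memory registry
class MockKafkaAvroSerializer : KafkaAvroSerializer(MockSchemaRegistry.registryClient)

// MockKafkaAvroDeserializer.kt — reads schemas back from the same registry
class MockKafkaAvroDeserializer : KafkaAvroDeserializer(MockSchemaRegistry.registryClient)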

Additionally, we need to update the values of the Kafka serialiser and deserialiser in the application.yml file in the test folder.

application:
  ...
  kafka:
    broker: "${spring.embedded.kafka.brokers:#{localhost:9092}}"
    serializer: io.codebrews.kotlinkafkadynamodemo.MockKafkaAvroSerializer
    deserializer: io.codebrews.kotlinkafkadynamodemo.MockKafkaAvroDeserializer

Now, if we run the test again, it will pass. Happy days!

Conclusion

We have learned how to set up integration testing for a Kafka consumer using SpringBootTest and EmbeddedKafka. EmbeddedKafka provides an in-memory Kafka instance, which is very convenient. SpringBootTest, in turn, manages the lifecycle of all our Spring beans in the test environment, so we can focus on writing the test cases.

Here’s the link to the pull request if you want to see everything that was added to write the integration test for the Kafka consumer.

I am going to write another article on the integration testing for Kafka producer in the near future. Stay tuned.
