Integration Tests for Kafka Consumer
Using EmbeddedKafka and JUnit5
Introduction
Recently, I wrote a comprehensive tutorial on how to build an event-driven application with Kafka and DynamoDB. The tutorial covers building a working application end to end. The application exposes two endpoints:
- a POST endpoint that receives a request to create a new user
- a GET endpoint that returns a user’s information
The POST endpoint publishes a message containing the client’s user creation request to a Kafka topic. A Kafka listener pipeline subscribes to this topic, consumes the messages, and finally persists the new user information in a DynamoDB table. Check out the tutorial for more information.
In this tutorial, we will look at how to write an integration test for the Kafka consumer that sits behind our POST route. We will be using EmbeddedKafka and JUnit5.
TL;DR: GitHub repo here.
Please note that the dependencies in build.gradle.kts and the Gradle wrapper version have been upgraded as per this pull request.
Prerequisites
To follow along, please clone this GitHub repo to your local machine. Though not compulsory, it is highly recommended that you first follow the tutorial on building the application with Kafka and DynamoDB, since that is the application we will be writing the integration test for.
Integration Test Dependencies
We need to add the following library to build.gradle.kts to support our Kafka integration test: org.springframework.kafka:spring-kafka-test. This library provides EmbeddedKafka, an in-memory Kafka broker that we will use in our integration test.
Another test dependency that we need is org.springframework.kafka:spring-kafka. Note that the KafkaTestUtils class, which we’ll use to construct our Kafka producer properties in the test class, ships with spring-kafka-test (in the org.springframework.kafka.test.utils package) rather than with spring-kafka itself.
We will also need the com.nhaarman.mockitokotlin2:mockito-kotlin library to help with mocking. It is a thin, Kotlin-friendly wrapper around Mockito.
JUnit5 is already covered by org.springframework.boot:spring-boot-starter-test, which is in our build.gradle.kts file.
The dependencies section of our build.gradle.kts file becomes as follows.
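As a rough sketch, the test-related entries could look like the fragment below. The coordinates come from the text above. The mockito-kotlin version number is an assumption, and the Spring entries are left unversioned on the assumption that Spring Boot's dependency management resolves them. Check the repo for the actual values.

```kotlin
dependencies {
    // ... existing application dependencies stay as they are ...

    // Already present: brings in JUnit5 and the Spring test support.
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // Spring Kafka, plus its test module with EmbeddedKafka and KafkaTestUtils.
    testImplementation("org.springframework.kafka:spring-kafka")
    testImplementation("org.springframework.kafka:spring-kafka-test")

    // Kotlin-friendly Mockito wrapper (version is an assumption).
    testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0")
}
```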
Kafka Listener Test
Let’s go ahead and create a new file to contain our integration test in the test directory. Name it KafkaListenerTest.kt.
Side note: if you develop in IntelliJ, you can generate the test class in the right location. Place your cursor on the class name, right-click, and select Show Context Actions, or use the keyboard shortcut for it.
Once you’ve done this, the KafkaListenerTest class is created for you.
Write the basic skeleton
We are going to write a Spring Boot-based test since our Kafka listener is a Spring component. Our initial test class looks like this.
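A minimal sketch of such a skeleton, reconstructed from the three annotations discussed in this section, might look like the following. The package name is an assumption based on the class names used in the test configuration, and @EmbeddedKafka is left with its default settings because the original parameters are not stated.

```kotlin
package io.codebrews.kotlinkafkadynamodemo // assumed package name

import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.annotation.DirtiesContext

@DirtiesContext   // discard the application context after this test class
@EmbeddedKafka    // start an in-memory Kafka broker for the test
@SpringBootTest   // boot the full Spring application context
class KafkaListenerTest {
    // Fields, setup, and test cases are added in the following sections.
}
```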
Let’s go through briefly what those annotations are for.
@DirtiesContext tells Spring that the application context created as part of this test is dirty and should be cleared after the test is finished. We use this because EmbeddedKafka might be “dirty” if we have multiple test classes that rely on it.
@EmbeddedKafka provides an in-memory Kafka instance in Spring-based tests. It also registers an EmbeddedKafkaBroker bean that we can inject and use in our test.
@SpringBootTest runs a Spring Boot-based test. This means that Spring will manage the lifecycle of the beans, dependency injection, and so on.
Mock the handler bean
Because we are only interested in testing the Kafka consumer part, we are going to mock the CreateUserRequestHandler bean using yet another Spring annotation for testing: @MockBean.
This annotation replaces the bean of the same type found in the application context with a mock. This means we can control the behaviour quite easily for our testing purposes.
@MockBean
private lateinit var mockCreateUserRequestHandler: CreateUserRequestHandler
Create application properties file for test
Just as the main Spring Boot application loads the configuration defined in application.yml, so do our Spring Boot test classes. The difference is that the test classes look for the file in the resources folder within the test folder rather than the main folder. We need to provide this file for our test classes.
For now, let’s just copy-paste the content from application.yml in the main folder.
Prepare a Kafka producer
Since we are testing our Kafka consumer, we need a Kafka producer that publishes a message to the same topic so that our consumer will react and consume it. This Kafka producer depends on the auto-wired EmbeddedKafkaBroker instance, as it needs the broker address. Let’s define it as a late-initialised variable.
@Autowired
private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker

private lateinit var producer: ReactiveKafkaProducerTemplate<String, CreateUserRequest>
Additionally, we want the Kafka producer to use the correct type of serialiser so that our Kafka consumer can deserialise the message. To do this, we can use the same value as the one defined in the KafkaConfigProperties bean. Let’s auto-wire it into the test class.
@Autowired
private lateinit var kafkaConfigProperties: KafkaConfigProperties
Finally, we need to initialise this Kafka producer before the test cases run. To do this, we can use JUnit5’s @BeforeAll annotation on a method in the test class. @BeforeAll specifies that the annotated method runs once, before all the test cases in the test class.
NOTE: The test class needs to be annotated with @TestInstance(TestInstance.Lifecycle.PER_CLASS) for a non-static @BeforeAll method to work; otherwise JUnit throws an exception.
Here’s our setup() method that is annotated with @BeforeAll.
@BeforeAll
fun setup() {
    val producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.brokersAsString)
        .apply {
            this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = kafkaConfigProperties.serializer
            this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = kafkaConfigProperties.serializer
            this[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = kafkaConfigProperties.schemaRegistryUrl
        }
    producer = ReactiveKafkaProducerTemplate(SenderOptions.create(producerProps))
}
Great! Now our Kafka producer is ready to send messages to the topic that the KafkaListener bean is listening to.
Use dynamic Kafka broker address
Notice that the KafkaListener is configured to talk to a Kafka broker at the address specified by the broker property of KafkaConfigProperties. The current value is set to localhost:9092 in the application.yml file.
EmbeddedKafka, on the other hand, runs on a random port every time, so we cannot predetermine the address of the broker. To resolve this, we are going to modify the application.kafka.broker property in the application.yml file in the test folder.
application:
  ...
  ...
  kafka:
    broker: "${spring.embedded.kafka.brokers:#{localhost:9092}}"
The ${...} syntax above is a Spring property placeholder with a default value: it resolves to the value of spring.embedded.kafka.brokers if that property exists, or else falls back to the default given after the colon (here, localhost:9092 wrapped in a #{...} SpEL expression). EmbeddedKafka is the one that sets the spring.embedded.kafka.brokers property. This way, our KafkaListener will be able to reference the correct broker.
Write the test case
We are pretty much ready to write the test case to see if our Kafka consumer does what it is supposed to do. The test case is simple:
- Given a CreateUserRequest message is published to the create user request topic,
- the KafkaListener bean should consume that message, and
- then call the handleCreateUserRequest method of the CreateUserRequestHandler class.
Let’s first write the part of the test case that publishes the message to the topic.
@Test
fun `should consume CreateUserRequest message and calls the handler method`() {
    val createUserRequest = CreateUserRequest("email@some.com", "Joe", "Jones")
    val producerRecord = ProducerRecord(
        kafkaConfigProperties.createUserRequestTopic,
        UUID.randomUUID().toString(),
        createUserRequest
    )
    StepVerifier.create(producer.send(producerRecord))
        .expectNextMatches { it is SenderResult }
        .verifyComplete()
}
Here we have created the producer record with the CreateUserRequest message containing the details of the user to be created, and we’ve published the message to the topic defined by kafkaConfigProperties.createUserRequestTopic with a random UUID as the message key.
We wrap the producer.send(producerRecord) call with StepVerifier because it lets us subscribe to an asynchronous publisher and verify the events it emits. If we look at the method signature of send(), its return type is Mono<SenderResult<Void>>.
Mono is an asynchronous publisher type provided by Project Reactor. From this, we know that the expected event is going to be of type SenderResult. Once that is verified, we know that the message has been published successfully. This is essentially what we are doing with the StepVerifier.
Now that the message has been published, we expect the KafkaListener bean to consume it. But how do we make sure of this? By verifying that there is an interaction with mockCreateUserRequestHandler, and specifically that its handleCreateUserRequest method is invoked exactly once with the published CreateUserRequest as its input.
Before writing the verification, we also need to stub the handleCreateUserRequest method to return a Mono<Unit>. If we don’t, the unstubbed mock returns null, and we end up with a null pointer exception when the listener code uses the result of handleCreateUserRequest(). In the test method, the stubbing goes before the message is sent and the verification after it.
given(mockCreateUserRequestHandler.handleCreateUserRequest(createUserRequest)).willReturn(Mono.just(Unit))

verify(mockCreateUserRequestHandler, timeout(5000L).times(1))
    .handleCreateUserRequest(createUserRequest)
This is how the test case looks after we have incorporated all the above steps.
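Putting the snippets from the previous sections together gives roughly the following class. This is an assembled sketch rather than the exact file from the repo. The package name is an assumption, the Mockito functions are imported from Mockito core (the mockito-kotlin equivalents would work too), and CreateUserRequest, CreateUserRequestHandler, and KafkaConfigProperties are the application's own classes, assumed to live in the same package.

```kotlin
package io.codebrews.kotlinkafkadynamodemo // assumed package name

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.mockito.BDDMockito.given
import org.mockito.Mockito.timeout
import org.mockito.Mockito.verify
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.KafkaTestUtils
import org.springframework.test.annotation.DirtiesContext
import reactor.core.publisher.Mono
import reactor.kafka.sender.SenderOptions
import reactor.kafka.sender.SenderResult
import reactor.test.StepVerifier
import java.util.UUID

@DirtiesContext
@EmbeddedKafka
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // lets @BeforeAll be non-static
class KafkaListenerTest {

    @MockBean
    private lateinit var mockCreateUserRequestHandler: CreateUserRequestHandler

    @Autowired
    private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker

    @Autowired
    private lateinit var kafkaConfigProperties: KafkaConfigProperties

    private lateinit var producer: ReactiveKafkaProducerTemplate<String, CreateUserRequest>

    @BeforeAll
    fun setup() {
        // Point the producer at the embedded broker and reuse the app's serialiser.
        val producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.brokersAsString)
            .apply {
                this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = kafkaConfigProperties.serializer
                this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = kafkaConfigProperties.serializer
                this[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = kafkaConfigProperties.schemaRegistryUrl
            }
        producer = ReactiveKafkaProducerTemplate(SenderOptions.create(producerProps))
    }

    @Test
    fun `should consume CreateUserRequest message and calls the handler method`() {
        val createUserRequest = CreateUserRequest("email@some.com", "Joe", "Jones")
        val producerRecord = ProducerRecord(
            kafkaConfigProperties.createUserRequestTopic,
            UUID.randomUUID().toString(),
            createUserRequest
        )

        // Stub the mocked handler before anything is published.
        given(mockCreateUserRequestHandler.handleCreateUserRequest(createUserRequest))
            .willReturn(Mono.just(Unit))

        // Publish the message and confirm the send completed.
        StepVerifier.create(producer.send(producerRecord))
            .expectNextMatches { it is SenderResult }
            .verifyComplete()

        // Wait up to 5 seconds for the listener to call the handler exactly once.
        verify(mockCreateUserRequestHandler, timeout(5000L).times(1))
            .handleCreateUserRequest(createUserRequest)
    }
}
```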
Now, go ahead and run the test case.
Executing…
Waiting…
Ouch! We got an exception!

Let’s bring our attention to this line here.
Failed to send HTTP request to endpoint: http://localhost:8081/subjects/create-user-request-key/versions
That looks like our schema registry URL. The problem here is that we don’t have a real schema registry running and our Kafka producer is trying to connect to it as it is sending the message.
How do we fix this? By using the MockSchemaRegistryClient from the Confluent Kafka library.
Mock Kafka Avro Serializer and Deserializer
We have to define two new classes in our test folder. One class will extend KafkaAvroSerializer and the other KafkaAvroDeserializer. Both will use MockSchemaRegistryClient as their schema registry client.
Important: Both mocks have to use the same MockSchemaRegistryClient instance, or else the Kafka consumer will throw a deserialisation error when it tries to deserialise a message from the topic.
So we need a singleton MockSchemaRegistryClient instance to be shared by the mock serialiser and deserialiser. Here are the three new files we will have in our test folder.
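One way to write them is sketched below, with all three declarations shown in a single listing for brevity. The class names MockKafkaAvroSerializer and MockKafkaAvroDeserializer and the package match the ones referenced in the test configuration. The MockSchemaRegistry holder object is a hypothetical name, and the sketch assumes the Confluent serialiser keeps a client passed to its constructor when it is later configured, which may vary between Confluent versions.

```kotlin
package io.codebrews.kotlinkafkadynamodemo

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroSerializer

// Hypothetical holder: a Kotlin object is a singleton, so the serialiser and
// deserialiser below are guaranteed to share one in-memory registry.
object MockSchemaRegistry {
    val client: SchemaRegistryClient = MockSchemaRegistryClient()
}

// Kafka instantiates these through their no-arg constructors, which hand the
// shared mock client to the real Confluent implementations.
class MockKafkaAvroSerializer : KafkaAvroSerializer(MockSchemaRegistry.client)

class MockKafkaAvroDeserializer : KafkaAvroDeserializer(MockSchemaRegistry.client)
```

Because the schema registered by the serialiser lives in the same in-memory client, the deserialiser can look it up by ID without any HTTP call.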
Additionally, we need to update the values of the Kafka serialiser and deserialiser in the application.yml file in the test folder.
application:
  ...
  ...
  kafka:
    broker: "${spring.embedded.kafka.brokers:#{localhost:9092}}"
    serializer: io.codebrews.kotlinkafkadynamodemo.MockKafkaAvroSerializer
    deserializer: io.codebrews.kotlinkafkadynamodemo.MockKafkaAvroDeserializer
Now, if we run the test again, it will pass. Happy days!

Conclusion
We have learned how to set up an integration test for a Kafka consumer using SpringBootTest and EmbeddedKafka. EmbeddedKafka provides an in-memory Kafka instance, which is very convenient. SpringBootTest, in turn, manages the lifecycle of all our Spring beans in the test environment, so we can focus on writing the test cases.
Here’s the link to the pull request if you want to see the things that were added to write the integration test for Kafka consumer.
I am going to write another article on the integration testing for Kafka producer in the near future. Stay tuned.