Step by Step Integration Tests on Spring Kafka Producer

Abdullah YILDIRIM
Trendyol Tech
Sep 1, 2019


Nowadays, one of the most important parts of development is testing our code. Tests are commonly classified by purpose: unit tests, integration tests, contract tests, and so on. At Trendyol, we pay strict attention to the TDD technique, particularly with unit tests and integration tests.

In this article, I will explain step by step how to write an integration test for an application built with Spring Boot that uses the Kafka streaming platform. Integration testing is a type of testing in which software modules are logically grouped and tested together.

Spring Kafka Producer Test

Spring Kafka Test is a jar that contains some helpful utilities for testing your application. It provides methods for getting results and static methods for setting up the consumer/producer. In addition, this jar contains an ‘EmbeddedKafkaBroker’, which I will address in the following section.

I will use Maven, Spring Boot, and Spring Kafka in the main project, and I will add the spring-kafka-test dependency in test scope. Let’s see how this works with a coding example.

Test Scenario

In e-commerce applications, the Product is the main domain. Brand data is an attribute of a Product. Whenever the Brand data changes, it must trigger some actions to be taken.

In my sample test scenario, I want to be notified when any brand change occurs. So, when brands are added to or removed from the database, I will generate an event for the other applications that should consume it. For this, I must first create the event model.
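A minimal sketch of what the event model might look like (the field names are my assumptions; the article's repository has the real definition):

```java
// Event published whenever brand data changes.
// Field names are illustrative, not taken from the original project.
public class UpdatedBrandEvent {

    private String brandId;
    private String brandName;

    public UpdatedBrandEvent() {
        // default constructor needed for JSON deserialization
    }

    public UpdatedBrandEvent(String brandId, String brandName) {
        this.brandId = brandId;
        this.brandName = brandName;
    }

    public String getBrandId() {
        return brandId;
    }

    public String getBrandName() {
        return brandName;
    }
}
```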

Kafka Configuration

To use Spring for Apache Kafka in the project, I need to add the spring-kafka dependency to my pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

I need to add the KafkaAdmin Spring bean, which will add topics for all beans of type NewTopic automatically:
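A sketch of that configuration might look like the following (the topic name updated-brand, the partition/replica counts, and the broker address are my assumptions, not values from the original post):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {

    // Broker address is assumed; in a real project it would come from application properties.
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        return new KafkaAdmin(configs);
    }

    // KafkaAdmin automatically creates a topic for every NewTopic bean on startup.
    @Bean
    public NewTopic updatedBrandTopic() {
        return new NewTopic("updated-brand", 1, (short) 1);
    }
}
```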

The creation of the KafkaTemplate and KafkaProducerService is handled in the Configuration class. The class is annotated with @Configuration, indicating that the class can be used by the Spring IoC container as a source of bean definitions.

To use the Spring Kafka template, I need to configure a ProducerFactory and provide it in the template’s constructor.
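Under those assumptions, the configuration class could be sketched like this (serializer choices and the broker address are illustrative; UpdatedBrandEvent and KafkaProducerService are the names the article uses):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, UpdatedBrandEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    // The template wraps a producer created by the factory above.
    @Bean
    public KafkaTemplate<String, UpdatedBrandEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaProducerService kafkaProducerService() {
        return new KafkaProducerService(kafkaTemplate());
    }
}
```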

Spring Kafka Test and Embedded Kafka

The spring-kafka-test jar contains some useful utilities to help in testing your applications. A JUnit 4 @Rule wrapper for the EmbeddedKafkaBroker is provided to create an embedded Kafka and an embedded Zookeeper server.

The EmbeddedKafkaRule constructor takes the following parameters:

  • The number of Kafka servers to start.
  • Whether a controlled shutdown is required.
  • The topics that need to be created on the server.
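In a JUnit 4 test class, the rule can be declared like this (the topic name is my assumption):

```java
import org.junit.ClassRule;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

// Starts one embedded Kafka broker with controlled shutdown
// and creates the "updated-brand" topic before the tests run.
@ClassRule
public static EmbeddedKafkaRule embeddedKafka =
        new EmbeddedKafkaRule(1, true, "updated-brand");
```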

Producer Test

Let’s create a sample test scenario. In this scenario, a message is sent to a topic whenever any brand is updated, and the test verifies whether the message has been sent or not. This section consists of three methods: the test itself, plus two methods that run before and after it.

Before:

The generated message needs to be kept in a queue, so I use a BlockingQueue object and the container’s setupMessageListener method to consume messages into it. I create two sets of properties: container properties with the topic information, and consumer properties with the group, autoCommit, and embedded-server information. I pass the consumerProperties object as a parameter to the constructor when creating the DefaultKafkaConsumerFactory object. The consumer factory and the containerProperties are then passed to the KafkaMessageListenerContainer constructor. Finally, I register a listener that transfers incoming messages to the blocking queue, and the listener begins receiving once the container is started.
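The steps above can be sketched as a setUp method (consumer group name and topic are my assumptions; embeddedKafka refers to the EmbeddedKafkaRule field on the test class):

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Before;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// Fields on the test class:
private BlockingQueue<ConsumerRecord<String, String>> records;
private KafkaMessageListenerContainer<String, String> container;

@Before
public void setUp() {
    // Consumer properties: group, autoCommit, and the embedded broker address.
    Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(
            "sender-group", "false", embeddedKafka.getEmbeddedKafka());

    DefaultKafkaConsumerFactory<String, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerProperties);

    // Container properties carry the topic to listen to.
    ContainerProperties containerProperties = new ContainerProperties("updated-brand");

    container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

    // Every record the listener receives is transferred to the blocking queue.
    records = new LinkedBlockingQueue<>();
    container.setupMessageListener((MessageListener<String, String>) records::add);

    container.start();
}
```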

To avoid sending a message before the container has the required number of assigned partitions, I use the waitForAssignment() method of the ContainerTestUtils class.
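That call, placed at the end of the setup, might look like this:

```java
import org.springframework.kafka.test.utils.ContainerTestUtils;

// Block until the container has been assigned all partitions of the topic.
ContainerTestUtils.waitForAssignment(container,
        embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
```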

For the test to work properly, I use the @DirtiesContext annotation. DirtiesContext is a test annotation that indicates that the ApplicationContext associated with a test is dirty and accordingly, it should be closed and removed from the context cache.
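For illustration, the annotations on the test class could look like this (the class name is hypothetical):

```java
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext // context is closed and removed from the cache after this test class
public class KafkaProducerServiceIntegrationTest {
    // rule, fixture, and test methods go here
}
```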

Test:

I need a data transfer object called UpdatedBrandDto to carry the brand change during the testing phase. After creating an UpdatedBrandDto in my test method, I check whether the message has been received to verify that I can produce this object.

I use Hamcrest Matchers and AssertJ Condition to check the value.
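A sketch of the test method under these assumptions (the DTO contents and the send(UpdatedBrandDto) signature are illustrative; records is the blocking queue filled by the listener registered in the Before step):

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@Test
public void send_shouldProduceMessageWhenBrandIsUpdated() throws Exception {
    // Hypothetical DTO values for the test.
    UpdatedBrandDto updatedBrandDto = new UpdatedBrandDto("1", "Acme");

    kafkaProducerService.send(updatedBrandDto);

    // Wait up to 10 seconds for the listener to hand us the record.
    ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);

    assertThat(received, is(notNullValue()));
    assertThat(received.value(), containsString("Acme"));
}
```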

After:

Now, I can stop the Container.
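In code, this is the third lifecycle method:

```java
import org.junit.After;

@After
public void tearDown() {
    // Stop the message listener container after each test.
    container.stop();
}
```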

Kafka Producer Service

For this example, I will use the send() method that takes as input an UpdatedBrandEvent payload that needs to be sent. For sending messages, I use the KafkaTemplate which wraps a producer to send data to Kafka topics. The template provides asynchronous send methods which return a ListenableFuture. For details on the basics of Spring support for Apache Kafka, visit this link.

If I want to block the sending thread and get the result of the sent message, I can call the get() method of the ListenableFuture object. The thread will wait for the result, but this slows down the producer. You can visit this link for details of Spring Kafka.
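A sketch of the service with both variants (the topic name and the blocking method are illustrative; the fire-and-forget send() is the one the test exercises):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

public class KafkaProducerService {

    private final KafkaTemplate<String, UpdatedBrandEvent> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, UpdatedBrandEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Asynchronous: the template returns a ListenableFuture immediately.
    public void send(UpdatedBrandEvent event) {
        kafkaTemplate.send("updated-brand", event);
    }

    // Blocking variant: waits for the broker's acknowledgement (slows the producer).
    public SendResult<String, UpdatedBrandEvent> sendAndWait(UpdatedBrandEvent event)
            throws InterruptedException, ExecutionException, TimeoutException {
        return kafkaTemplate.send("updated-brand", event).get(10, TimeUnit.SECONDS);
    }
}
```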

The source code of this article is also available in my GitHub account.
