Setting Up a Local Kafka Using Testcontainers in Spring Boot

Truong Bui
7 min read · May 31, 2023


Part of the Testcontainers Article Series: If you haven’t read my other articles yet, please refer to the following links:
1. Setting Up a Local MariaDB Using Testcontainers in Spring Boot
2. Setting Up a Local Redis Cluster Using Testcontainers in Spring Boot
3. Setting Up a Local Jira Instance Using Testcontainers in Spring Boot

As you may already know, in project development, it is commonplace to come across three fundamental environments: Development (Dev), Staging (Stg), and Production (Prod).

Consider a scenario where your application leverages Kafka to generate events consumed by external services. In the Staging (Stg) and Production (Prod) environments, dedicated Kafka instances are available, so connecting is straightforward. In the Development (Dev) environment, however, Kafka is typically unnecessary for most use cases. But what if you need to test a Kafka connection locally when no Kafka instance is available in Dev? What can you do?

  1. Pre-configure Kafka, either by installing it on your personal machine or by using a Docker image. Just make sure Kafka is running before you launch the application locally.
  2. Alternatively, connect to the Staging (Stg) Kafka, although this can be inconvenient in certain scenarios. The setup can fail due to discrepancies in the authentication protocol between the Staging (Stg) and Development (Dev) environments, and even with the same protocol it may still not work because of differences in environment infrastructure, and so on.

Both approaches bring inconveniences as well as challenges. I faced a similar situation in the past and successfully tackled it using Testcontainers, which lets us overcome the obstacles mentioned above. Today, I will create a small project to showcase the setup of a local Kafka using Testcontainers. (I won’t delve deeply into Testcontainers concepts, but I will try to provide thorough explanations at each step and include official documentation links for reference.)

Now let’s get started!!! 💪

Prerequisites

  • Java 17
  • Maven Wrapper
  • Spring Boot 3+
  • Swagger (for testing purposes)
  • A running Docker daemon (Docker Install)

Defining Dependencies

Create a Spring Boot project with the dependencies listed in the POM file below. I have named it kafka-testcontainers.

https://github.com/buingoctruong/springboot-testcontainers-kafka/blob/master/pom.xml
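The POM in the repository is the source of truth; for orientation, the key dependencies are most likely spring-boot-starter-web, spring-kafka (for KafkaTemplate), org.testcontainers:kafka (for KafkaContainer), Lombok (for @Data, @Slf4j, and @RequiredArgsConstructor), and a springdoc/Swagger starter for exercising the endpoint.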

Defining Kafka Configuration Parameters Values

Kafka configuration parameters define the behavior and characteristics of a Kafka cluster. In our sample project, we focus on a specific subset of these parameters, with their corresponding values defined in the application.yaml file. If you find this information abstract at the moment, no worries! I will delve into more details and provide further explanations in the upcoming sections.

kafka:
  config:
    bootstrap-server: localhost:9092
    max-block-ms: 5000
    max-request-size: 10485880
    retry: 3
    buffer-memory: 10485880
    batch-size: 100000
    linger-ms: 10
    metadata-max-age: 500
    acks: 1
    security-protocol: PLAINTEXT
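(A quick teaser for two of them: max-block-ms bounds how long a producer send() call may block, for example while fetching metadata when the broker is unreachable, and acks: 1 means the partition leader acknowledges a write without waiting for its followers.)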

The full version of the application.yaml file can be found here: https://github.com/buingoctruong/springboot-testcontainers-kafka/blob/master/src/main/resources/application.yaml

Building Kafka Configuration Parameters

Using the @ConfigurationProperties annotation, we can easily bind properties from the application.yaml file to member variables of a specific class. In this case, all properties in the application.yaml file with the prefix “kafka.config” will be automatically bound to the corresponding member variables of the “KafkaProperties” class. (Detailed explanations on member variables are included as comments)

@Data
@ConfigurationProperties(prefix = "kafka.config")
public class KafkaProperties {
    // obtained from kafka.config.bootstrap-server
    private final String bootstrapServer;
    // obtained from kafka.config.max-block-ms
    private final int maxBlockMs;
    // obtained from kafka.config.max-request-size
    private final int maxRequestSize;
    // obtained from kafka.config.retry
    private final int retry;
    // obtained from kafka.config.buffer-memory
    private final long bufferMemory;
    // obtained from kafka.config.batch-size
    private final int batchSize;
    // obtained from kafka.config.linger-ms
    private final int lingerMs;
    // obtained from kafka.config.metadata-max-age
    private final int metadataMaxAge;
    // obtained from kafka.config.acks
    private final String acks;
    // obtained from kafka.config.security-protocol
    private final String securityProtocol;
    ...
}
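One detail worth noting: the member variables are final. Lombok’s @Data generates a constructor for all final fields, and Spring Boot then binds the properties through constructor binding, so no setters are required. (This assumes the class is registered via @ConfigurationPropertiesScan, which we will do in the startup class later.)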

After obtaining values for the member variables, we are now ready to construct the Kafka configuration parameters. Fill in the “...” section of the class above with the following snippet.

public Map<String, Object> kafkaConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", bootstrapServer);
    props.put("key.serializer", StringSerializer.class);
    props.put("value.serializer", StringSerializer.class);
    props.put("max.block.ms", maxBlockMs);
    props.put("max.request.size", maxRequestSize);
    props.put("buffer.memory", bufferMemory);
    props.put("retries", retry);
    props.put("reconnect.backoff.ms", 500);
    props.put("batch.size", batchSize);
    props.put("linger.ms", lingerMs);
    props.put("metadata.max.age.ms", metadataMaxAge);
    props.put("acks", acks);
    props.put("security.protocol", securityProtocol);
    return props;
}

The map we are constructing consists of Kafka configuration parameters as the “keys” and their corresponding values as the “values”. This map will be utilized to create an instance of the “ProducerFactory” in the next section.
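As a small aside, the Kafka client library ships constants for these keys in org.apache.kafka.clients.producer.ProducerConfig (for example, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG for "bootstrap.servers"), which you might prefer over raw strings to avoid typos.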

For further details regarding Kafka Configuration Parameters, you can refer to this link: producer-configs

To simplify the demonstration, I have chosen to use “PLAINTEXT” as the security protocol. However, Kafka also supports other protocols such as SSL, SASL_PLAINTEXT, and SASL_SSL. I’ll leave the exploration of these protocols as homework for you. 😃
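To give a flavor of that homework, here is a minimal sketch, assuming a broker that uses SASL_SSL with the PLAIN mechanism, of the extra entries you might add to the kafkaConfigs() map. The keys are standard Kafka client configs; all values are purely illustrative placeholders.

props.put("security.protocol", "SASL_SSL");
// SASL mechanism and credentials (PLAIN shown here; your broker may require SCRAM, etc.)
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<user>\" password=\"<password>\";");
// Truststore so the client can verify the broker's TLS certificate
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "<truststore-password>");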

Producer Configuration

To publish messages to a Kafka topic, we need producer instances, which are created by a “ProducerFactory”. So let’s configure a “ProducerFactory” and define its properties using the Kafka configuration parameters map from the previous section.

@Bean
public ProducerFactory<String, String> producerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaProducerFactory<>(kafkaProperties.kafkaConfigs());
}
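By default, DefaultKafkaProducerFactory creates a single shared Producer instance for all clients (Kafka producers are thread-safe), so there is no per-request connection cost.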

Then we need a “KafkaTemplate” instance, which wraps a producer instance and provides convenience methods for sending messages to Kafka topics.

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

Controller

Reaching this section means we have completed the Kafka configuration tasks. Let’s create a simple controller to test Kafka on the local machine.

@Slf4j
@RestController
@RequestMapping("/kafka/topic")
@RequiredArgsConstructor
public class KafkaController {

    private static final String TOPIC = "kafka_topic";

    private final KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping(path = "/{message}", produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<String> sendMessage(@PathVariable("message") String message) {
        CompletableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC, "key", message);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Sent message=[" + message + "] with offset=["
                        + result.getRecordMetadata().offset() + "]");
            } else {
                log.warn("Unable to send message=[" + message + "] due to: " + ex.getMessage());
            }
        });
        return new ResponseEntity<>(
                "Finish sending [" + message + "], Check log to know the result", HttpStatus.OK);
    }
}
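One version note: the whenComplete callback style above relies on Spring for Apache Kafka 3.x, where KafkaTemplate.send() returns a CompletableFuture<SendResult<K, V>>; in the 2.x line it returned a ListenableFuture instead.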

Local Kafka Setup

Okay, let’s move on to the most interesting section 😆

As you may have noticed, the value of “kafka.config.bootstrap-server” in the application.yaml file is merely a placeholder: no actual Kafka instance is listening at “localhost:9092”.

During application startup, the following steps need to be taken:

  1. Use Testcontainers to construct a container implementation for Kafka. (Make sure to add the necessary environment variables so that a “kafka_topic” topic exists in advance; the controller sends its test messages to that topic. A caveat on this step follows the initializer code below.)
  2. Start the container using Docker, pulling the image if necessary.
  3. Retrieve the host/port pair of the freshly started container.
  4. Override the value of “kafka.config.bootstrap-server” in the Spring environment (the application.yaml file itself is never modified).

Now, the question arises: how can we execute code during application startup? The answer is to implement the ApplicationContextInitializer interface. Its initialize() callback is invoked once the ConfigurableApplicationContext is available but before any bean definitions are loaded, which is exactly the right moment to start the container and inject the real bootstrap server address.

@Configuration
public class LocalKafkaInitializer
        implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    public void initialize(@NonNull ConfigurableApplicationContext context) {
        kafkaLocalSetup(context);
    }

    private void kafkaLocalSetup(ConfigurableApplicationContext context) {
        ConfigurableEnvironment environment = context.getEnvironment();
        KafkaContainer kafka = new KafkaContainer(
                DockerImageName.parse("confluentinc/cp-kafka:7.2.2.arm64"))
                        .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
                        .withEnv("KAFKA_CREATE_TOPICS", "kafka_topic");
        kafka.start();
        setProperties(environment, "kafka.config.bootstrap-server", kafka.getBootstrapServers());
    }

    @SuppressWarnings("unchecked")
    private void setProperties(ConfigurableEnvironment environment, String name, Object value) {
        MutablePropertySources sources = environment.getPropertySources();
        PropertySource<?> source = sources.get(name);
        if (source == null) {
            source = new MapPropertySource(name, new HashMap<>());
            sources.addFirst(source);
        }
        ((Map<String, Object>) source.getSource()).put(name, value);
    }
}
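A caveat on step 1 of the list above: whether the KAFKA_CREATE_TOPICS environment variable is honored varies between Kafka images, and in practice the topic may simply be auto-created on first use thanks to KAFKA_AUTO_CREATE_TOPICS_ENABLE. If you want the topic to exist deterministically, a minimal sketch of an alternative is a helper like the hypothetical createTopic() below (not part of the original class), called right after kafka.start(), using the Kafka AdminClient from the org.apache.kafka.clients.admin package.

// Hypothetical helper: create the demo topic explicitly instead of relying on
// image-specific environment variables.
private void createTopic(KafkaContainer kafka) throws Exception {
    Map<String, Object> adminProps =
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    try (AdminClient admin = AdminClient.create(adminProps)) {
        // One partition, replication factor 1: enough for a single-broker container
        admin.createTopics(List.of(new NewTopic("kafka_topic", 1, (short) 1))).all().get();
    }
}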

I will assign the exploration of constructing container implementations using Testcontainers and the ApplicationContextInitializer interface as homework for you.

Testcontainers should only be used in the local environment. This configuration class exists solely to run the application locally against a Kafka container, so it belongs in the test folder.

Local Application Startup Class

To start up the Spring ApplicationContext, we need a Spring Boot application’s main class that contains a public static void main() method.

Inside the test folder, there exists a class named “KafkaTestcontainersApplicationTests”. I have renamed it to “KafkaAppRunner” 😆 and made the following updates.

@SpringBootTest
@ComponentScan(basePackages = "io.github.truongbn.kafkatestcontainers")
@ConfigurationPropertiesScan(basePackages = "io.github.truongbn.kafkatestcontainers")
class KafkaAppRunner {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaAppRunner.class)
                .initializers(new LocalKafkaInitializer())
                .run(args);
    }
}
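Registering the initializer programmatically through SpringApplicationBuilder, as above, keeps everything in the test sources. As an aside, Spring Boot can also pick up initializers declared via the context.initializer.classes property, but for this local-only setup the explicit builder call is arguably clearer.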

Time to play with Testcontainers

Now, everything is ready! 😎

To launch the application, run the KafkaAppRunner.main() method; it should start successfully on port 8080.

The initial run may take some time, as Testcontainers needs to pull the required Docker images and set everything up. 😅 However, the image download only happens once. The upside is that, from now on, we can launch the application locally at any time without manually configuring anything Kafka-related.

If you encounter an issue during your initial run and find yourself in a similar situation as I did, you can refer to this link for more details: https://github.com/testcontainers/testcontainers-java/discussions/6045

  • Try it out with “/kafka/topic/{message}”; the expected result should look like the log below.
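For example, with the application running, open http://localhost:8080/kafka/topic/HelloKafka in a browser, or use the Swagger UI mentioned in the prerequisites (with springdoc on the classpath it is typically served at http://localhost:8080/swagger-ui.html).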

The log:

2023-05-31T09:27:46.203+09:00  INFO 73567 --- [ad | producer-1] i.g.t.k.controller.KafkaController       : Sent message=[This is the Kafka message] with offset=[0]

We have just completed a brief demonstration of setting up a local Kafka using Testcontainers. Additionally, Testcontainers can be used for writing integration tests. Isn’t that amazing? 😃 I hope it works as expected for you!

Completed source code can be found in this GitHub repository: https://github.com/buingoctruong/springboot-testcontainers-kafka

I would love to hear your thoughts!

Thank you for reading, and goodbye!
