Spring Boot 3 Crash Course Part 8: Integrating Apache Kafka

Ben Meehan
9 min read · Apr 27, 2024

--

In this part, we will be integrating Apache Kafka with our Spring Boot application. Apache Kafka, a distributed streaming platform, is the go-to solution for real-time data processing and event-driven architectures, and it pairs naturally with Spring Boot, a powerful framework for building Java applications.

Part 7: Link

Table of Contents:

  1. Understanding Apache Kafka
  2. Setting Up A Test Kafka Cluster
  3. Creating A Topic
  4. Integrating Using Spring Kafka Module
  5. Writing The Code: Producer Application
  6. Writing The Code: Consumer Application
  7. How does the partition get decided?
  8. All Configurations
  9. The Finale

Understanding Apache Kafka:

Apache Kafka is a distributed streaming platform that serves as a highly scalable and fault-tolerant messaging system. It is designed to handle large volumes of data in real-time across multiple applications or systems.

Kafka topics at a high level

Imagine you’re at a busy restaurant. There are lots of orders coming in, and the kitchen needs to keep track of them all. Each order is like a piece of information or a message.

Now, think of Apache Kafka as a super organized waiter. Instead of taking one order at a time and rushing to the kitchen, this waiter collects all the orders from different tables and groups them together. Then as previous orders get processed in the kitchen, the waiter delivers the new orders one by one.

Core Concepts:

At its core, Kafka employs a publish-subscribe model where messages are organized into logical divisions called topics. Producers publish messages on these topics, and consumers subscribe to them to receive the messages. This decouples the production and consumption of data, allowing for asynchronous communication between different components of a distributed system.

Each Kafka topic is further divided into one or more partitions

Kafka’s architecture is distributed, consisting of multiple brokers that form a Kafka cluster. Each broker is responsible for storing and managing a subset of the data across partitions. Partitions allow messages within a topic to be spread across multiple servers, enabling parallel processing and scalability.

Key Features:

One of Kafka’s key features is fault tolerance. It achieves this through data replication, where each message is replicated across multiple brokers within the cluster. If a broker fails, another broker can seamlessly take over, ensuring data availability and durability.

Each partition is further divided/replicated across multiple nodes, with one being the leader and multiple followers storing the same data, waiting to take over if the leader fails

Additionally, Kafka offers strong ordering guarantees, ensuring that messages within a partition are delivered in the order they were produced. This is essential for applications that require sequential processing of data.

Scalability is another hallmark of Kafka. As data volumes increase, new brokers can be added to the cluster to distribute the workload and accommodate the growing demand. Kafka’s distributed nature allows it to scale horizontally without downtime or data loss.

Setting Up A Test Kafka Cluster:

Kafka Getting Started provides a quick start to set up a Kafka cluster locally on your PC. You can also use Docker to set up Kafka. Please note that older versions of Kafka need Apache Zookeeper running to work.

In this tutorial, I will be using docker compose to bring up Kafka as described in this article by Mahdi Mallaki. This brings up a nice UI as well for us to use.

Run the docker-compose.yaml file using

 docker-compose -f docker-compose.yaml up -d

To bring down the Kafka cluster, run the following

 docker-compose -f docker-compose.yaml down

Note: Make sure to have docker and docker-compose installed

To confirm if it’s working, visit http://localhost:8080 in your browser and you should see the Kafka UI.

Creating A Topic:

Click on the ‘Topics’ tab in the Kafka UI and click ‘Add a Topic’. Enter the following parameters and click ‘Create’.

I have created a test topic with 3 partitions and a replication factor of 2, so each partition is stored on two of my brokers for redundancy. Messages will be deleted from my Kafka cluster after 1 day (the retention period). You can change these values as you like.
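If you prefer creating the topic from code instead of the UI, Kafka's admin API can do the same thing. The sketch below is just illustrative; the broker address is an assumption matching this setup, and the retention value corresponds to 1 day (86,400,000 ms):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumes one of the brokers from the docker-compose setup is reachable here
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // "test" topic: 3 partitions, replication factor 2, 1-day retention
            NewTopic topic = new NewTopic("test", 3, (short) 2)
                    .configs(Map.of(TopicConfig.RETENTION_MS_CONFIG, "86400000"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}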

Integrating Using Spring Kafka Module:

Spring Boot provides the Spring Kafka module for integrating with Apache Kafka. Spring Kafka provides a comprehensive abstraction over the Apache Kafka Java client library, making it easier to configure, produce, and consume messages from Kafka topics within a Spring Boot application. It offers a set of convenient APIs and annotations that simplify the development of Kafka-based applications.

Key components of Spring Kafka include:

  1. KafkaTemplate: A high-level abstraction for sending messages to Kafka topics. It simplifies the process of producing messages by providing methods to send messages asynchronously or synchronously.
  2. @KafkaListener: An annotation used to define message listener methods within Spring components. Methods annotated with @KafkaListener automatically subscribe to Kafka topics and receive messages from them.
  3. ProducerFactory: A factory interface for creating Kafka producer instances. It is used to configure producer properties and serialize message payloads before sending them to Kafka.
  4. ConsumerFactory: A factory interface for creating Kafka consumer instances. It is used to configure consumer properties and deserialize message payloads received from Kafka.
  5. ConcurrentMessageListenerContainer: A container for message listener instances that allows concurrent consumption of messages from Kafka topics. It provides options for configuring concurrency, error handling, and message acknowledgment.
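In a Spring Boot application you rarely have to build these components yourself; the starter auto-configures a KafkaTemplate from application.properties. For reference, a manual configuration might look roughly like this sketch (the broker address and String serializers are assumptions matching this tutorial):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Spring Boot normally creates these beans from application.properties;
    // defining them manually is only needed for custom tuning.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}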

Writing The Code:

To get started, create two projects from Spring Initializr, one for the producer and one for the consumer. We will be running them separately. Make sure to add Spring for Apache Kafka and Spring Web as dependencies.

In the producer/src/main/resources/application.properties file, add the following

spring.kafka.bootstrap-servers=localhost:9192

This tells Spring the address at which our Kafka broker is running. It can be any one of our brokers; Spring will automatically query it and discover the rest of the cluster.
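One small note: the Kafka UI from the docker-compose setup is already listening on port 8080, which is also Spring Boot's default HTTP port. The URL used later in this article assumes the producer runs on 8081, which you can set with:

server.port=8081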

Producer Application:

Create a KafkaProducer.java file and add the following,

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
        System.out.println("Message sent: " + message);
    }
}

Explanation:

KafkaTemplate Autowiring:

KafkaTemplate<String, String>: This is a Spring Kafka class used for sending messages to Kafka topics. It is parameterized with two types, the key type and the value type of the messages being sent; in this example, both the key and the value are strings.

The @Autowired annotation injects an instance of KafkaTemplate into this field when the bean is created. This instance allows us to interact with Kafka.

sendMessage Method:

sendMessage takes three parameters:

  • topic: The name of the Kafka topic to which the message will be sent.
  • key: (Optional) The key of the message. Keys are used for partitioning messages in Kafka topics.
  • message: The payload of the message.

Inside the method, kafkaTemplate.send(topic, key, message) sends the message to the specified topic using the injected KafkaTemplate.
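One thing worth knowing: send is asynchronous and returns immediately. In Spring Kafka 3.x (the version paired with Spring Boot 3) it returns a CompletableFuture<SendResult<K, V>>, so if you want to confirm delivery or log failures you can attach a callback, roughly like this sketch:

kafkaTemplate.send(topic, key, message)
        .whenComplete((result, ex) -> {
            if (ex == null) {
                // RecordMetadata tells us which partition and offset the record landed on
                System.out.println("Sent to partition " + result.getRecordMetadata().partition());
            } else {
                System.err.println("Failed to send: " + ex.getMessage());
            }
        });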

— — — — — — — — — —

Now, create a MessageController.java file and add the following,

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    private final KafkaProducer kafkaProducer;

    @Autowired
    public MessageController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @GetMapping("/send")
    public String sendMessageToKafka(@RequestParam String message) {
        kafkaProducer.sendMessage("test", "test", message);
        return "Message sent to Kafka: " + message;
    }
}

This code defines a REST controller, MessageController, that handles GET requests to /send. It sends the given message to the Kafka topic using the KafkaProducer instance injected via Spring's dependency injection and returns a confirmation message.

Now, if you run the producer using

mvn spring-boot:run

and visit http://localhost:8081/send?message=Hello, you should see the message in the Kafka UI.

How does the partition get decided?

We can see the message is published to the 0th partition of the test topic. How was this decided? We did not specify this in our code, right?

Yes, when we don’t specify a partition, the Kafka producer automatically picks one based on the message key. For most use cases, it is best to keep it this way. But if you want, you can specify it explicitly in the Java code like this:

kafkaTemplate.send(topic, partition, key, message);

But this introduces new problems: what happens if you later decide to add or remove partitions? Then you will have to change the logic in your Java code as well.
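For the curious, the default behavior for keyed records boils down to hashing the serialized key and taking it modulo the partition count, which is why the same key always lands on the same partition. This is only an illustrative sketch of that idea, not the exact code of Kafka's built-in partitioner:

import org.apache.kafka.common.utils.Utils;
import java.nio.charset.StandardCharsets;

public class PartitionSketch {

    // Rough sketch: murmur2-hash the serialized key, make it non-negative,
    // and take it modulo the number of partitions.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition for a fixed partition count
        System.out.println(partitionFor("test", 3));
    }
}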

Consumer Application:

Similar to the producer, create a KafkaConsumerExample.java file with the following code. Note that this example uses the plain Kafka client library directly rather than the Spring Kafka abstractions:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // Set up Kafka consumer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test-group");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());

        // Create Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe to Kafka topic
        consumer.subscribe(Collections.singletonList("test"));

        // Poll for new messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }
    }
}

Explanation:

Properties Setup:

The Properties object is used to set up the configuration properties required by the Kafka consumer.

bootstrap.servers: Specifies the list of Kafka brokers in the format host:port. In this case, it's set to localhost:9092.

group.id: Defines the consumer group to which this consumer belongs. Consumers within the same group share the load of consuming messages from Kafka partitions.

key.deserializer and value.deserializer: These properties specify the deserializer classes for keys and values of Kafka messages. Here, StringDeserializer.class.getName() is used, indicating that the keys and values are deserialized as strings.

Creating Kafka Consumer:

An instance of KafkaConsumer<String, String> is created with the properties configured earlier. This class is provided by the Kafka client library.

It is parameterized with <String, String> to indicate that both the keys and values of the messages are strings.

Subscribing to Kafka Topic:

The consumer subscribes to the Kafka topic named “test” using the subscribe method. It expects a collection of topic names, so Collections.singletonList("test") is used to subscribe to a single topic named "test".

Polling for New Messages:

Inside an infinite loop (while (true)), the consumer continuously polls Kafka for new messages.

The poll method is called to fetch records (messages) from Kafka. It takes a Duration parameter specifying the maximum time to wait for new records.

The fetched records are stored in a ConsumerRecords object.

The ConsumerRecords object is then iterated over, and for each ConsumerRecord, the value of the message is printed to the console.

— — — — — — — — — —

Now, run this consumer code as well using Maven or Gradle. Whenever you push a message to Kafka, the consumer will poll for it and print it to the terminal.

Received message: Hello
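By the way, if you would rather stay within the Spring Kafka abstractions introduced earlier, a roughly equivalent consumer can be sketched with @KafkaListener. This assumes spring.kafka.bootstrap-servers is set in the consumer project's application.properties; Spring Boot defaults to String deserializers for keys and values:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaListenerExample {

    // Subscribes to the "test" topic; Spring Kafka manages the underlying
    // consumer, the polling loop, and deserialization for us.
    @KafkaListener(topics = "test", groupId = "test-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}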

All Configurations:

Kafka has many more configurations and tweaks you can apply; covering all of them would be impossible in a single article. You can explore everything about the Spring Kafka module here.

The Finale:

So, that was an introduction to Kafka and Spring Boot. We explored the fundamental concepts of Apache Kafka, including its publish-subscribe model, distributed architecture, fault tolerance, and scalability. Apache Kafka with Spring Boot provides a powerful solution for building real-time data processing and event-driven architectures.

“Success, after all, loves a witness, but failure can’t exist without one.”
Junot Díaz, The Brief Wondrous Life of Oscar Wao
