Kafka Integration Made Easy with Spring Boot

Avinash Hargun
Simform Engineering
Sep 20, 2023

Explore the seamless integration of Kafka messaging with Spring Boot in this comprehensive guide.

The importance of processing data in real-time and ensuring seamless communication between different parts of an application cannot be overstated. One technology that has gained significant traction for enabling such capabilities is Apache Kafka. In this blog, we will understand what Kafka is, its key features, and how it fits into the Spring Boot ecosystem.

What is Apache Kafka?

Apache Kafka is primarily an open-source platform for distributed event streaming and stream processing. It was originally developed by engineers at LinkedIn and later open-sourced as a part of the Apache Software Foundation. Kafka was designed to address the challenges of handling massive amounts of data in real-time, making it a perfect fit for applications that require high throughput, fault tolerance, and real-time data streaming.

Why Use Apache Kafka?

Kafka’s architecture and features make it a powerful tool for various scenarios.

  1. Real-time Data Processing: Kafka excels at handling real-time data streams, making it ideal for applications requiring instant data updates and event-driven processing.
  2. Scalability: Its distributed nature enables seamless scalability, allowing you to handle large volumes of data without sacrificing performance.
  3. Fault Tolerance: Kafka’s replication mechanism ensures that data is not lost even in the event of broker failures.
  4. Event Sourcing: Kafka is a fundamental component of event sourcing architectures, where changes to an application’s state are captured as a series of events.
  5. Log Aggregation: Kafka works well as a centralized log aggregation pipeline, collecting logs from many services and making them available to downstream consumers in a consistent format.

Key Concepts of Kafka

  • Topics: Kafka organizes data into topics, which are essentially categories or channels in which records are published.
  • Producers: Producers are responsible for pushing data to Kafka topics. They can be considered data stream sources.
  • Consumers: Consumers subscribe to topics and process the records pushed by producers. They are the recipients of the data streams.
  • Brokers: Kafka clusters consist of brokers that store data and manage the distribution of records across topics.
  • Partitions: Each topic can be divided into partitions, which allow for parallel processing and the distribution of data across the cluster.

The Core of Kafka's Architecture

In the Kafka architecture, messages are the heart of the system. Producers create and send messages to specific topics, which act as categories. These topics are divided into partitions to enable efficient parallel processing.

Consumers subscribe to topics and retrieve messages from partitions. Each partition is assigned to only one consumer at a time, ensuring load balancing. The consumers process messages based on their needs, whether it’s analytics, storage, or other applications.

(Figure: Kafka architecture)

This architecture allows Kafka to handle vast streams of data efficiently, providing fault tolerance and scalability. It’s a reliable backbone for real-time data pipelines, event-driven applications, and more.

Now that we have understood the workings of Kafka, let’s dive into the code!

Setting Up Kafka in Spring Boot: Code Implementation

Before proceeding, it’s essential to have a Kafka server operational in your local environment. If you haven’t configured Kafka on your system yet, you can refer to the Kafka quickstart guide for detailed instructions.

First, we need to add the spring-kafka Maven dependency to pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuring Producer

To start sending messages, we first set up a producer factory, which defines how Kafka Producer instances are created.

Next, we employ a KafkaTemplate, which wraps around a Producer instance and offers simple methods to send messages to specific Kafka topics.

Producer instances are designed to be thread-safe, which means using a single instance across your application’s context can boost performance. This also applies to KafkaTemplate instances.

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

In the above code snippet, we configure the producer using the ProducerConfig properties. Here’s a breakdown of the key properties used:

  • BOOTSTRAP_SERVERS_CONFIG: This property specifies the Kafka brokers’ addresses, given as a comma-separated list of host:port pairs.
  • KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG: These properties determine how the key and value of a message are serialized before being sent to Kafka. In this example, we use StringSerializer for both.

So, in this case, our properties file should contain the bootstrap-servers value:

spring.kafka.bootstrap-servers=localhost:9092

All the services used in this article are assumed to be running on their default ports.

Creating Kafka Topics

We will be sending messages to topics, so the topics should be created before we publish anything.

@Configuration
public class KafkaTopic {

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic-1").build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic-2").partitions(3).build();
    }
}

A KafkaAdmin bean is responsible for creating new topics in our broker. With Spring Boot, a KafkaAdmin bean is automatically registered.

Here we have created topic-1 with one partition (the default) and topic-2 with three partitions. TopicBuilder provides various methods for creating topics.
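For instance, TopicBuilder can also set the replica count and topic-level options such as log compaction. A small sketch (the topic name and settings here are hypothetical, not part of the setup above):

@Bean
public NewTopic compactedTopic() {
    // Hypothetical example: a compacted topic with 3 partitions and 1 replica
    return TopicBuilder.name("topic-compact")
            .partitions(3)
            .replicas(1)
            .compact()   // sets cleanup.policy=compact
            .build();
}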

Sending Messages

KafkaTemplate has various methods to send messages to topics:

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topicName) {
        log.info("Sending : {}", message);
        log.info("--------------------------------");

        kafkaTemplate.send(topicName, message);
    }
}

We just need to call the send() method with the topic name and the message as parameters to publish the message.
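Note that send() is asynchronous and returns immediately. If you want to react to the broker acknowledgement, you can attach a callback to the returned future. A minimal sketch (the method name is illustrative), assuming spring-kafka 3.x, where send() returns a CompletableFuture of SendResult:

public void sendMessageWithCallback(String message, String topicName) {
    kafkaTemplate.send(topicName, message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // RecordMetadata tells us where the record landed
                    log.info("Sent [{}] to partition-{} at offset-{}",
                            message,
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                } else {
                    log.error("Failed to send [{}]", message, ex);
                }
            });
}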

Configuring Consumer

A KafkaMessageListenerContainer receives all messages from all topics on a single thread. To build listener containers, we also need to configure a ConsumerFactory.

@Configuration
@EnableKafka
public class KafkaConsumer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Next, we consume messages with the help of the @KafkaListener annotation. For that to work, we add the @EnableKafka annotation to the consumer configuration class. It tells Spring to scan beans for @KafkaListener annotations and to set up the infrastructure needed to process Kafka messages.

@Component
@Slf4j
public class KafkaListenerExample {

    @KafkaListener(topics = "topic-1", groupId = "group1")
    void listener(String data) {
        log.info("Received message [{}] in group1", data);
    }
}

The groupId is a string that uniquely identifies the group of consumer processes to which this consumer belongs. We can specify multiple topics to listen to within a single consumer group. In the same way, more than one method can listen to the same topic.

@KafkaListener(topics = { "topic-1", "topic-2" }, groupId = "group1")
void listener(@Payload String data,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {
    log.info("Received message [{}] from group1, partition-{} with offset-{}",
            data,
            partition,
            offset);
}

We can also retrieve some useful metadata about the consumed message using the @Header() annotation.
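Other standard KafkaHeaders constants are available as well. A small sketch (the listener method itself is illustrative) showing the received topic and record timestamp:

@KafkaListener(topics = "topic-1", groupId = "group1")
void listenerWithMoreMetadata(@Payload String data,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
    log.info("Received [{}] from topic {} at timestamp {}", data, topic, timestamp);
}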

Message Consumption from a Particular Partition with an Initial Offset

In some scenarios, you might need to consume messages from a particular partition of a Kafka topic, starting from a specific offset. This can be useful when you want to reprocess specific messages or have fine-grained control over where to begin consuming them.

@KafkaListener(
        groupId = "group2",
        topicPartitions = @TopicPartition(topic = "topic-2",
                partitionOffsets = {
                        @PartitionOffset(partition = "0", initialOffset = "0"),
                        @PartitionOffset(partition = "2", initialOffset = "0")}))
public void listenToPartition(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
    log.info("Received Message [{}] from partition-{}",
            message,
            partition);
}

By setting initialOffset to "0", we instruct Kafka to start consuming messages from the beginning of the partition. If you just want to specify the partitions without an initial offset, write this:

@KafkaListener(groupId = "group2", topicPartitions
        = @TopicPartition(topic = "topicName", partitions = { "0", "3" }))

KafkaListener at Class Level

Class-level annotation is suitable when you want to group related message handling logic together. Messages from these topics will be distributed to the methods within the class based on their parameters.

@Component
@Slf4j
@KafkaListener(id = "class-level", topics = "multi-type")
class KafkaClassListener {

    @KafkaHandler
    void listenString(String message) {
        log.info("KafkaHandler [String] {}", message);
    }

    @KafkaHandler(isDefault = true)
    void listenDefault(Object object) {
        log.info("KafkaHandler [Default] {}", object);
    }
}

By doing this, we can group methods that consume data from specific topics. Here we can capture different types of data using methods annotated with @KafkaHandler. The method parameters determine how the data is received, and if no data type matches, the default method is applied.

Now that we’ve covered the basics of producers and listeners using string messages, let’s explore various scenarios and use cases.

Using RoutingKafkaTemplate

We can use RoutingKafkaTemplate when there are several producers with different configurations and we want to choose the producer at runtime based on the topic name.

@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {

    // ProducerFactory with Bytes serializer
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
    // Register the factory with Spring so it is closed on shutdown
    context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

    // ProducerFactory with String serializer
    // (DefaultKafkaProducerFactory copies the config map, so reusing props is safe)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);

    Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
    map.put(Pattern.compile(".*-bytes"), bytesPF);
    map.put(Pattern.compile("strings-.*"), stringPF);
    return new RoutingKafkaTemplate(map);
}

RoutingKafkaTemplate takes a map of regex patterns and ProducerFactory instances and routes each message to the first factory whose pattern matches the topic name. If there were two patterns, str-.* and strings-.*, the pattern strings-.* should come first, because otherwise the str-.* pattern would “override” it.

In the above example, we have created two patterns: .*-bytes and strings-.*. Serialization depends on the topic name at runtime: topic names ending with -bytes use the ByteArraySerializer, while those starting with strings- use the StringSerializer.
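As a usage sketch (the topic names below are hypothetical but match the two patterns), the same template routes each send() to the matching producer factory:

public void routeMessages(RoutingKafkaTemplate routingTemplate) {
    // Matches "strings-.*" -> the value goes through StringSerializer
    routingTemplate.send("strings-orders", "plain text payload");

    // Matches ".*-bytes" -> ByteArraySerializer is used, so the value must be a byte[]
    routingTemplate.send("orders-bytes", "binary payload".getBytes(StandardCharsets.UTF_8));
}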

Filtering the Messages

Any messages meeting the filter’s criteria will be discarded before they even reach the listener. Here, messages that contain the word “ignored” will be discarded.

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(record -> record.value().contains("ignored"));
    return factory;
}

The listener is wrapped in a FilteringMessageListenerAdapter. This adapter relies on a RecordFilterStrategy implementation, in which we define the filter condition. So enabling the filter only takes one extra line in your existing container factory.
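Because the bean above reuses the default name kafkaListenerContainerFactory, plain string listeners pick up the filter automatically; a listener can also reference it explicitly. A short sketch (the listener method is illustrative):

@KafkaListener(topics = "topic-1", groupId = "group1",
        containerFactory = "kafkaListenerContainerFactory")
void filteredListener(String data) {
    // Records whose value contains "ignored" never reach this method
    log.info("Received non-ignored message [{}]", data);
}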

Custom Messages

Let’s now look at how to send or receive a Java object. We’ll be sending and receiving User objects in our example.

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

    String msg;
}

Producer and Consumer Configuration

We will use the JsonSerializer for the producer’s value configuration:

@Bean
public ProducerFactory<String, User> userProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
    return new KafkaTemplate<>(userProducerFactory());
}

And for the consumer, the JsonDeserializer:

@Bean
public ConsumerFactory<String, User> userConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(User.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
}

The JSON serializer and deserializer in spring-kafka use the Jackson library, which is responsible for converting Java objects to bytes and vice versa.

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.7.1</version>
</dependency>

This is an optional dependency of spring-kafka; if you add it explicitly, use the same Jackson version that spring-kafka uses.

Sending Java Objects

Let’s send the User object with the help of the userKafkaTemplate() we have created.

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, User> userKafkaTemplate;

    public void sendCustomMessage(User user, String topicName) {
        log.info("Sending Json Serializer : {}", user);
        log.info("--------------------------------");

        userKafkaTemplate.send(topicName, user);
    }
}

Receiving Java Objects

@Component
@Slf4j
public class KafkaListenerExample {

    @KafkaListener(topics = "topic-3", groupId = "user-group",
            containerFactory = "userKafkaListenerContainerFactory")
    void listenerWithMessageConverter(User user) {
        log.info("Received message through MessageConverterUserListener [{}]", user);
    }
}

Since we now have multiple listener container factories, we specify which one to use.

If we don’t specify the containerFactory attribute, it defaults to kafkaListenerContainerFactory, which in our case uses StringDeserializer for the value.
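As a side note, much of the plain string setup above can also be expressed through Spring Boot’s auto-configuration properties instead of Java config. A sketch of equivalent application.properties entries (the values are assumptions matching this article’s defaults):

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=group1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer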

Conclusion

In this comprehensive guide on Apache Kafka, we started with the basics, understanding Kafka’s core concepts. We also explained how to set up Kafka in a Spring Boot application and covered how to produce and consume messages using Kafka templates and listeners. Additionally, we talked about handling different message types, message routing, message filtering, and custom data format conversion.

Kafka is a versatile and powerful tool for building real-time data pipelines and event-driven applications, and we hope this guide has equipped you with the knowledge and skills to leverage its capabilities effectively.

As you continue your Kafka journey, keep experimenting, learning, and building, as the possibilities with Kafka are endless. Happy Kafka coding!

To learn more about engineering at Simform, check out the rest of our Engineering Blog, and visit our Simform site.
