Building Kafka Producer with Spring Boot

Krishna · Published in kafka with spring · 5 min read · Jun 6, 2024

In this article I am going to show you how to build a message publisher using Apache Kafka and Spring Boot. First, let's look at what Apache Kafka is.

Apache Kafka is an open-source, distributed streaming platform designed for real-time event processing. It provides a reliable, scalable, and fault-tolerant way to handle large volumes of data streams. Kafka allows you to publish and subscribe to data topics, making it ideal for building event-driven applications, log aggregation, and data pipelines.

Prerequisites:

Apache Kafka

Java

Apache Maven

Any IDE (Intellij or STS or Eclipse)

Project Structure:

In this project we will expose an endpoint to create a user, and publish a UserCreatedEvent to a Kafka topic.

application.yml file

spring:
  application:
    name: message-publisher

  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

app:
  topic_name: users-topic

server:
  port: 8089

spring.application.name defines the application name.

bootstrap-servers specifies the hostname and port of the Kafka broker(s).

key-serializer specifies which serializer converts the message key from a Java object to bytes before it is sent to Kafka. Based on the key type we can use StringSerializer or IntegerSerializer (for example: org.apache.kafka.common.serialization.StringSerializer). The key matters because messages with the same key are routed to the same partition, which preserves their relative ordering.

value-serializer specifies which serializer converts the message value from a Java object to bytes before it is sent to Kafka. If we are using a custom Java class as the value, we can use JsonSerializer as the value-serializer.
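To see why the key determines the partition, here is a minimal sketch of the routing idea. The class name `KeyPartitioningDemo` is hypothetical, and `floorMod` of `hashCode` is a simplified stand-in for Kafka's actual murmur2-based DefaultPartitioner; only the principle (same key, same partition) carries over:

```java
public class KeyPartitioningDemo {

    // Simplified stand-in for Kafka's partitioner: a deterministic hash of
    // the key modulo the partition count. (Kafka's DefaultPartitioner really
    // uses murmur2, but the routing idea is the same.)
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        String key = "user-42";
        // The same key always maps to the same partition, so all events for
        // one user keep their ordering within that partition.
        System.out.println(
                partitionFor(key, partitions) == partitionFor(key, partitions)); // prints "true"
    }
}
```

Because the mapping is deterministic, all events published with the same user key land on the same partition; events with no key are spread across partitions instead.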

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lights5.com</groupId>
    <artifactId>message-publisher</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>message-publisher</name>
    <description>Demo project for Kafka Producer using Spring Boot</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Spring Web and Spring Kafka are the required dependencies; Lombok is optional and only reduces boilerplate.

ApplicationConfiguration class

package com.lights5.com.message.publisher;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "app")
public class AppConfig {

    private String topicName;
}

This class binds configuration values under the app prefix in the application.yml file to the corresponding fields.

Application class

package com.lights5.com.message.publisher;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication
@RequiredArgsConstructor
public class Application {

    private final AppConfig appConfig;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    NewTopic usersTopic() {
        return TopicBuilder.name(appConfig.getTopicName())
                .partitions(3)
                .replicas(2)
                .build();
    }
}

The NewTopic bean creates the topic on the Kafka broker if it doesn't exist already. We can configure the required number of partitions and replicas; a replication factor of 2 requires at least two brokers in the cluster.

Model Classes

User class

package com.lights5.com.message.publisher;

import java.time.LocalDateTime;

record User(
        String firstName,
        String lastName,
        String email,
        Long phoneNumber,
        Address address,
        LocalDateTime createdAt) {

    record Address(
            String city,
            String country,
            String zipcode) {
    }
}

EventType enum

package com.lights5.com.message.publisher;

enum EventType {

    USER_CREATED_EVENT;
}

EventPayload class

package com.lights5.com.message.publisher;

record EventPayload(
        EventType eventType,
        String payload) {
}
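The EventPayload record is an event envelope: the event type plus the pre-serialized user JSON carried as an opaque string. A small self-contained sketch of how this envelope is constructed and read (the class `EventEnvelopeDemo` and the helper `wrap` are hypothetical names for illustration):

```java
public class EventEnvelopeDemo {

    enum EventType { USER_CREATED_EVENT }

    // Mirrors the article's EventPayload record: event type + payload string.
    record EventPayload(EventType eventType, String payload) {}

    // Wraps already-serialized user JSON in the envelope.
    static EventPayload wrap(String userJson) {
        return new EventPayload(EventType.USER_CREATED_EVENT, userJson);
    }

    public static void main(String[] args) {
        var event = wrap("{\"firstName\":\"John\"}");
        // Records generate accessors, equals/hashCode and toString for free.
        System.out.println(event.eventType() + " -> " + event.payload());
    }
}
```

Keeping the payload as a string means consumers can dispatch on eventType first and only deserialize the payload for event types they care about.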

Endpoint to Create User (UserController class)

package com.lights5.com.message.publisher;

import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

import static com.lights5.com.message.publisher.EventType.USER_CREATED_EVENT;

@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/users")
class UsersController {

    private final UsersService usersService;

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public void createUser(@RequestBody User user) {
        usersService.publishMessage(user, USER_CREATED_EVENT);
    }
}

The UsersController class exposes a POST method to create a user, which in turn calls a method in the UsersService class.

UsersService class

package com.lights5.com.message.publisher;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
class UsersService {

    private final AppConfig appConfig;
    private final ObjectMapper objectMapper;
    private final KafkaTemplate<String, EventPayload> kafkaTemplate;

    public void publishMessage(User user, EventType eventType) {
        try {
            var userCreatedEventPayload = objectMapper.writeValueAsString(user);
            var eventPayload = new EventPayload(eventType, userCreatedEventPayload);
            kafkaTemplate.send(appConfig.getTopicName(), eventPayload);
        }
        catch (JsonProcessingException ex) {
            log.error("Exception occurred in processing JSON: {}", ex.getMessage());
        }
    }
}

KafkaTemplate is used to send messages to Kafka. Spring Boot auto-configures a KafkaTemplate and injects it into the classes that need it.

KafkaTemplate<K, V> is generic: K is the key type and V is the value type. In our case the key is a String and the value is the EventPayload class, so we use StringSerializer for keys and JsonSerializer (EventPayload is a custom Java class) for values.

The kafkaTemplate.send() method takes the topic name as its first argument and the data to be published as its second.
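Note that send() is asynchronous: in Spring Kafka 3.x it returns a CompletableFuture<SendResult<K, V>>, so we can attach a callback instead of blocking. A minimal sketch of the callback pattern, where the hypothetical `fakeSend()` stands in for `kafkaTemplate.send(...)` and a plain String stands in for SendResult:

```java
import java.util.concurrent.CompletableFuture;

public class SendCallbackDemo {

    // Stand-in for kafkaTemplate.send(topic, payload); in Spring Kafka 3.x the
    // real call returns CompletableFuture<SendResult<K, V>>. Here the "result"
    // is just a String describing topic-partition@offset.
    static CompletableFuture<String> fakeSend() {
        return CompletableFuture.supplyAsync(() -> "users-topic-0@42");
    }

    public static void main(String[] args) {
        fakeSend().whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Published: " + result);              // success path
            } else {
                System.err.println("Publish failed: " + ex.getMessage()); // failure path
            }
        }).join(); // join() only so this demo waits; production code stays non-blocking
    }
}
```

In the article's UsersService the returned future is ignored, which means a broker-side failure would go unnoticed; attaching a whenComplete callback like this is a common way to at least log such failures.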

Running Kafka locally:

To run this application locally, we first need to start Kafka and then start the Spring Boot application.

Please use this docker-compose file to run Kafka locally.

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 5
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 6
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1

  kafka3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 7
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1
docker-compose up -d

The above command starts ZooKeeper and the three Kafka brokers locally.

Testing using Postman:

Endpoint: http://localhost:8089/v1/users (POST method)

Payload:

{
  "firstName": "John",
  "lastName": "Albert",
  "email": "johnalbert@gmail.com",
  "phoneNumber": "9999999999",
  "address": {
    "city": "NewYork",
    "country": "USA",
    "zipcode": "111111"
  },
  "createdAt": "2024-06-06T16:46:00"
}

You can verify whether the data was published using the kafka-console-consumer command, for example: kafka-console-consumer --bootstrap-server localhost:9092 --topic users-topic --from-beginning

Github link: https://github.com/saikrishnasaski/message-publisher

Conclusion

Spring Boot provides easy integration with Kafka, helping us build publish-subscribe applications with minimal configuration. With Spring Boot and Kafka we can easily develop event-driven microservices.
