Apache Kafka with Spring Boot Application

Shyamal Jadav
5 min read · Jan 3, 2022


Hello everyone!!! 👋🏻 I hope you have already gone through the basic concepts of Apache Kafka and what it is used for. There are many use cases, and here we will work on event sourcing (logging events in the backend) to handle a large amount of log data on the backend side.

Here I will connect Apache Kafka with a Spring Boot application on Windows. For the basic concepts and more information, visit the official Apache Kafka documentation here.

Steps we will follow:

  1. Run the Kafka server on your local system
  2. Create two Spring Boot projects (one for the producer and one for the consumer) using Spring Initializr
  3. Add Kafka configuration to the consumer service
  4. Add Kafka configuration to the producer service

Software required:

  1. IntelliJ (Community version)
  2. Kafka 2.8.1 (download the binary distribution)

Step 1:

Download Kafka (version 2.8.1; later versions have issues with the binary download on Windows) and extract it on your system.

Now create a data folder inside the extracted kafka_2.12-2.8.1 folder, and inside the data folder create two more folders, kafka and zookeeper.
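
If you prefer the terminal, the same two folders can be created with the commands below (adjust the path to wherever you extracted Kafka):

mkdir S:\softwares\kafka_2.12-2.8.1\data\kafka
mkdir S:\softwares\kafka_2.12-2.8.1\data\zookeeper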

> Copy the path of the kafka folder (in our case S:/softwares/kafka_2.12-2.8.1/data/kafka), open the config/server.properties file, and paste it as the value of "log.dirs", like this:

log.dirs=S:/softwares/kafka_2.12-2.8.1/data/kafka

> Similarly, copy the path of the zookeeper folder (in our case S:/softwares/kafka_2.12-2.8.1/data/zookeeper), open the config/zookeeper.properties file, and paste it as the value of "dataDir", as shown below:

dataDir=S:/softwares/kafka_2.12-2.8.1/data/zookeeper

> Now copy the path of the kafka_2.12-2.8.1/bin/windows folder (in our case S:\softwares\kafka_2.12-2.8.1\bin\windows) and add it to the Path environment variable.

— Setup of Kafka on your local system is done. Let's run the Kafka server first —

> Now open a terminal inside the kafka_2.12-2.8.1 folder and run the command below to start ZooKeeper first:

zookeeper-server-start.bat config/zookeeper.properties

To check whether ZooKeeper is running, look for a version-2 folder created inside kafka_2.12-2.8.1/data/zookeeper. If it is there, ZooKeeper is running; if not, recheck the ZooKeeper configuration above.

> ZooKeeper must be running before Kafka can start. Open a new terminal inside the kafka_2.12-2.8.1 folder and run the command below to start Kafka:

kafka-server-start.bat config/server.properties

In the output, you can see that the Kafka server starts on localhost:9092.
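
The port comes from the listeners setting in config/server.properties; it is commented out by default, which makes the broker fall back to 9092. If that port is already taken on your machine, you can set it explicitly (shown here with the default value purely as an illustration):

listeners=PLAINTEXT://localhost:9092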

— The Kafka server is up and running. Let's test it by running a producer and a consumer —

> Open a new terminal and run the command below to start a console producer:

kafka-console-producer.bat --topic test-kafka --bootstrap-server localhost:9092

This starts a Kafka console producer, creates the topic test-kafka if it does not already exist, and points it at localhost:9092, where Kafka is running.
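
If you would rather create the topic explicitly instead of relying on auto-creation (or just want to verify it exists), the kafka-topics script in the same bin/windows folder can do it; for example:

kafka-topics.bat --create --topic test-kafka --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
kafka-topics.bat --list --bootstrap-server localhost:9092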

Again, open a new terminal and run the command below to start a Kafka console consumer:

kafka-console-consumer.bat --topic test-kafka --from-beginning --bootstrap-server localhost:9092

This starts a Kafka console consumer that listens to the topic test-kafka, again using localhost:9092 as the server URL.

Now we can send messages from the producer terminal and see them arrive in the consumer terminal at the same time.

So finally, Kafka is working on your local system. Let's configure Kafka in the Spring Boot projects.

Step 2:

> Open Spring Initializr, create two Spring Boot projects (one for the consumer and one for the producer), and download them. For more information on how to generate a Spring Boot project, visit this site.

Once downloaded, open each project in IntelliJ and update the <dependencies> section of pom.xml as shown below:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.30</version>
    <scope>test</scope>
</dependency>

Here we have Spring Web, Spring for Apache Kafka, and slf4j for logging.

Step 3:

> Let's configure the consumer Spring application first. Add the lines below to application.properties:

server.port=8081
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=mygroup

Here we run this service on port 8081; the next two lines configure the consumer in this service: one points to the Kafka server and the other sets the Kafka group id.
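
Spring Boot falls back to StringDeserializer for both keys and values, so the two lines above are enough for this example. If you want the configuration to be explicit, or need the consumer to read messages that were sent before it started, the optional properties below can be added as well:

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest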

> Now create a new service class, Consumer.java, and add the code below:

package com.example.kafka.kafkaconsumer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class.getName());

    @KafkaListener(topics = "commentary")
    public void consume(String msg) {
        logger.info(String.format("->> %s", msg));
    }
}

Here we have added the @Service annotation to this class and imported the required dependencies. We use the @KafkaListener annotation provided by spring-kafka to listen to a specific topic; in the same way, one service can listen to many topics with this listener.
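
For instance, a single listener method inside the same Consumer class can subscribe to more than one topic, and the group id can be set on the annotation itself instead of in application.properties. A small sketch (the second topic name here is just a placeholder, not part of this project):

@KafkaListener(topics = {"commentary", "scorecard"}, groupId = "mygroup")
public void consumeAll(String msg) {
    // "scorecard" is a hypothetical extra topic used only for illustration
    logger.info(String.format("->> %s", msg));
}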

Now run this service. To test it, open a new terminal and run the console producer against the commentary topic:

kafka-console-producer.bat --topic commentary --bootstrap-server localhost:9092

Now type some text in the producer terminal and you will see it logged by the Spring Boot service.

Step 4:

> Let's configure the producer Spring application. Add the line below to application.properties:

spring.kafka.producer.bootstrap-servers=localhost:9092

The producer only needs the Kafka server address to send data.
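
As on the consumer side, Spring Boot defaults to String serializers, so nothing else is strictly required; the equivalent explicit properties would look like this if you prefer to spell them out:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer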

> Now create a Producer.java service class and add the code below:

package com.example.kafkaproducer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        this.kafkaTemplate.send(topic, message);
    }
}

Here again we use the @Service annotation and the required dependencies. We autowire the KafkaTemplate class (provided by spring-kafka) and create a sendMessage method that sends a message to the Kafka server through it.
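
Note that send() is asynchronous. With the spring-kafka 2.x version pulled in here it returns a ListenableFuture, so if you want to log whether the broker actually accepted the record you can attach a callback (in spring-kafka 3.x the return type is a CompletableFuture instead). A sketch of such a variant inside the same Producer class, added only for illustration:

public void sendMessageWithCallback(String topic, String message) {
    this.kafkaTemplate.send(topic, message)
            .addCallback(
                    // success: log which partition the record landed on
                    result -> logger.info("Sent to partition " + result.getRecordMetadata().partition()),
                    // failure: log the exception so the message is not silently lost
                    ex -> logger.error("Failed to send message", ex));
}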

> Now create a controller, KafkaController, to accept a request and send the message to Kafka.

package com.example.kafkaproducer.controller;

import com.example.kafkaproducer.service.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/send")
    public void sendMessageToKafkaTopic(@RequestBody String message) {
        System.out.println(message);
        this.producer.sendMessage("commentary", message);
    }
}

This is a basic endpoint created to send messages. Here we send the message to the "commentary" topic; in the same way we can send messages to different topics.

Now run this microservice. It will run on the default port 8080. To see the output, make sure the Kafka server and the consumer service are both running.

Now open Postman and send a POST request to http://localhost:8080/kafka/send with some message in the body.
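
If you prefer the terminal over Postman, the same request can be sent with curl, for example:

curl -X POST http://localhost:8080/kafka/send -H "Content-Type: text/plain" -d "Hello from the producer"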

You will see the message logged in the consumer service.

So this is how we can send and handle millions of messages: the producer service sends them to the Kafka server, and the consumer service listens for them. In the consumer service we can also save those messages to a database.
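
For example, the Consumer class above could hand each message to a Spring Data repository instead of only logging it. A rough sketch, where LogEventRepository and LogEvent are hypothetical classes that are not part of this project:

@Service
public class PersistingConsumer {

    // Hypothetical Spring Data repository and entity, shown only to illustrate the idea
    private final LogEventRepository repository;

    PersistingConsumer(LogEventRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "commentary")
    public void consume(String msg) {
        repository.save(new LogEvent(msg));
    }
}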

Check code here: https://github.com/shyamal-jadav/kafka-sb
and do follow for more amazing and easy content like this. :)

Let me know if I am missing something here; feedback is appreciated. Thanks :D
