Building an Event-Driven Spring Boot Microservice with Apache Kafka

Mustafa GÜÇ
6 min read · Feb 21, 2024


Event-driven microservices have become a cornerstone of modern software design. These small, self-contained services communicate through events, which makes systems more scalable, adaptable, and agile. At the heart of this approach lie message brokers, particularly Apache Kafka, a distributed event streaming platform that forms the backbone of many modern architectures.

Understanding Event-Driven Architecture

Event-driven architecture (EDA) is a design pattern where components within a software system communicate with each other by producing and consuming events. Events represent significant occurrences within the system and can trigger actions in other components. This approach enables loosely coupled, highly scalable systems that can react to real-time changes.

Introducing Apache Kafka

Apache Kafka provides a distributed and fault-tolerant messaging system capable of handling high volumes of data and real-time streams. It uses a publish-subscribe model, where producers publish messages to topics, and consumers subscribe to them to receive messages. Kafka’s durable storage and replication mechanisms ensure data reliability and fault tolerance.

For a quick way to get Kafka running, see the guide How to Easily Deploy Apache Kafka on Kubernetes or Locally.

Setting Up the Environment

Before we dive into building our microservice, let’s ensure we have a working environment set up. You’ll need:

  1. Java Development Kit (JDK)
  2. Gradle (if you use the Gradle wrapper, there is no need to install Gradle system-wide)
  3. Docker
  4. A running instance of Apache Kafka
  5. Your favorite code editor (e.g., IntelliJ IDEA, Eclipse, VSCode)

Step 1: Set up a Spring Boot Project

Use Spring Initializr to create a new Spring Boot project with the required dependencies. Add “Spring Web” for handling HTTP endpoints, “Spring for Apache Kafka” for Kafka integration, and “Spring Boot DevTools” for a faster development feedback loop. Choose Java 21 so you can use String templates and virtual threads.

Generate and download the project, then open it in your preferred Integrated Development Environment (IDE).

Before proceeding with the project implementation, enable virtual threads in the application.properties file. With this setting, Spring Boot’s default executor uses virtual threads for handling HTTP connections, which improves throughput for blocking I/O workloads.

spring.threads.virtual.enabled=true

Step 2: Implement Text Producer Microservice

In this microservice, we will handle the upload of text data and publish it to the Apache Kafka topic named TEXT-DATA.

Add the required Apache Kafka properties to application.properties:

# src/main/resources/application.properties
spring.kafka.bootstrap-servers=localhost:9092
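Since KafkaTemplate<String, String> picks up Spring Boot’s default String serializers, this single property is enough to run the example. If you later want to tune the producer, Spring Boot exposes the standard Kafka producer settings as properties too; the values below are illustrative, not required:

```properties
# Optional producer tuning (illustrative values, not needed to run the example)
spring.kafka.producer.acks=all
spring.kafka.producer.compression-type=lz4
spring.kafka.producer.batch-size=32768
```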

Create the necessary components:

The TextDataProducer class:

package com.example.kafkaapp;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import java.util.stream.Stream;

import static java.lang.StringTemplate.STR;

@Component
public class TextDataProducer {

    private final Logger logger = Logger.getLogger(getClass().getName());

    // Constants for topic configuration
    private static final int PARTITION_COUNT = 8;
    private static final String TOPIC = "TEXT-DATA";
    private static final short REPLICATION_FACTOR = 1;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TextDataProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Autowired
    public void configureTopic(KafkaAdmin kafkaAdmin) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(TOPIC, PARTITION_COUNT, REPLICATION_FACTOR));
    }

    private void sendTextMessage(String text, int lineIndex) {
        if (text == null || text.isEmpty()) {
            return;
        }
        // Sends the line to the topic, distributing messages across partitions based on the line index
        kafkaTemplate.send(TOPIC, "KEY-" + (lineIndex % PARTITION_COUNT), text);
    }

    public void sendContentOf(File file) {
        Instant before = Instant.now();
        try (Stream<String> lines = Files.lines(file.toPath())) {
            AtomicInteger counter = new AtomicInteger(0);
            lines.forEach(line -> sendTextMessage(line, counter.getAndIncrement()));
            Duration duration = Duration.between(before, Instant.now());
            logger.info(STR."Streamed \{counter.get()} lines in \{duration.toMillis()} milliseconds");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
  • This class is responsible for producing text data and sending it to the Kafka topic named TEXT-DATA.
  • It’s annotated with @Component, making it a Spring-managed component.
  • It has constants for topic configuration such as PARTITION_COUNT, TOPIC, and REPLICATION_FACTOR.
  • The configureTopic method configures the Kafka topic using KafkaAdmin, creating or modifying the topic with the specified settings.
  • The sendTextMessage method sends a text message to the Kafka topic, distributing messages across partitions based on the line index.

The most important part of the TextDataProducer class is the sendContentOf(File file) method. This method is responsible for reading the content of the given file, line by line, and sending each line as a message to the Kafka topic named TEXT-DATA.
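To make the key scheme concrete, here is a small stand-alone sketch (the class and helper names are illustrative, not from the article) of how line indices map to message keys. One caveat worth knowing: Kafka’s default partitioner hashes the key, so all messages with the same key land on the same partition, but "KEY-3" is not guaranteed to land on partition 3 specifically.

```java
// Illustrative sketch of the key scheme used by sendTextMessage,
// assuming PARTITION_COUNT = 8 as in the article.
public class KeyDistributionSketch {

    static final int PARTITION_COUNT = 8;

    // Mirrors the key expression "KEY-" + (lineIndex % PARTITION_COUNT)
    static String keyFor(int lineIndex) {
        return "KEY-" + (lineIndex % PARTITION_COUNT);
    }

    public static void main(String[] args) {
        // Lines 0..9 cycle through KEY-0 .. KEY-7 and then wrap around
        for (int i = 0; i < 10; i++) {
            System.out.println("line " + i + " -> " + keyFor(i));
        }
    }
}
```

Because only eight distinct keys are ever produced, lines are spread round-robin over at most eight partitions, and ordering is preserved per key rather than globally.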

The TextDataController class

package com.example.kafkaapp;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

@RestController
public class TextDataController {

    private final TextDataProducer producer;

    public TextDataController(TextDataProducer producer) {
        this.producer = producer;
    }

    @PostMapping("/upload")
    public Optional<String> uploadTextFile(@RequestParam("file") MultipartFile file) throws IOException {
        Path tempFile = Files.createTempFile(file.getOriginalFilename(), null);
        file.transferTo(tempFile);
        Thread.ofVirtual().start(() -> producer.sendContentOf(tempFile.toFile()));
        return Optional.of(tempFile.toString());
    }
}
  • This class is a Spring REST controller responsible for handling HTTP requests.
  • It injects a TextDataProducer bean through the constructor.
  • The uploadTextFile method is mapped to handle POST requests to /upload and accepts a multipart file.
  • It creates a temporary file, transfers the uploaded content into it, hands the file to the TextDataProducer on a virtual thread, and returns the path of the temporary file.
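The essence of the controller is a hand-off: persist the upload first, then process it off the request thread. The stand-alone sketch below (class and method names are illustrative, not from the article) shows that pattern without Spring, assuming JDK 21 for Thread.ofVirtual():

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

// Illustrative sketch of the controller's hand-off pattern (assumes JDK 21):
// persist the upload to a temp file, then process it on a virtual thread so
// the caller gets a response without waiting for the processing to finish.
public class UploadHandoffSketch {

    static Path handleUpload(byte[] uploadedBytes, Consumer<Path> processor) throws Exception {
        // Persist the request body before the servlet container would discard it
        Path tempFile = Files.createTempFile("upload-", ".txt");
        Files.write(tempFile, uploadedBytes);
        // Fire-and-forget: the processing runs on a cheap virtual thread
        Thread.ofVirtual().start(() -> processor.accept(tempFile));
        return tempFile;
    }

    public static void main(String[] args) throws Exception {
        Path path = handleUpload("hello\nworld".getBytes(),
                p -> System.out.println("processing " + p));
        System.out.println("returned " + path);
    }
}
```

The trade-off of fire-and-forget is that the HTTP client gets a success response before the file has actually been streamed to Kafka; a production service would likely track the background task and report failures.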

The TextProducerApplication class

package com.example.kafkaapp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TextProducerApplication {

public static void main(String[] args) {
SpringApplication.run(TextProducerApplication.class, args);
}
}
  • This is the entry point of the Spring Boot application.
  • It’s annotated with @SpringBootApplication, which combines @Configuration, @EnableAutoConfiguration, and @ComponentScan.
  • The main method starts the Spring Boot application.

Once you have implemented the text producer, you can run the project with the following command:

./gradlew bootRun

Post an example text file to the http://localhost:8080/upload endpoint with your preferred REST client, such as Postman or Swagger UI. Alternatively, you can post the file using the following raw HTTP request in IntelliJ IDEA:

### Send a form with the text and file fields
POST http://localhost:8080/upload HTTP/1.1
Content-Type: multipart/form-data; boundary=boundary

--boundary
Content-Disposition: form-data; name="file"; filename="shakespeares.txt"

// The 'shakespeares.txt' file will be uploaded
< /Users/mustafaguc/Desktop/kafka-demo-content/shakespeares.txt

--boundary--
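If you prefer the command line, the same request can be made with curl (the file path is an example; point it at any text file on your machine):

```shell
# Upload a text file as the multipart "file" field
curl -X POST http://localhost:8080/upload \
  -F "file=@shakespeares.txt"
```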

Up to this stage, we’ve built a TextDataProducer that reads a text file line by line and sends each line to the TEXT-DATA Kafka topic, and a TextDataController REST controller that accepts an input file and starts the streaming of text data.

Step 3: Dockerize the TextData Producer

Create a Dockerfile to package the microservice as a Docker image:

FROM openjdk:21-slim
WORKDIR /app
COPY build/libs/text-producer-1.0.jar app.jar
EXPOSE 8080

ENTRYPOINT ["java", "-jar", "app.jar"]

Before we build the Docker image, we first need to build the microservice with Gradle:

./gradlew assemble

Once the single fat JAR is assembled successfully, we can build the Docker image from the project root folder:

docker build . -t text-producer

When the image is successfully built, run the following command to launch the container:

docker run -it -p 8080:8080 text-producer

Upon running the above command, you should observe logs similar to the following:

c.e.kafkaapp.TextProducerApplication     : Starting TextProducerApplication using Java 21.0.1 with PID 4871 (/Users/mustafaguc/projects/java/text-producer/build/classes/java/main started by mustafaguc in /Users/mustafaguc/projects/java/text-producer)
c.e.kafkaapp.TextProducerApplication : No active profile set, falling back to 1 default profile: "default"
o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port 8080 (http)
o.apache.catalina.core.StandardService : Starting service [Tomcat]
o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.17]
o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 307 ms
o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
........
o.a.k.clients.admin.AdminClientConfig : These configurations '[sasl.jaas.config, idompotence.enabled]' were supplied but are not used yet.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1708262321726
o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for adminclient-1 unregistered
o.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
o.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
o.apache.kafka.common.metrics.Metrics : Metrics reporters closed
o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8080 (http) with context path ''
c.e.kafkaapp.TextProducerApplication : Started TextProducerApplication in 0.764 seconds (process running for 0.976)

Currently, we have implemented the producer part. We will cover the consumer and aggregator microservices in separate articles.

Part 2: The TextConsumer Microservice

The source code can be accessed here.

I hope you have a productive and enjoyable time during your development process!


Mustafa GÜÇ

A disciplined software engineer in design, clean code and architecture | Java, Spring | Microservices | Kubernetes | Functional Paradigm