Building an Event-Driven Spring Boot Microservice with Apache Kafka Part II

Mustafa GÜÇ
Feb 21, 2024 · 5 min read


In the first part of this article, we implemented the TextProducer Spring Boot microservice, which publishes each line of a text file to the TEXT-DATA Kafka topic. In this part, we will build the TextConsumer Spring Boot microservice, which consumes each line of text from the aforementioned TEXT-DATA topic, splits it by whitespace, calculates the frequency of each word, and finally groups the results.

Setting Up the Environment

Before we dive into building our microservice, let’s ensure we have a working environment set up. You’ll need:

  1. Java Development Kit (JDK)
  2. Gradle (if you use the Gradle wrapper, there is no need to install Gradle system-wide)
  3. Docker
  4. A running instance of Apache Kafka
  5. Your favorite code editor (e.g., IntelliJ IDEA, Eclipse, VSCode)

Step 1: Initialize Spring Boot Project

Use Spring Initializr or your preferred IDE to create a new Spring Boot project with the Spring Web (the plain spring-web artifact, not the spring-boot-starter-web starter), Spring Kafka, and Jackson Databind dependencies as follows:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework:spring-web:6.1.2'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.1'
}

Step 2: Implement TextConsumer

Create application.properties as follows:

app.stop-words=i,me,my,myself,we,our,ours,ourselves,you,your,yours,yourself,yourselves,he,him,his,himself,she,her,hers,herself,it,its,itself,they,them,their,theirs,themselves,what,which,who,whom,this,that,these,those,am,is,are,was,were,be,been,being,have,has,had,having,do,does,did,doing,a,an,the,and,but,if,or,because,as,until,while,of,at,by,for,with,about,against,between,into,through,during,before,after,above,below,to,from,up,down,in,out,on,off,over,under,again,further,then,once,here,there,when,where,why,how,all,any,both,each,few,more,most,other,some,such,no,nor,not,only,own,same,so,than,too,very,s,t,can,will,just,don,should,now

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.properties.security.protocol=PLAINTEXT
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=wordFrequency:com.example.textconsumer.WordFrequency,wordFrequencyList:com.example.textconsumer.WordFrequencyList

To serialize a Kafka message as a concrete type, the JsonSerializer and the type mappings for the WordFrequency and WordFrequencyList classes are specified in the properties file. Note that SASL settings are not needed here: with security.protocol=PLAINTEXT, no authentication is performed.

Create the following classes:

WordFrequency:

package com.example.textconsumer;

public record WordFrequency(String word, Long count) {
}

WordFrequencyList:

package com.example.textconsumer;

import java.util.List;

public record WordFrequencyList(List<WordFrequency> frequencies) {
}

WordFrequency and WordFrequencyList records are created to transmit necessary Kafka messages without losing type safety. They are used in both producer and consumer microservices.
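For illustration, a WordFrequencyList with two entries would be serialized by the JsonSerializer (backed by Jackson, which supports records) roughly as the following JSON, assuming default field naming:

```json
{
  "frequencies": [
    { "word": "kafka", "count": 3 },
    { "word": "spring", "count": 1 }
  ]
}
```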

WordCounter:

package com.example.textconsumer;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class WordCounter {

    // Loads the comma-separated stop-word list from application.properties;
    // SpEL splits and lowercases it, and Spring converts the result to a Set
    @Value("#{'${app.stop-words}'.toLowerCase().split(',')}")
    private Set<String> stopWords;

    public Stream<String> splitText(String text) {
        // Strip punctuation, lowercase, and split on one or more whitespace characters
        return Arrays.stream(text.replaceAll("\\p{P}", "").toLowerCase().split("\\s+"));
    }

    public Stream<WordFrequency> groupByWords(Stream<String> words) {
        return words
                .filter(this::isNotStopWord)
                .collect(Collectors.groupingBy(word -> word, Collectors.counting()))
                .entrySet()
                .stream()
                .map(e -> new WordFrequency(e.getKey(), e.getValue()))
                .sorted(Comparator.comparing(WordFrequency::count).reversed());
    }

    private boolean isNotStopWord(String word) {
        return !stopWords.contains(word);
    }
}

The core of the WordCounter class is counting the frequency of words in a given text while excluding common stop words. The groupByWords method filters out stop words, groups the remaining words with their counts, and sorts them in descending order of frequency. This ensures that only meaningful words contribute to the final frequency count, which is essential for many natural language processing tasks.
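To see the counting logic in isolation, here is a small standalone sketch (with a hard-coded stop-word set rather than the injected one) that mirrors splitText and the grouping step:

```java
import java.util.*;
import java.util.stream.*;

public class WordCounterDemo {
    public static void main(String[] args) {
        Set<String> stopWords = Set.of("the", "a", "and", "is");
        String text = "The quick fox and the lazy dog; the dog sleeps.";

        // Strip punctuation, lowercase, split on whitespace, drop stop words,
        // then count occurrences per word
        Map<String, Long> frequencies = Arrays
                .stream(text.replaceAll("\\p{P}", "").toLowerCase().split("\\s+"))
                .filter(w -> !stopWords.contains(w))
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));

        System.out.println(frequencies.get("dog")); // 2
    }
}
```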

TextConsumer:

package com.example.textconsumer;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Stream;

@Component
public class TextConsumer {

    private static final String INPUT_TOPIC = "TEXT-DATA";
    private static final String OUTPUT_TOPIC = "AGGREGATE-DATA";

    private final Logger logger = LoggerFactory.getLogger(getClass().getName());
    private final WordCounter wordCounter;
    private final KafkaTemplate<String, WordFrequencyList> kafkaTemplate;

    public TextConsumer(WordCounter wordCounter, KafkaTemplate<String, WordFrequencyList> kafkaTemplate) {
        this.wordCounter = wordCounter;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Autowired
    public void configureTopic(KafkaAdmin kafkaAdmin) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(OUTPUT_TOPIC, 1, (short) 1));
    }

    @KafkaListener(topics = INPUT_TOPIC, groupId = "TEXT_CONSUMERS")
    public void consumeMessage(String message) {
        Stream<WordFrequency> group = wordCounter.groupByWords(wordCounter.splitText(message));
        kafkaTemplate.send(OUTPUT_TOPIC, "KEY-1", new WordFrequencyList(group.toList()));
    }
}

The main point of the TextConsumer class is to consume text messages from a Kafka topic, process them using a WordCounter instance, and then publish the resulting word frequencies to another Kafka topic. This is achieved by annotating the method consumeMessage with @KafkaListener to handle incoming messages, processing them using the WordCounter, and sending the results to an output Kafka topic via a KafkaTemplate. Additionally, the class ensures that the output topic is created or modified during initialization.

As you may notice, we send the WordFrequencyList data to another Kafka topic named AGGREGATE-DATA as soon as a new Kafka message is retrieved from the TEXT-DATA topic. This means our TextConsumer app is both a consumer and a producer microservice. By publishing a message to the AGGREGATE-DATA topic we initiate another flow. We will cover the TextAggregator microservice in the third step.

TextConsumerApplication:

package com.example.textconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TextConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(TextConsumerApplication.class, args);
}

}

The TextConsumer microservice thus plays both the consumer and the producer role.

After creating the classes and resources, you can run the following command to verify that the TextConsumer microservice is operational:

./gradlew bootRun

Go to the TextProducer microservice and upload a text file. As soon as the upload finishes, the file's content should be read line by line and dispatched to TEXT-DATA. You can observe these messages being consumed in TextConsumer.consumeMessage. Once the consumer works as expected, you can package and dockerize it.
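To exercise the flow end to end from the command line, you can upload a file with curl. The multipart field name "file" is an assumption here — use whatever parameter name the TextProducer's upload endpoint from Part I actually expects:

```shell
# Hypothetical upload request; the field name "file" depends on the
# TextProducer controller implemented in Part I.
curl -F "file=@sample.txt" http://localhost:8080/upload
```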

Build a single JAR:

./gradlew assemble

Create Dockerfile:

FROM openjdk:21-slim
WORKDIR /app
COPY build/libs/text-consumer-1.0.jar app.jar

ENTRYPOINT ["java", "-jar", "app.jar"]

Build the image:

docker build . -t text-consumer

Run the Docker container:

docker run -it text-consumer
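Note that inside the container, localhost no longer refers to your host machine, so the bootstrap server address from application.properties will not reach a broker running on the host. One way around this (on Linux, assuming Kafka listens on the host at port 9092) is host networking together with an environment override, which Spring Boot's relaxed binding maps onto spring.kafka.bootstrap-servers:

```shell
# --network host is Linux-only; on macOS/Windows use host.docker.internal instead
docker run -it --network host \
  -e SPRING_KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
  text-consumer
```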

Step 3: Implement TextAggregator

We need to build another Spring Boot microservice to aggregate the word count frequencies.

Repeat the Setting Up the Environment phase, Step 1, and the creation of application.properties above to prepare the workspace for the aggregator microservice.
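On the aggregator side, the consumer needs a JsonDeserializer and must trust the package containing the record classes. A minimal sketch of the consumer-side properties, mirroring the type mapping used by the producer, could look like:

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.textconsumer
spring.kafka.consumer.properties.spring.json.type.mapping=wordFrequency:com.example.textconsumer.WordFrequency,wordFrequencyList:com.example.textconsumer.WordFrequencyList
```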

Create WordFrequencyConsumer:

package com.example.textaggregator;

import com.example.textconsumer.WordFrequency;
import com.example.textconsumer.WordFrequencyList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class WordFrequencyConsumer {

    private static final String TOPIC = "AGGREGATE-DATA";
    private final Logger logger = LoggerFactory.getLogger(getClass().getName());

    private List<WordFrequency> inMemoryFrequencyList = List.of();
    private final AtomicInteger counter = new AtomicInteger(0);

    @KafkaListener(topics = TOPIC, groupId = "AGGREGATE_CONSUMERS")
    public void consumeMessage(WordFrequencyList frequencyList) {
        // Merge the incoming frequencies into the in-memory list, summing counts per word
        inMemoryFrequencyList = Stream
                .concat(inMemoryFrequencyList.stream(), frequencyList.frequencies().stream())
                .collect(Collectors.groupingBy(WordFrequency::word, Collectors.summingLong(WordFrequency::count)))
                .entrySet()
                .stream()
                .map(e -> new WordFrequency(e.getKey(), e.getValue()))
                .sorted(Comparator.comparing(WordFrequency::count).reversed())
                .toList();

        // Log the aggregated frequencies after every 5th message
        if (counter.incrementAndGet() % 5 == 0) {
            inMemoryFrequencyList.forEach(wordFrequency -> logger.info("In memory word frequency: {}", wordFrequency));
        }
    }
}

The WordFrequencyConsumer class is a Spring-managed component annotated with @Component. It contains a method consumeMessage annotated with @KafkaListener, which listens to the Kafka topic specified in the TOPIC constant.

The consumeMessage method processes incoming WordFrequencyList objects, aggregates the word frequencies, and updates an in-memory frequency list. Additionally, it logs the in-memory word frequencies after every 5th message received.
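The merge step can be illustrated in isolation. The sketch below, with a local copy of the WordFrequency record, shows how Collectors.summingLong combines counts for the same word across the in-memory list and an incoming message:

```java
import java.util.*;
import java.util.stream.*;

public class MergeDemo {
    // Local stand-in for the WordFrequency record from the article
    record WordFrequency(String word, long count) {}

    public static void main(String[] args) {
        List<WordFrequency> inMemory = List.of(
                new WordFrequency("kafka", 3),
                new WordFrequency("spring", 1));
        List<WordFrequency> incoming = List.of(new WordFrequency("kafka", 2));

        // Concatenate both lists and sum the counts per word
        Map<String, Long> merged = Stream.concat(inMemory.stream(), incoming.stream())
                .collect(Collectors.groupingBy(WordFrequency::word,
                        Collectors.summingLong(WordFrequency::count)));

        System.out.println(merged.get("kafka")); // 5
    }
}
```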

You can build and package the aggregator microservice by repeating the commands mentioned above.

After implementing and running all three microservices, you can aggregate large text content by uploading a sizeable text file to the http://localhost:8080/upload endpoint of the TextProducer microservice created in the first part of this series.

The source code can be found in the accompanying GitHub repositories.

Keep up the good work with coding and building! Have a productive time ahead!
