Implementing a Kafka consumer and Kafka producer with Spring Boot

Marcos
Published in Geek Culture · 4 min read · May 12, 2021

A more complete version of this text, including how to use Python and a GUI, is available here.

This is my first text in English, so I apologize in advance for any mistakes.

A few days ago I had to develop some microservices that consumed and produced messages on Kafka topics. I decided to leave some notes here to consult whenever I need to. I split this text into two parts:

  • Create a consumer;
  • Create a producer;

Some essential information:
I wrote this code in Java, using Spring Boot, Lombok, and Hibernate, managing the dependencies with Maven.

About Apache Kafka

What is this?

It is a distributed messaging and streaming platform.

That didn’t clarify everything, right? So, another question.

What is it for?

It is used to move and transform large amounts of data between different systems.

This movement and transformation can mean reading from and writing to databases, materializing streaming data, sending information between systems (producer and consumer), and so on. Right here there is a list of use cases for Kafka.

How a simple solution using kafka works:

The producer publishes a message to a topic, and the consumer receives that message and does whatever it has to do.
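As a rough analogy (not real Kafka code), the flow above can be sketched in plain Java with a queue standing in for the topic: the producer puts a message on it, and the consumer takes the message off and handles it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TopicAnalogy {

    // The "topic": an ordered holding place for messages
    static final BlockingQueue<String> topic = new LinkedBlockingQueue<>();

    // The "producer" attaches a message to the topic
    static void produce(String message) {
        topic.add(message);
    }

    // The "consumer" reads the next message from the topic
    static String consume() throws InterruptedException {
        return topic.take();
    }

    public static void main(String[] args) throws InterruptedException {
        produce("Hello World");
        System.out.println(consume()); // prints "Hello World"
    }
}
```

Real Kafka topics are durable, partitioned logs rather than in-memory queues, and consumers track offsets instead of removing messages, but the producer/topic/consumer roles are the same.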

Concepts:

Producer: responsible for producing messages for a specific topic.

Consumer: responsible for reading the messages that the producer puts on a topic.

Topic: a categorized group of messages.

Message: every event in Kafka boils down to a message. A message can contain simple text like “Hello World” or an object in JSON format, for example.

Implementing a Kafka Producer:

Repository link: https://github.com/mmarcosab/kafka-producer

Dependencies on pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

After this we must configure the application.properties:

# Producer properties
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
topic.name.producer=topico.comando.teste

# Common Kafka Properties
auto.create.topics.enable=true

At first I used the StringSerializer class to serialize the messages, just to send plain text. (Note that group-id is a consumer concept, so it is not needed in the producer properties.)

The auto.create.topics.enable=true property is actually a broker-side setting: when it is enabled on the broker (it is by default), topics are created automatically the first time they are used. In this case I set topic.name.producer=topico.comando.teste as the topic name.
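If you prefer not to rely on broker-side auto-creation, Spring Kafka can create the topic at application startup through a NewTopic bean. A minimal sketch, reusing the topic.name.producer property above (the class and bean names here are illustrative):

```java
@Configuration
public class TopicConfiguration {

    @Value("${topic.name.producer}")
    private String topicName;

    // KafkaAdmin, auto-configured by Spring Boot, creates this topic on startup
    @Bean
    public NewTopic commandTopic() {
        return TopicBuilder.name(topicName)
                .partitions(1)
                .replicas(1)
                .build();
    }
}
```

This also lets you control the partition and replica counts explicitly instead of taking the broker defaults.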

Let’s go to the code then.

In the main class I added the “@EnableKafka” annotation, which enables Spring’s Kafka support.

@EnableKafka
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

And here is my producer class:

@Slf4j
@Service
@RequiredArgsConstructor
public class TopicProducer {

    @Value("${topic.name.producer}")
    private String topicName;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String message) {
        log.info("Payload sent: {}", message);
        kafkaTemplate.send(topicName, message);
    }

}

KafkaTemplate is the class that sends messages to topics. Its two type parameters are not the topic and the payload: the first String is the type of the message key and the second is the type of the message value. The topic name is passed as the first argument of send().
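For example, a variant of send() that also sets a key (records with the same key always land on the same partition) and logs the result asynchronously could look like this. This is a sketch: it assumes spring-kafka 3.x, where send() returns a CompletableFuture, and the method name sendWithKey is my own:

```java
public void sendWithKey(String key, String message) {
    kafkaTemplate.send(topicName, key, message)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to send payload: {}", message, ex);
                } else {
                    log.info("Sent to partition {} at offset {}",
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                }
            });
}
```

In older spring-kafka versions send() returns a ListenableFuture instead, so the callback is registered with addCallback() rather than whenComplete().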

Finally I created a controller to do tests:

@RequiredArgsConstructor
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final TopicProducer topicProducer;

    @GetMapping(value = "/send")
    public void send() {
        topicProducer.send("Test message sent to the topic");
    }

}

In a very simple way, that is it.

Implementing a Kafka Consumer:

Repository link: https://github.com/mmarcosab/kafka-consumer

Dependencies on pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

After this we must configure the application.properties:

# Kafka Consumer properties
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
topic.name.consumer=topico.teste

Here the observations are the same as for the producer: StringDeserializer for working with text, and the broker-side auto.create.topics.enable=true taking care of topic creation.

In the main class I again added the “@EnableKafka” annotation, which enables detection of methods annotated with @KafkaListener.

@EnableKafka
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

The consumer class looks like this:

@Slf4j
@RequiredArgsConstructor
@Service
public class TopicListener {

    @Value("${topic.name.consumer}")
    private String topicName;

    @KafkaListener(topics = "${topic.name.consumer}", groupId = "group_id")
    public void consume(ConsumerRecord<String, String> payload) {
        log.info("Topic: {}", topicName);
        log.info("key: {}", payload.key());
        log.info("Headers: {}", payload.headers());
        log.info("Partition: {}", payload.partition());
        log.info("Order: {}", payload.value());
    }

}

It is very simple: the “@KafkaListener” annotation subscribes the method to a topic so it receives that topic’s messages.
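When you only care about the message body, the listener method can receive the payload directly instead of the whole ConsumerRecord. A sketch, under the same configuration as above:

```java
@KafkaListener(topics = "${topic.name.consumer}", groupId = "group_id")
public void consume(String message) {
    log.info("Received: {}", message);
}
```

The ConsumerRecord variant is useful when you need the key, headers, partition, or offset; the plain String variant keeps the code shorter when you only need the value.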

To run the projects, just call the test controller in the producer project, and you can see the result in the logs:

Producer:

Consumer:

It worked.

