Kafka Consumer and Producer with Spring Boot and Python

Marcos · Published in Geek Culture · Jan 15, 2023

This article is a tour of Kafka: we will go through the main concepts and implement applications locally with Spring Boot and Python.

Hello, my name is Marcos and I’m a developer. Today we will talk about Kafka, how to use Java and Python to interact with it, and a GUI that makes it easier to create topics.

What will we do here?

  • Run Kafka using Docker
  • Run a consumer with Spring Boot
  • Run a producer with Spring Boot
  • Consume messages with Python
  • Produce messages with Python

What is Apache Kafka?

Apache Kafka is a distributed data streaming platform that can publish, subscribe to, store, and process streams of records in real time. It was designed to handle data streams from many different sources and deliver them to many different clients. In other words, Apache Kafka moves huge volumes of data not just from one point to another, but from one point to many points.

How a simple solution using Kafka works

The producer publishes a message to a topic, and the consumer reads that message from the topic and does whatever it needs to do with it.

Some concepts

Producer

Responsible for producing messages for a specific topic.

Consumer

Responsible for reading the messages that the producer puts on a topic.

Topic

A topic is a categorized group of messages; producers write to topics and consumers read from them.

Message

Everything that flows through Kafka can be summarized as messages. A message can contain simple text such as “Hello World” or, for example, an object in JSON format.

Configuration

Apache Kafka uses Apache Zookeeper to maintain its configurations. But, what is Apache Zookeeper?

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which make them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.

Reference: https://zookeeper.apache.org/

Confluent Kafka

Confluent is an open-source Kafka distribution, created by a company founded by the original creators of Kafka at LinkedIn, that offers a set of additional tools. Below I will list some of them.

Kafka Connect

It is the framework that integrates Kafka with other systems. To copy data between Kafka and other systems, users instantiate connectors that push data to or pull data from Kafka. Source connectors import data from other systems (for example, from a database), and sink connectors export data from Kafka (for example, the contents of a Kafka topic to an HDFS file).

REST proxy

Provides a RESTful interface to your Kafka cluster. This makes it much easier to produce and consume messages from a cluster, as well as view cluster status and perform administrative tasks.

Confluent CLI

The CLI simplifies development by letting you start a Kafka cluster with a single confluent start command in the terminal, stop and start individual services, check cluster status, and so on.

Schema Registry

Stores the history of all schemas and lets them evolve according to compatibility rules. It also manages the storage and retrieval of schemas for messages sent in Avro format.

Running Kafka with Docker

After these concepts, let’s run Apache Kafka in a simple way using docker-compose:

version: "3.0"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Use this command:

docker-compose up

GUI for Kafka

Now, I will use kafkatool to visualize and interact with my cluster. You can download it from the Kafka Tool website.

Pay attention to the ports in our docker-compose file: the broker is exposed to the host on localhost:29092, so that is the address to use when configuring the connection in kafkatool.

If the connection fails at first, also fill in the bootstrap servers under the advanced configuration.

And now you are ready to use kafkatool with your cluster.

Adding a topic

It’s simple: in kafkatool, select the Topics folder of your cluster and add a new topic with the name you want (I will use topic-test-1 in the rest of this article).
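If you prefer to create the topic from code instead of the GUI, here is a minimal sketch using Spring Kafka’s TopicBuilder. It assumes the spring-kafka dependency we add in the next section, and the partition and replica counts are just example values for our single-broker setup:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // Spring Kafka's KafkaAdmin picks up NewTopic beans and creates the topics
    // on the broker when the application starts.
    @Bean
    public NewTopic topicTest() {
        return TopicBuilder.name("topic-test-1") // same topic name used in the rest of this article
                .partitions(1)                   // example value
                .replicas(1)                     // our docker-compose runs a single broker
                .build();
    }
}

With this in place, the topic exists even if automatic topic creation is disabled on the broker.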

Consumer written in Java

Dependencies in pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

After this we must configure the application.properties:

################################################
# Kafka Consumer properties #
################################################
spring.kafka.consumer.bootstrap-servers=127.0.0.1:29092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
topic.name.consumer=topic-test-1

Here I have one observation: StringDeserializer is used because we are working with plain text messages, and auto.create.topics.enable=true means the topic is created automatically if it does not exist yet.

In the main class, I put the annotation @EnableKafka, which makes it possible to connect to a topic.

@EnableKafka
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

The consumer class looks like this:

@Slf4j
@RequiredArgsConstructor
@Service
public class TopicListener {

    @Value("${topic.name.consumer}")
    private String topicName;

    @KafkaListener(topics = "${topic.name.consumer}", groupId = "group_id")
    public void consume(ConsumerRecord<String, String> payload) {
        log.info("Topic: {}", topicName);
        log.info("Key: {}", payload.key());
        log.info("Headers: {}", payload.headers());
        log.info("Partition: {}", payload.partition());
        log.info("Value: {}", payload.value());
    }
}

It’s very simple: the @KafkaListener annotation connects the method to a topic so that it receives the messages published there.
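As a side note, the listener method does not have to receive a ConsumerRecord; if you only care about the message text, Spring can inject just the deserialized value. A minimal alternative sketch (same properties as above, hypothetical method name):

@KafkaListener(topics = "${topic.name.consumer}", groupId = "group_id")
public void consumeValueOnly(String message) {
    // Spring Kafka deserializes the record and passes only its value to the method.
    log.info("Received: {}", message);
}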

Run this project.

Sending messages with Python

The code is really short. First, you must install the kafka-python package:

pip install kafka-python

And then write this script:

from kafka import KafkaProducer

# connect to the broker exposed by docker-compose
producer = KafkaProducer(bootstrap_servers='localhost:29092')

for i in range(10):
    print('sending message ', i)
    # send to the same topic our Spring Boot consumer listens on
    producer.send('topic-test-1', b'message-%d' % i)

# send() is asynchronous, so flush before exiting to make sure everything is delivered
producer.flush()
print('done')

Now, see the logs from our consumer:

All ten messages were received by the Spring Boot consumer.

Producer written in Java

Dependencies in pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

After this we must configure the application.properties:


################################################
# Kafka Producer properties #
################################################
spring.kafka.producer.bootstrap-servers=127.0.0.1:29092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=group_id
topic.name.producer=topic-test-1
################################################
# Common Kafka Properties #
################################################
auto.create.topics.enable=true

I use the StringSerializer class to serialize the messages, since we are just sending text.

The auto.create.topics.enable=true property tells the broker to create a topic automatically the first time it is referenced, so we do not need to create topic-test-1 by hand.

Let’s go to the code then.

In the main class, I put the annotation @EnableKafka, which makes it possible to connect to a topic.

@EnableKafka
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

And my producer class:

@Slf4j
@Service
@RequiredArgsConstructor
public class TopicProducer {

    @Value("${topic.name.producer}")
    private String topicName;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String message) {
        log.info("Payload: {}", message);
        kafkaTemplate.send(topicName, message);
    }
}

The KafkaTemplate class is what actually sends messages to topics. In KafkaTemplate<String, String>, the first type parameter is the type of the message key and the second is the type of the message value.
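For example, here is a minimal sketch of sending a message with an explicit key (the key "key-1" is just an illustrative value); Kafka uses the key to decide which partition the record goes to:

// send(topic, key, value): records with the same key always land on the same partition
kafkaTemplate.send(topicName, "key-1", message);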

Finally, I created a controller to do tests:

@RequiredArgsConstructor
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final TopicProducer topicProducer;

    @GetMapping(value = "/send")
    public void send() {
        topicProducer.send("test message");
    }
}

Consumer with Python

Now, let’s create a Python script to consume messages from a topic:

from kafka import KafkaConsumer

# connect to the broker and subscribe to the topic
consumer = KafkaConsumer("topic-test-1", group_id='group_id', bootstrap_servers='localhost:29092')

# block and print every message that arrives
for msg in consumer:
    print("Topic Name=%s, Message=%s" % (msg.topic, msg.value))

Look at the result:

As long as the Spring Boot producer keeps sending messages, they will arrive here.

The messages will also be consumed by the Spring Boot consumer:

And that is it, in a very simple way.
