Kafka Consumer and Producer with Spring Boot and Python
This text is a tour of Kafka: we will visit its core concepts and implement applications locally with Spring Boot and Python.
Hello, my name is Marcos and I’m a developer. Today we will talk about Kafka, how to use Java and Python to interact with Kafka, and a GUI to facilitate the creation of topics.
What will we do here?
- Run Kafka using Docker
- Run a consumer with Spring Boot
- Run a producer with Spring Boot
- Consume messages with Python
- Produce messages with Python
What is Apache Kafka?
Apache Kafka is a distributed data streaming platform that can publish, subscribe to, store, and process streams of records in real time. The platform was developed to process data streams from different sources and deliver them to various clients. This means that Apache Kafka moves huge volumes of data not just from one point to another, but from one point to many points.
How a simple solution using Kafka works
The producer publishes a message to a topic, and the consumer receives that message and does whatever it needs to do with it.
Some concepts
Producer
Responsible for producing messages for a specific topic.
Consumer
Responsible for reading the messages that the producer puts on a topic.
Topic
A topic is a named category to which messages are published.
Message
Every event in Kafka boils down to a message. A message can contain simple text like “Hello World” or an object in JSON format, for example.
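A message payload can be sketched with the standard library alone. Here a JSON object is serialized to the raw bytes that Kafka actually transports (the field names are purely illustrative):

```python
import json

# A message can be plain text or a structured payload such as JSON.
order = {"id": 1, "product": "book", "quantity": 2}

# Kafka brokers store and move raw bytes, so structured data is
# serialized before sending and deserialized after receiving.
payload = json.dumps(order).encode("utf-8")
print(payload)
```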
Configuration
Apache Kafka uses Apache Zookeeper to maintain its configurations. But, what is Apache Zookeeper?
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which make them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.
Reference: https://zookeeper.apache.org/
Confluent Kafka
Confluent is an open-source Kafka distribution founded by the original creators of Kafka at LinkedIn that offers a set of additional tools. Below I list some of them.
Kafka Connect
It is the framework that integrates Kafka with other systems. To copy data between Kafka and other systems, users instantiate Kafka connectors. Source connectors import data from other systems (for example, from a database), and sink connectors export Kafka data (for example, content from a Kafka topic to an HDFS file).
REST proxy
Provides a RESTful interface to your Kafka cluster. This makes it much easier to produce and consume messages from a cluster, as well as view cluster status and perform administrative tasks.
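As a rough sketch of what that looks like from a client, the snippet below builds a produce request in the REST Proxy's v2 JSON format. Assumptions: a REST Proxy instance is running on its default port 8082 (it is not part of the docker-compose file used later), and the topic name is illustrative.

```python
import json
import urllib.request

# Produce request body in the REST Proxy v2 JSON format.
body = json.dumps({"records": [{"value": {"greeting": "Hello World"}}]}).encode("utf-8")

# Assumption: a REST Proxy listens on localhost:8082.
request = urllib.request.Request(
    "http://localhost:8082/topics/topic-test-1",
    data=body,
    headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
    method="POST",
)
# urllib.request.urlopen(request) would send it once the proxy is up.
print(request.full_url)
```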
Confluent CLI
The CLI simplifies development by allowing you to start a Kafka cluster with a single command (confluent start) in the terminal, stop and start individual services, verify cluster status, etc.
Schema Registry
Stores the history of all schemas and allows them to evolve according to compatibility rules. It also supports the storage and retrieval of messages sent in Avro format.
Running Kafka with Docker
After these concepts, let’s run Apache Kafka in a simple way using docker-compose:
version: "3.0"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Use this command:
docker-compose up
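Before wiring up any clients, it can help to confirm that the host listener is reachable. A minimal stdlib-only sanity check (it only verifies that the port accepts TCP connections, not that Kafka itself is healthy):

```python
import socket

def listener_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# The PLAINTEXT_HOST listener from docker-compose is exposed on localhost:29092.
print(listener_reachable("localhost", 29092))
```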
GUI for Kafka
Now, I will use kafkatool to visualize and interact with my cluster. You can download it here.
Pay attention to the ports in our docker-compose file: ZooKeeper is exposed on localhost:22181 and the Kafka broker on localhost:29092. In the connection settings, point the tool at ZooKeeper on port 22181; if the connection fails, fill in the advanced configurations with the bootstrap server localhost:29092. And now you are ready to use kafkatool with your cluster.
Adding a topic
It’s simple:
Consumer written in Java
Dependencies on pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
After this we must configure the application.properties:
################################################
# Kafka Consumer properties #
################################################
spring.kafka.consumer.bootstrap-servers=127.0.0.1:29092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
topic.name.consumer=topic-test-1
Here I have two observations: StringDeserializer is used because we are working with plain text, and auto.create.topics.enable=true lets the broker create topics automatically.
In the main class, I put the annotation @EnableKafka, which makes it possible to connect to a topic.
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
The consumer class looks like this:
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@RequiredArgsConstructor
@Service
public class TopicListener {

    @Value("${topic.name.consumer}")
    private String topicName;

    @KafkaListener(topics = "${topic.name.consumer}", groupId = "group_id")
    public void consume(ConsumerRecord<String, String> payload) {
        log.info("Topic: {}", topicName);
        log.info("Key: {}", payload.key());
        log.info("Headers: {}", payload.headers());
        log.info("Partition: {}", payload.partition());
        log.info("Value: {}", payload.value());
    }
}
It’s very simple: the @KafkaListener annotation subscribes the method to a topic so it receives that topic's messages.
Run this project.
Sending messages with Python
The code is really short. First, install the kafka-python library:
pip install kafka-python
And then write this script:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:29092')
for i in range(10):
print('sending message ', i)
producer.send('topic-test', b'mensagem- %d' % i)
print('done')
Now, see the logs from our consumer:
All ten messages were received.
Producer written in Java
Dependencies on pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
After this we must configure the application.properties:
################################################
# Kafka Producer properties #
################################################
spring.kafka.producer.bootstrap-servers=127.0.0.1:29092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=group_id
topic.name.producer=topic-test-1
################################################
# Common Kafka Properties #
################################################
auto.create.topics.enable=true
I use the StringSerializer class to serialize the messages, since we are only sending text.
The auto.create.topics.enable=true property makes the broker create a topic automatically the first time it is referenced.
Let’s go to the code then.
In the main class, I put the annotation “@EnableKafka”, which makes it possible to connect to a topic.
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
And my producer class
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class TopicProducer {

    @Value("${topic.name.producer}")
    private String topicName;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String message) {
        log.info("Payload: {}", message);
        kafkaTemplate.send(topicName, message);
    }
}
The KafkaTemplate class sends messages to topics; its first type parameter is the message key type and the second is the message value type.
Finally, I created a controller to do tests:
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequiredArgsConstructor
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final TopicProducer topicProducer;

    @GetMapping(value = "/send")
    public void send() {
        topicProducer.send("test message");
    }
}
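Any HTTP client can exercise the endpoint. A small stdlib sketch, assuming the Spring Boot application runs on its default port 8080:

```python
import urllib.error
import urllib.request

def trigger_send(url: str = "http://localhost:8080/kafka/send"):
    """GET the endpoint; return the HTTP status, or None if unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=5) as response:
            return response.status
    except urllib.error.URLError:
        return None

print(trigger_send())
```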
Consumer with Python
Now, let’s create a Python script to consume messages from a topic:
from kafka import KafkaConsumer

consumer = KafkaConsumer("topic-test-1",
                         group_id='group_id',
                         bootstrap_servers='localhost:29092')

for msg in consumer:
    print("Topic Name=%s, Message=%s" % (msg.topic, msg.value))
Look at the result:
As long as the Spring Boot producer sends messages, they will arrive here.
The messages will also be consumed by the Spring Boot consumer:
And that, in a very simple way, is it.