Apache Kafka — Understanding how to produce and consume messages?

Syed Sirajul Islam Anik
10 min read · Jun 27, 2020


Image from: https://stayrelevant.globant.com/wp-content/uploads/2014/01/kafka-logo-wide.png

I lack motivation. Every tool I try to explore takes me at least three attempts before I can really concentrate on it. After a while, I lose my motivation and stop learning that tool. It can even take years. This is my third attempt at understanding the basics of Kafka; the first time I tried learning it was probably in September 2019.

N.B.: I haven't run Kafka in production. Whatever I have learned is from tutorials and hands-on practice, so a production deployment and production-grade code may differ. At the very basic level, though, it is the same. Also, this article doesn't use any programming language to interact with Kafka; it uses the commands which are shipped with Kafka. Let's start.

Terminologies

A few terms used in Kafka that you need to know:

  • Message: A stream of bytes. For simplicity, think of it as a string.
  • Producer: The application or client that produces messages, which will be consumed by other clients.
  • Consumer: A client that receives the messages produced by the producers.
  • Broker: Receives the messages, stores them, and decides which consumers should get them.
  • Topic: A unique name through which the data is streamed.
  • Partition: A virtual division that allows parallelizing a topic by splitting its data across multiple brokers. A topic can have multiple partitions, with a minimum of 1.
  • Offset: A unique number identifying a message within a topic's partition.
  • Consumer Group: A group of consumers identified by a unique name.
  • Leader: The broker responsible for a topic's partition, where messages are stored.
  • Replica: A copy of your data kept to handle faults, so that the data doesn't get lost.
  • In-Sync Replica (ISR): A replica that is alive and in sync with the leader.

These are the main Kafka terms. We'll start our Kafka application in a Docker container. You can use the following docker-compose.yml file to start Kafka on your host machine.
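Here is a minimal sketch of such a docker-compose.yml, assuming the commonly used wurstmeister images; the exact images, versions, and environment variables may differ in your setup.

version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper    # Zookeeper, which Kafka uses to track brokers
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka        # a single Kafka broker
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost     # so clients on the host machine can reach the broker
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181   # where Kafka finds Zookeeper
    depends_on:
      - zookeeper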

So, if you copy the above docker-compose.yml file and run docker-compose up -d --build, you'll see two containers boot up: one for Apache Kafka, and the other for Zookeeper, which Kafka requires to keep track of its brokers.

Running Kafka & Zookeeper containers

Now, we can exec into the Kafka container with docker-compose exec kafka bash.

Creating a topic

To interact with Kafka topics, we'll have to use the kafka-topics.sh script. It is already in the environment's PATH variable.

kafka-topics.sh --bootstrap-server localhost:9092 \
--create \
--topic test-topic \
--replication-factor 1 \
--partitions 3
  • bootstrap-server — Required. A list of servers in host:port,host:port format. In our case, the broker runs within the container itself on port 9092.
  • replication-factor — The number of replicas you want for your topic. The minimum value is 1.
  • partitions — The number of partitions you want on your topic.
  • test-topic — The name of the topic. Like variables, we can name it whatever we want; in this case, I used test-topic, but you're free to choose.

Now, after running the above command, it shows that the topic was created.

Topic is created

Now, we can describe the topic using the following command.

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic

This will describe the topic in the terminal. In our case, we created the topic with 3 partitions and 1 replica. ISR means in-sync replica, i.e. the replicas that are alive and synced with the leader. If any replica goes down, its broker id is removed from the ISR.

Describing the topic

As we didn't set up a cluster or configure the server, the default broker.id is 1001. That's why the Leader, Replicas, and ISR all point to the same broker. With a successfully set up cluster, Kafka will automatically spread the leader and replicas across different brokers.
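For reference, in this single-broker setup the describe output looks roughly like this (a sketch; exact columns and spacing vary by Kafka version):

Topic: test-topic  PartitionCount: 3  ReplicationFactor: 1  Configs:
    Topic: test-topic  Partition: 0  Leader: 1001  Replicas: 1001  Isr: 1001
    Topic: test-topic  Partition: 1  Leader: 1001  Replicas: 1001  Isr: 1001
    Topic: test-topic  Partition: 2  Leader: 1001  Replicas: 1001  Isr: 1001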

If we want to check the available list of topics, we can use

kafka-topics.sh --bootstrap-server localhost:9092 --list
List of topics

Producing Messages

As we have created a topic, it's time to put some messages in it. To put messages on our topic, we will use the kafka-console-producer.sh script.

kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic test-topic
  • bootstrap-server — The list of brokers in CSV format; localhost:9092 in our case, as we have only one instance of Kafka running.
  • test-topic — The name of the topic. In this case, we want to send messages to our previously created test-topic.

After we run the above command, the terminal will switch to user-input mode, where you can keep sending single-line messages for as long as you want. In the session sketched below, I sent three single-line messages and four empty lines; each empty line was also counted as a message. Just note that, from the terminal, we could not send messages to any partition other than 0 (zero); to send messages to specific partitions, we'll have to use a programming language.
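The session looked roughly like this (a sketch; the > prompt comes from the script, and the message text is arbitrary):

> first message
> second message
> third message
>
>
>
>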

Consuming Messages

So far, we have created a topic and published messages to it. Now we want to consume those messages. To consume them, we'll use the kafka-console-consumer.sh script.

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test-topic \
--from-beginning
  • bootstrap-server — The list of brokers as CSV values in host:port format.
  • test-topic — The topic we want to consume messages from. In this case, the test-topic we created earlier.
  • --from-beginning — The position from which we want to read the messages. We want to read from the beginning.

We can see that we got the messages we sent using the kafka-console-producer script: the three lines of messages first, followed by the empty ones.
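By the way, if you only want to read a single partition, the console consumer also accepts a --partition flag. A quick sketch using our topic:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test-topic --partition 0 --from-beginning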

Yay!! We’ve successfully published and consumed messages from Kafka.

Consumer Group

We used kafka-console-consumer to consume messages. Now, if we run the same command in another terminal, that terminal will receive the exact same messages. But when we add more producers, the number of messages grows, and adding multiple consumers this way doesn't help, because every consumer gets every message, which doesn't make sense. In this case, we can use the consumer group feature.

A consumer group is a group of consumers identified by a unique name. Within a consumer group, each consumer receives unique messages. But there is a catch: the number of consumers in a group can be at most the number of partitions in the topic. So, in our test-topic scenario, a consumer group can have at most 3 working consumers; adding more consumers to that group will not help. The concept behind the consumer group is that each consumer gets assigned to a particular partition.

So, assume a consumer group named consumer-group-test-topic with 3 consumers in it. Each consumer gets assigned to a specific partition, receives unique messages from that partition only, and never gets messages from the other partitions.

To use a consumer group, we'll again use the kafka-console-consumer.sh script.

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--group consumer-group-test-topic \
--topic test-topic
  • group — The name of the group. All consumers with the same group name fall under the same group; if the group name changes, it's considered a new group.

I used two terminals to listen to the same topic with the same group name. And since we could not send messages to partitions other than zero from the terminal, I used a PHP script (sketched below) to send messages to different partitions.
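That script was, roughly, something like the following sketch using the rdkafka extension (the broker address and topic name match this article's setup; the loop and message text are illustrative):

<?php
// A minimal rdkafka producer sketch: send one message to each partition.
$producer = new RdKafka\Producer();
$producer->addBrokers('localhost:9092');

$topic = $producer->newTopic('test-topic');

foreach ([0, 1, 2] as $partition) {
    // produce(partition, msgflags, payload)
    $topic->produce($partition, 0, "message for partition {$partition}");
}

// Block until the messages actually reach the broker.
$producer->flush(10000);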

As we started two consumers under the same group, and we have more partitions than consumers in that group, the terminal at the bottom receives messages from both partition 0 and partition 1. This assignment is handled by Kafka. If we had three consumers under that group, each of them would get messages from one specific partition, and none of them would receive messages from the other partitions.

Now you can see that we have three consumers under the same group, so each of them receives messages only from its own assigned partition.

But there is no point in adding more consumers than the number of partitions in the topic. The extra consumers will not receive any messages, because they will not be assigned to any partition.

You can see in the above image that one of the consumers in the consumer group is not getting any messages, because the other consumers were assigned all the partitions.

Whenever a new consumer joins or leaves the consumer group, a group coordinator rebalances the existing consumers, either assigning or revoking partitions. This is done automatically.

Pub/Sub vs MQ?

With Apache Kafka, you can achieve both architectures. If you add multiple consumer groups to a topic, then whenever someone publishes messages to that topic, all of the subscribing groups instantly start receiving them; that's what pub/sub is for. On the other hand, within a single consumer group, none of the consumers gets another partition's messages: each consumer is bound to its own share of the messages and gets only those, which is what a queue does.
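You can see the pub/sub behavior from the terminal: run the consumer below in two terminals with two different group names (the group names here are arbitrary), and each group receives every message published to the topic.

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--group group-a --topic test-topic

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--group group-b --topic test-topic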

Pull vs Push

Traditional MQs push messages to consumers. What if a consumer cannot process all those chunks at the same time? It gets overwhelmed. Kafka consumers instead pull messages from the topic. In the traditional approach, the broker needs to keep track of which messages were sent to which consumers and listen for acks before deleting a message. In Kafka, since each consumer knows the position (offset) from which it needs to read, the consumer itself is responsible for keeping track of that position. Read more about this in the Kafka documentation.

A few other commands are given below so that you can get your hands dirty and get the hang of it.

  • Alter properties of your topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic test-topic --partitions 5
  • Delete an existing topic
# specific topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--delete --topic test-topic
# wildcard topics
kafka-topics.sh --bootstrap-server localhost:9092 \
--delete --topic '.*topic.*'
  • Create a topic with custom retention configs
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic test-topic \
--partitions 2 --replication-factor 1 \
--config delete.retention.ms=60000 \
--config retention.ms=30000
  • To update/delete the config of entities
# Add/Update config 
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name test-topic \
--alter --add-config max.message.bytes=1024
# Delete config
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name test-topic \
--alter --delete-config max.message.bytes
  • To describe the config of an entity
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name test-topic --describe
  • Consumer groups
# Describe a consumer group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group consumer-group-test-topic --describe
# List of consumer group members
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group consumer-group-test-topic --describe --members --verbose
# Delete a consumer group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group consumer-group-test-topic --delete
# Describe the state of a consumer group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group consumer-group-test-topic --describe --state
# List available consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Before I finish the article: you can also send a key when producing a message. To do this with kafka-console-producer, you will have to do it like below.

kafka-console-producer.sh --bootstrap-server localhost:9092 \
--property parse.key=true --property key.separator=, \
--topic test-topic

So, whenever we're prompted to type our message, we can specify the key, then a comma (as set by key.separator), and then the message.
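For example, with key.separator set to a comma, the prompt input looks like this (the keys and messages are arbitrary):

> user-1,first message of user one
> user-2,first message of user two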

When we consume the messages with kafka-console-consumer, we'll have to use the following command if we want to print the key as well.

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--property print.key=true --property key.separator="~~" \
--topic test-topic

Here, the key and the message are separated by the ~~ sign; that's just what I wanted to use, and you can use any separator you want.
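For the two sample messages above, the consumer would print something like:

user-1~~first message of user one
user-2~~first message of user two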

Publishing key with the message and displaying key when consuming

With client libraries in your favorite programming language, you can also send headers. But that's not possible with the provided console commands.

If you’re using PHP, then you can use the following Dockerfile to install ext-rdkafka.
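A minimal sketch of such a Dockerfile, assuming the official PHP CLI base image (your base image and versions may differ):

FROM php:7.4-cli
# librdkafka is the C library the PHP extension binds to
RUN apt-get update && apt-get install -y librdkafka-dev \
    && pecl install rdkafka \
    && docker-php-ext-enable rdkafka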

The rdkafka extension's repository is at https://github.com/arnaud-lb/php-rdkafka, and its usage looks roughly like the sketch below.
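As a taste of the usage, here is a minimal consuming sketch with the extension's high-level consumer (the group id and topic match this article's examples; error handling is trimmed):

<?php
// A minimal rdkafka high-level consumer sketch.
$conf = new RdKafka\Conf();
$conf->set('group.id', 'consumer-group-test-topic');
$conf->set('metadata.broker.list', 'localhost:9092');
$conf->set('auto.offset.reset', 'earliest'); // start from the beginning when there is no committed offset

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test-topic']);

while (true) {
    $message = $consumer->consume(120 * 1000); // block for up to 120 seconds
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        printf("partition %d, offset %d: %s\n", $message->partition, $message->offset, $message->payload);
    }
}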

Also, I was advised to use a higher-level composer package rather than the rdkafka extension itself, if it solves the purpose.

That's all for Kafka. If you're a PHP developer, I'll probably publish another article on how to use Kafka with PHP.

Happy coding.
