02. Async REST Services with Messaging — Part 2

Yasuni Chamodya
Jul 18, 2022 · 15 min read

Let’s build the foundation for your Kafka learning journey. I am going to explain the most important things you must know about Kafka, with a practical demonstration. Before jumping to the next paragraph, I recommend reading my previous article (part 1), in which I explained when to use Kafka and why RabbitMQ does not fit some use cases.

Let’s learn everything you need to know to design a solution with Kafka. If you came here after reading my previous article, you already know that a Kafka topic can have multiple partitions. Consider the following image.

Use Case 1

There we have a topic named Employee-topic with three partitions. Also, there is only one consumer in a consumer group named Project-consumer-group. This single consumer takes messages from all 3 partitions of employee-topic.

Let’s scale up this application by adding one more consumer to the same consumer group. Now partition 2 is assigned to this new consumer 2, while partition 0 and partition 1 are still assigned to consumer 1. (Do not assume this happens in a fixed order; the assignment is effectively random and depends on how the consumers coordinate.)

Use Case 2

Let’s add one more consumer to scale this application further. Now there are three partitions and 3 consumers, each reading from a single partition.

Use Case 3

What if we want to scale up more? Just add another consumer, right? If you read my previous article, I am pretty sure you know the answer. The number of active consumers in a consumer group should be less than or equal to the number of partitions. The application won’t scale further no matter how many extra consumers you add. All of them will be kept in idle status, and Kafka won’t send messages to them until some consumer crashes and the number of consumers becomes less than the number of partitions again. Consider the following diagram.

Use Case 4

As shown in the above figure, you now have 4 consumers but still 3 partitions. Therefore consumer 4 will be in an idle state. Suppose a consumer crashes after some time. Then Kafka can immediately transfer the relevant partitions to this already-running consumer 4.

Assume you open another consumer group named finance-consumer-group, which has 2 consumers for this employee topic, as shown in the following diagram. Consumer 1 in this finance group listens only to partition 1, and consumer 2 listens to the other 2 partitions.

Use Case 5

As you already read in my previous article, Kafka retains messages and tracks each consumer group’s position by maintaining a consumer offset. Based on this consumer offset, Kafka knows how far a particular consumer has read and where it should continue from.

Let’s see these use cases practically. Since I am running Kafka on Docker, I have included the following docker-compose.yml file for your reference. Please ignore it if you already have Kafka installed.

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
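
If you are starting from scratch, bring both containers up with Docker Compose before continuing (run this from the directory that contains the docker-compose.yml above):

docker-compose up -d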

Let’s go inside the Kafka container. I use the following command to open an interactive shell in my kafka container.

docker exec -it kafka bash

Then go inside the bin folder under /opt, where Kafka is installed by default.

cd /opt/kafka/bin/

Then I create a Kafka topic named employee-topic with three partitions.

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic employee-topic
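
To verify the topic was created with three partitions, you can describe it with the same tool (the --zookeeper flag matches the older Kafka version shipped in the wurstmeister image):

kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic employee-topic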

Let’s create a directory called kafka-demo and init a project inside the directory. I open this project with VS Code.

# create directory
mkdir kafka-demo
# go inside the directory
cd kafka-demo
# init project
npm init -y
# open the project in VS Code
code .

Since I use Node.js, I installed a client library to talk to Kafka using the following command. I installed the kafkajs library, but you can use any other library you prefer.

npm i kafkajs

Now you should see an interface similar to the one shown in the following image.

Let’s create a producer.js file. Use the following code.

import { Kafka } from "kafkajs";

// Create a new Kafka connection for employee-service
const kafka = new Kafka({
  clientId: 'employee-service',
  brokers: ['localhost:9092'],
});

// Get a producer
const producer = kafka.producer();

// Publish
const publish = async () => {
  // Connect the producer
  await producer
    .connect()
    .catch((e) => console.error('Error on connecting to Kafka', e)); // Log errors
  for (let i = 0; i < 10; i++) {
    await producer.send({
      topic: 'employee-topic',
      messages: [
        { key: i.toString(), value: JSON.stringify({ empName: 'Yasuni' + i }) },
      ],
    });
  }
  // Disconnect so the script exits cleanly once all messages are sent
  await producer.disconnect();
};

publish().catch((e) => console.error(e));

Note the key: i.toString() in the above code; I will explain it later. Let’s create a consumer.js file as well. Use the following code.

import { Kafka } from "kafkajs";

// Create a new Kafka connection for employee-service
const kafka = new Kafka({
  clientId: 'employee-service',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'project' });

// Connect, then subscribe to employee-topic
await consumer.connect();
await consumer.subscribe({ topic: 'employee-topic', fromBeginning: true });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      partition,
      offset: message.offset,
      value: message.value.toString(),
    });
  },
});

When I create the Kafka consumer, I set the consumer groupId to project. Supplying the groupId is a must, because every consumer must belong to a particular consumer group. In this consumer, I subscribed to the same employee-topic that the producer publishes messages to, and I set fromBeginning to true.

Let’s run the consumer.

node consumer.js

If you get an error like ‘Cannot use import statement outside a module’, go to package.json and set type to module as shown below.

package.json
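
For reference, the relevant part of package.json should look like this (the other fields generated by npm init are omitted here):

{
  "name": "kafka-demo",
  "version": "1.0.0",
  "type": "module"
}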

When you run consumer.js, you will see output similar to the following. “employee-topic”: [0,1,2] means this consumer listens to all three partitions we created along with the topic.

consumer 1 — listens to partition 0, partition 1 and partition 2

This is the first use case I explained above (one consumer listening to all 3 partitions).

Use Case 1

The consumer we created listens to all three partitions. Let’s open a second consumer and see what happens (open a new terminal and run node consumer.js).

As you can see in the following images, consumer 1 now listens only to partition 1, while consumer 2 listens to partition 0 and partition 2. That means whenever you add a new consumer to the consumer group, Kafka automatically rebalances the partitions among the consumers.

consumer 1 — listens to partition 1
consumer 2 — listens to partition 0 and partition 2

This is use case 2, which I discussed above (2 consumers & 3 partitions).

Use Case 2

Let’s create one more consumer. This is the third use case which I mentioned above (3 consumers & 3 partitions).

Use Case 3

Now you should be able to see that each consumer listens to one partition, as shown below.

consumer 1 — listens to partition 1
consumer 2 — listens to partition 0
consumer 3 — listens to partition 2
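
If you want to double-check these assignments from the Kafka side, you can describe the consumer group from inside the Kafka container (project is the groupId we set in consumer.js):

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group project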

Let’s send some messages.

node producer.js
producer

Let’s see how many messages each consumer gets out of 10 (the total number of messages is 10 because the for loop runs only 10 times, as per for (let i = 0; i < 10; i++) in producer.js).

consumer 1 — 2 messages

As you can see in the above image, consumer 1 gets only 2 messages out of 10.

consumer 2 — 4 messages

As you can see in the above image, consumer 2 gets only 4 messages out of 10.

consumer 3 — 4 messages

As you can see in the above image, consumer 3 gets the remaining 4 messages. That means the messages (the load) are balanced among these 3 consumers.

Let’s add one more consumer. This is use case 4. You already know that this extra consumer will be idle.

Use Case 4

You can see this consumer does not have anything in memberAssignment, which means it does not listen to any partition right now; it is idle. Let’s run the producer again to send the messages a second time (node producer.js).

Let’s see what happened to the consumers.

consumer 1
consumer 2
consumer 3
consumer 4

As you can see in the above images, consumers 1, 2 and 3 get the messages from partitions 1, 0 and 2 respectively. But consumer 4 gets nothing; it is still idle.

Let’s kill consumer 1 now (ctrl+c).

consumer 1 — killed

After a short while, governed by the consumer heartbeat and session timeout configuration, the group will be rebalanced, because one of the active consumers is gone. Ideally, the fourth consumer (previously idle) will get some member assignments now. Look at the following images.

consumer 2 — listens to partition 0
consumer 3 — listens to partition 1
consumer 4 — listens to partition 2

As you can see above, consumer 4, which was idle previously, listens to partition 2 now. Let’s publish the same messages again (node producer.js).

consumer 4

You can see that consumer 4, which did not receive any messages before, receives messages now: consumer 1 died, and since consumer 4 was already up and running, partition 2 was assigned to it during rebalancing.
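
As a side note, how quickly this failover happens is configurable on the consumer. Here is a minimal sketch with kafkajs; the values shown are the kafkajs defaults, so treat them as a starting point:

// kafka is the connection created at the top of consumer.js
const consumer = kafka.consumer({
  groupId: 'project',
  sessionTimeout: 30000,   // ms without a heartbeat before the coordinator considers this consumer dead
  heartbeatInterval: 3000, // how often this consumer sends heartbeats
});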

Let’s kill one more consumer, consumer 2 (ctrl+c).

Then there are only 2 active consumers, and they will be rebalanced again across the 3 partitions. That means one consumer has to listen to 2 partitions. It will be rebalanced as shown below: consumer 3 listens to partitions 0 and 2, while consumer 4 listens to partition 1.

consumer 3 — listens to partitions 0 & 2
consumer 4 — listens to partition 1

Let’s produce the same messages again (node producer.js).

consumer 3
consumer 4

Now you can see consumer 4 gets 2 messages, and consumer 3 gets the remaining 8 messages because it is listening to two different partitions.

You can see that in the producer the key is dynamic, because it takes the iteration number of the for loop as the key (the relevant snippet is repeated below as a reminder).

for (let i = 0; i < 10; i++) {
  await producer.send({
    topic: 'employee-topic',
    messages: [
      { key: i.toString(), value: JSON.stringify({ empName: 'Yasuni' + i }) },
    ],
  });
}

Let’s hardcode the key to something like ‘emp001’.

import { Kafka } from "kafkajs";

// Create a new Kafka connection for employee-service
const kafka = new Kafka({
  clientId: 'employee-service',
  brokers: ['localhost:9092'],
});

// Get a producer
const producer = kafka.producer();

// Publish
const publish = async () => {
  // Connect the producer
  await producer
    .connect()
    .catch((e) => console.error('Error on connecting to Kafka', e)); // Log errors
  for (let i = 0; i < 10; i++) {
    await producer.send({
      topic: 'employee-topic',
      messages: [
        // Hardcoded key: every message now carries the same key
        { key: "emp001", value: JSON.stringify({ empName: 'Yasuni' + i }) },
      ],
    });
  }
  // Disconnect so the script exits cleanly once all messages are sent
  await producer.disconnect();
};

publish().catch((e) => console.error(e));

Let’s produce messages again by running the producer (node producer.js). If you read my previous article, I am pretty sure you know what will happen.

consumer 4

You will see that one consumer gets nothing while the other consumer gets all 10 messages. In my case, consumer 4 gets all the messages while consumer 3 does not get any message, even though it is running.

This happens because Kafka chooses the partition based on the key of the message. If you specify a key in your messages, Kafka runs it through a hash function and takes the result modulo the number of partitions, and based on that it decides which partition that particular message should go to. Therefore, you need to specify a key if you need order guaranteed in your Kafka implementation. For example, if you need to process all of a customer’s orders in the order they were generated, you can use the customer ID as the key. Since we used the same value ‘emp001’ as the key, Kafka sent that particular employee’s messages to the same partition. As a result, a single consumer takes all of them, because a single partition sends messages to only 1 consumer within a consumer group.

But there is one important thing to keep in mind: Kafka does not guarantee order between partitions. It only guarantees order within a partition.
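
To make this concrete, here is a toy sketch of the idea. Note that kafkajs actually uses a murmur2 hash in its default partitioner; the djb2 hash below is only a stand-in for illustration:

// Toy illustration: the same key always maps to the same partition
const numPartitions = 3;

// djb2 string hash — a simple stand-in for Kafka's real (murmur2) hash
const hash = (key) =>
  [...key].reduce((h, c) => (h * 33 + c.charCodeAt(0)) >>> 0, 5381);

const partitionFor = (key) => hash(key) % numPartitions;

console.log(partitionFor('emp001')); // always the same partition for 'emp001'
console.log(partitionFor('emp002')); // may land on a different partition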

Let’s change the consumer group to ‘project2’ as shown below.

import { Kafka } from "kafkajs";

// Create a new Kafka connection for employee-service
const kafka = new Kafka({
  clientId: 'employee-service',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'project2' });

// Connect, then subscribe to employee-topic
await consumer.connect();
await consumer.subscribe({ topic: 'employee-topic', fromBeginning: true });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      partition,
      offset: message.offset,
      value: message.value.toString(),
    });
  },
});

Let’s open another consumer (consumer 5) in this project2 consumer group (node consumer.js).

As you can see in the above image, consumer 5 in the project2 consumer group gets all the messages from the beginning. That is because Kafka knows this is a new consumer group, and this consumer 5 listens to all 3 partitions. Therefore, whatever messages the producer sent to these 3 partitions before are all consumed by this single consumer in the project2 group.

Now change groupId to project3 and fromBeginning to false.

import { Kafka } from "kafkajs";

// Create a new Kafka connection for employee-service
const kafka = new Kafka({
  clientId: 'employee-service',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'project3' });

// Connect, then subscribe to employee-topic
await consumer.connect();
await consumer.subscribe({ topic: 'employee-topic', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      partition,
      offset: message.offset,
      value: message.value.toString(),
    });
  },
});

Let’s open another consumer (consumer 6) in this consumer group ‘project3’.

You can see consumer 6 listening to all three partitions, but it did not get any messages, because we set the value of fromBeginning to false. It will only receive messages produced after it joins.

Let’s kill (ctrl+c) all the consumers. Now change your key back to the dynamic one, as it was previously in your producer, as shown below.

import { Kafka } from "kafkajs";

// Create a new Kafka connection for employee-service
const kafka = new Kafka({
  clientId: 'employee-service',
  brokers: ['localhost:9092'],
});

// Get a producer
const producer = kafka.producer();

// Publish
const publish = async () => {
  // Connect the producer
  await producer
    .connect()
    .catch((e) => console.error('Error on connecting to Kafka', e)); // Log errors
  for (let i = 0; i < 10; i++) {
    await producer.send({
      topic: 'employee-topic',
      messages: [
        { key: i.toString(), value: JSON.stringify({ empName: 'Yasuni' + i }) },
      ],
    });
  }
  // Disconnect so the script exits cleanly once all messages are sent
  await producer.disconnect();
};

publish().catch((e) => console.error(e));

Then set the groupId to project and fromBeginning to false in your consumer.

import { Kafka } from "kafkajs";

// Create a new Kafka connection for employee-service
const kafka = new Kafka({
  clientId: 'employee-service',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'project' });

// Connect, then subscribe to employee-topic
await consumer.connect();
await consumer.subscribe({ topic: 'employee-topic', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      partition,
      offset: message.offset,
      value: message.value.toString(),
    });
  },
});

Let’s open two consumers (1 & 2) in this project consumer group.

consumer1 in ‘project’ consumer group — listening to partition 1
consumer2 in ‘project’ consumer group — listening to partitions 0 & 2

Let’s change the consumer group to project2.

const consumer = kafka.consumer({ groupId: 'project2' });

Let’s open a new consumer (consumer3) in this project2 consumer group.

consumer3 in ‘project2’ consumer group — listening to partitions 0, 1 & 2

Let’s produce the same messages again (node producer.js).

consumer 1 gets 2 messages
consumer 2 gets the remaining 8 messages
consumer 3 gets all 10 messages

You can see in the above images that consumers 1 and 2 are in the same consumer group, and therefore the load is balanced between those 2 consumers: consumer 1 gets 2 messages and consumer 2 gets the remaining 8 messages. But consumer 3 is in a different consumer group (project2) and listens to all three partitions. Therefore it gets all 10 messages.

Let’s kill all three consumers now.

Let’s remove the key from the producer now. That means the producer does not provide the key to Kafka.

import { Kafka } from "kafkajs";

// Create a new Kafka connection for employee-service
const kafka = new Kafka({
  clientId: 'employee-service',
  brokers: ['localhost:9092'],
});

// Get a producer
const producer = kafka.producer();

// Publish
const publish = async () => {
  // Connect the producer
  await producer
    .connect()
    .catch((e) => console.error('Error on connecting to Kafka', e)); // Log errors
  for (let i = 0; i < 10; i++) {
    await producer.send({
      topic: 'employee-topic',
      messages: [
        // No key: Kafka will distribute these messages itself
        { value: JSON.stringify({ empName: 'Yasuni' + i }) },
      ],
    });
  }
  // Disconnect so the script exits cleanly once all messages are sent
  await producer.disconnect();
};

publish().catch((e) => console.error(e));

Let’s create three different consumers in the same consumer group called project now. Therefore each consumer will be listening to one partition. Let’s send the messages again (node producer.js) and see what happens.

consumer 1
consumer 2
consumer 3

As you can see in the above images, consumers 1 and 2 get 3 messages each, and consumer 3 gets the remaining 4 messages. Remember, we did not provide a key there.

Let’s set the loop to iterate 12 times.

import { Kafka } from "kafkajs";

// Create a new Kafka connection for employee-service
const kafka = new Kafka({
  clientId: 'employee-service',
  brokers: ['localhost:9092'],
});

// Get a producer
const producer = kafka.producer();

// Publish
const publish = async () => {
  // Connect the producer
  await producer
    .connect()
    .catch((e) => console.error('Error on connecting to Kafka', e)); // Log errors
  // The loop now runs 12 times, still without a key
  for (let i = 0; i < 12; i++) {
    await producer.send({
      topic: 'employee-topic',
      messages: [
        { value: JSON.stringify({ empName: 'Yasuni' + i }) },
      ],
    });
  }
  // Disconnect so the script exits cleanly once all messages are sent
  await producer.disconnect();
};

publish().catch((e) => console.error(e));

Let’s produce the messages again and see what happens.

consumer 1
consumer 2
consumer 3

You can see each consumer gets 4 messages. Remember we did not provide a key there. Let’s understand what happened there.

  • If you provide a dynamic key with the producer, Kafka applies its hash/modulo algorithm to the key and determines which partition to send each message to.
  • If you provide the same key with every message, those messages will always be sent to the same partition. That means they will be consumed by the same consumer.
  • If you do not supply a key, Kafka sends messages in a round-robin way. The problem is that if the messages are related to the same customer, one message goes to one partition, the next message goes to another partition, and so on. In that case, order is not guaranteed even within the same customer. If you need order guaranteed, you need to use a key (see the sketch after this list).
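
For example, here is a small sketch of that last point: keying order events by customer ID so each customer’s events keep their order. The order-topic name, the order-service clientId and the sample data below are made up for illustration; you would create order-topic with multiple partitions first, just as we did for employee-topic.

// Key each message by customer ID so all of a customer's events
// land in the same partition and keep their relative order.
import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'] });
const producer = kafka.producer();
await producer.connect();

const orders = [
  { customerId: 'cust-1', item: 'book' },
  { customerId: 'cust-2', item: 'pen' },
  { customerId: 'cust-1', item: 'laptop' }, // same partition as 'book'
];

for (const order of orders) {
  await producer.send({
    topic: 'order-topic',
    messages: [{ key: order.customerId, value: JSON.stringify(order) }],
  });
}

await producer.disconnect();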

This is the end of the explanation of everything you need to know about consumers, consumer groups and partitions. Let’s meet in another article.

Wish you happy learning!

Source: Everything you need to know about Kafka (With Demo) to work as a PRO
