Implementing pub/sub model with Apache Kafka and Node.js

Hussain Ali Akbar
Published in Webtips
10 min read · Aug 17, 2020

Apache Kafka is a distributed streaming platform with many use cases, including acting as a message broker between your applications and processes. Today, we’ll look at how to implement a publisher/subscriber model with Apache Kafka using Node.js!

Basic Concepts of Apache Kafka

Before we proceed with the code, it is important to understand the basic concepts of Apache Kafka. Fortunately, there are already plenty of excellent resources online that explain these concepts far better than I ever can!

  1. Apache Kafka’s own website
  2. This excellent piece on Cloudkarafka
  3. This read up on Consumer Groups since that plays a major role in our pub/sub model

Do go through these before proceeding further in case you’re not aware of the underlying concepts of Kafka!

The Use Case

Let's say we have an eCommerce website where, every time a customer checks out an order, an “orderCreated” event is generated. The following services listen for this event:

  1. Orders Service: This service is responsible for adding the order details to the Database
  2. Payments Service: This service is responsible for deducting credits from the Customer’s wallet
  3. Notifications Service: This service is responsible for notifying the customer that their order has been placed successfully.

We can do these operations asynchronously in the background. The benefit is that we can quickly send a response back to the customer while the actual work happens in the background.

Now, there are 2 key challenges here:

  1. We want our messages to be processed at least once. The listeners need to acknowledge that they’ve processed the events. If a consumer service fails to process an event successfully, the event needs to be retried. For example, we cannot afford to lose events for credits that need to be deducted from customers’ wallets. (Customers getting their orders for free!)
  2. We also want our messages to be processed at most once. These days, we usually have many instances of our application running behind a load balancer. In our scenario, this means there will be multiple instances of the payments service (as well as the others) running, and we don’t want the same event to be processed by every instance. (Customers getting charged double or triple the order amount!)

Let's see how we can achieve this with Kafka!

Setting up Kafka with Docker

Once again, I will recommend an excellent article that I used myself for setting up Kafka locally with Docker:

How to install Kafka using Docker

Here’s an updated version of the docker-compose.yml file:

version: '3.7'

networks:
  kafka-net:
    driver: bridge

services:
  zookeeper-server:
    image: 'bitnami/zookeeper:latest'
    networks:
      - kafka-net
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka-server1:
    image: 'bitnami/kafka:latest'
    networks:
      - kafka-net
    ports:
      - '9092:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper-server

Just run docker-compose up in the terminal and the Kafka server should start running!

You can set up Conduktor as well in case you want to set up a desktop client.

Time to Code!

We’ll be using a package called kafkajs in order to interact with Kafka from Node.js.

  1. As a first step, we’ll need to create a topic, orderCreated, which will be used by publishers and subscribers.

The main point to note here is the number of partitions, which we’ve currently set to 2. The reason behind this will become clear as we proceed.

Copy and run this code as is. If everything runs perfectly, the console should output done, indicating that the topic has been created. If you have Conduktor set up, you can verify this there as well:

our topic “orderCreated” has been created with 2 partitions
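For readers following along without the original snippet, here is a minimal sketch of what the topic creation step could look like with the kafkajs admin client. It is an approximation under assumptions, not necessarily the exact script used in this article: the function name createOrderTopic is made up for illustration, and the client is passed in so the function can be exercised without a running broker. The clientId and broker address in the comment are the ones that appear in the logs and compose file in this article.

```javascript
// Sketch only: `admin` is expected to be a KafkaJS admin client, e.g.
//   const { Kafka } = require('kafkajs');
//   const admin = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] }).admin();
//   createOrderTopic(admin);
// createOrderTopic is a hypothetical helper name, not part of kafkajs.
async function createOrderTopic(admin) {
  await admin.connect();
  try {
    // createTopics resolves to false when the topic already exists.
    const created = await admin.createTopics({
      topics: [{ topic: 'orderCreated', numPartitions: 2 }],
    });
    console.log(created ? 'done' : 'topic already exists');
    return created;
  } finally {
    // Always release the connection, even if topic creation fails.
    await admin.disconnect();
  }
}
```

The only setting that matters for the rest of the walkthrough is numPartitions: 2.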

2. The next step is to write a consumer that will subscribe to this topic and listen for events.

Before executing this code, let's understand a bit what’s happening here.

At the start of the process function, we’re creating 3 consumers to simulate our 3 services namely ordersConsumer, paymentsConsumer and notificationsConsumer. The groupId specifies the consumer group that these consumers will be a part of.

Adding the 3 consumers to 3 different consumer groups will ensure that all the 3 consumers will get the messages (which is what we want). If we add all the consumers to the same group then only one of these will get the message.

Next, we subscribe the consumers to our topic “orderCreated”.

Finally, we start the consumers with .run, which enables them to listen for any messages published on their subscribed topics. Each consumer does the simple job of logging the messages it receives.
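The steps described above can be sketched roughly as follows. This is a reconstruction under assumptions rather than the exact consumer file: startConsumers and the log format are hypothetical, and the kafkajs client is passed in as a parameter so the sketch does not need a broker to be read or tested. The group names match the groupId values visible in the logs in this article.

```javascript
// Sketch only: `kafka` is expected to be a KafkaJS client, e.g.
//   const { Kafka } = require('kafkajs');
//   const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
//   startConsumers(kafka, process.argv[2]);
const GROUPS = ['orders', 'payments', 'notifications'];

// startConsumers is a hypothetical helper name, not part of kafkajs.
async function startConsumers(kafka, consumerNumber, log = console.log) {
  const consumers = [];
  for (const groupId of GROUPS) {
    // One consumer per group, so every service receives every message.
    const consumer = kafka.consumer({ groupId });
    await consumer.connect();
    await consumer.subscribe({ topic: 'orderCreated' });
    await consumer.run({
      // Called once per message on the partitions assigned to this consumer.
      eachMessage: async ({ partition, message }) => {
        log(`[${groupId} #${consumerNumber}] partition ${partition}: ${message.value.toString()}`);
      },
    });
    consumers.push(consumer);
  }
  return consumers;
}
```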

The consumer file also expects a command-line argument that tells us the consumer number. Since these are our first consumers, let's execute this script as node mediumConsumer.js 1.

If everything works fine, we should see a lot of logs from Kafka saying that consumers have joined the group. This can be verified from Conduktor as well:

3 Consumer Groups are visible with 1 member each and both partitions assigned to them

One log that is important to note here is this one:

{"level":"INFO","timestamp":"2020-08-17T14:54:40.237Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"notifications","memberId":"my-app-2d62f453-edd1-4b3f-a2ac-b926abcd9e12","leaderId":"my-app-2d62f453-edd1-4b3f-a2ac-b926abcd9e12","isLeader":true,"memberAssignment":{"orderCreated":[0,1]},"groupProtocol":"RoundRobinAssigner","duration":20}

Under memberAssignment, we see that both of our partitions (0 and 1) are assigned to this single consumer, i.e. any message that lands on either partition will be received by this consumer. This log is for our “notifications” consumer; we should see the same log for the remaining consumers as well.
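Since kafkajs writes its logs as JSON, the assignment can also be read programmatically instead of by eye. The snippet below is a small illustration using a shortened copy of the log line above (only a few of its fields are kept).

```javascript
// A shortened copy of the "Consumer has joined the group" log line.
const line = '{"level":"INFO","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"notifications","memberAssignment":{"orderCreated":[0,1]}}';

const entry = JSON.parse(line);
// memberAssignment maps each topic name to the partitions this member owns.
const assigned = entry.memberAssignment.orderCreated;

console.log(entry.groupId, assigned); // notifications [ 0, 1 ]
```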

3. The final step is to write the publisher that will publish our messages:

The publisher’s code is very straightforward. We initialize a producer, connect it, and then send 3 messages on our topic in a loop.
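A publisher along these lines could look like the sketch below. It is written under assumptions and may differ from the actual producer file: publishOrders is a hypothetical name, the { orderId } payload is invented for illustration, and the kafkajs producer is passed in rather than created inside the function.

```javascript
// Sketch only: `producer` is expected to be a KafkaJS producer, e.g.
//   const { Kafka } = require('kafkajs');
//   const producer = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] }).producer();
//   publishOrders(producer);
// publishOrders and the { orderId } payload are hypothetical.
async function publishOrders(producer, count = 3) {
  await producer.connect();
  try {
    for (let i = 1; i <= count; i++) {
      // No key or partition is set, so the client chooses the partition.
      await producer.send({
        topic: 'orderCreated',
        messages: [{ value: JSON.stringify({ orderId: i }) }],
      });
    }
    console.log('done');
  } finally {
    // Disconnect so the script can exit once everything is sent.
    await producer.disconnect();
  }
}
```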

With the consumer open in another terminal, execute the producer by running node mediumProducer.js, which should publish all 3 messages and then exit after outputting “done”. If everything works fine, the output on the consumer should be similar to this:

Since we have not specified which partition to send each message to, the producer’s default partitioner distributes them across the partitions in a round-robin fashion. That’s why 2 messages were sent to partition 0 and 1 was sent to partition 1.
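To make the round-robin arithmetic concrete, here is a tiny standalone simulation. It is not the kafkajs partitioner itself, and starting the counter at 0 is an assumption made so the result matches the distribution described above; a real client may start elsewhere.

```javascript
// Toy round-robin partitioner: cycles through partitions 0..numPartitions-1.
// Starting at 0 is an assumption for this illustration.
function roundRobinPartitioner(numPartitions, start = 0) {
  let counter = start;
  return () => counter++ % numPartitions;
}

const pick = roundRobinPartitioner(2);
// Three messages over two partitions.
const partitions = [pick(), pick(), pick()];

console.log(partitions); // [ 0, 1, 0 ]
```

With 3 messages and 2 partitions, one partition necessarily receives 2 messages and the other 1.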

So far so good. Let's run 2 consumers!

Achieving “at most once” message processing with 2 consumers

As mentioned in the use case section, we want to run multiple consumers in order to manage load. However, we also want each message to be processed only once.

To do this, we keep our previous consumer running in one terminal and, in another terminal, start a second consumer with the command node mediumConsumer.js 2. Here, 2 indicates that this is our second consumer.

As soon as we execute the command, we should start to see some logs in Consumer#1’s terminal indicating that the group is rebalancing:

{"level":"ERROR","timestamp":"2020-08-17T15:01:04.683Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"orders","memberId":"my-app-e866f23c-53a4-4405-97b6-c49cd2b5b99b","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":319}

Once the rebalancing is complete, we should see this log:

{"level":"INFO","timestamp":"2020-08-17T15:01:14.697Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"notifications","memberId":"my-app-2d62f453-edd1-4b3f-a2ac-b926abcd9e12","leaderId":"my-app-2d62f453-edd1-4b3f-a2ac-b926abcd9e12","isLeader":true,"memberAssignment":{"orderCreated":[0]},"groupProtocol":"RoundRobinAssigner","duration":7}

This time around, if we look at the memberAssignment property, we see that only 1 partition is assigned to Consumer#1. This is the log for our “notifications” consumer; we should see the same log for “orders” and “payments” as well. If we head over to the terminal of Consumer#2, we should see that partition 1 has been assigned to Consumer#2.

What happened here is that during rebalancing, Kafka tried to divide all the partitions among the consumers equally. Since we had 2 partitions and 2 consumers, Kafka divided them equally among the 2 consumers. This also brings us to another important point:

If we have more consumers than partitions, e.g. 3 consumers and 2 partitions, then the extra consumers will sit idle and no partition will be assigned to them.
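The division of partitions among group members can be illustrated with a small standalone simulation. This is a simplified model of a round-robin assignment, not the actual assigner used by the client, and the member names are hypothetical.

```javascript
// Simplified round-robin assignment of partitions to group members.
// Member names are hypothetical; real member IDs look like "my-app-<uuid>".
function assignPartitions(partitions, members) {
  const assignment = Object.fromEntries(members.map((m) => [m, []]));
  partitions.forEach((partition, i) => {
    assignment[members[i % members.length]].push(partition);
  });
  return assignment;
}

// 1 consumer: it owns both partitions.
const one = assignPartitions([0, 1], ['consumer-1']);
// 2 consumers: one partition each.
const two = assignPartitions([0, 1], ['consumer-1', 'consumer-2']);
// 3 consumers: the third gets an empty list and sits idle.
const three = assignPartitions([0, 1], ['consumer-1', 'consumer-2', 'consumer-3']);

console.log(one, two, three);
```

This is also why the topic was created with 2 partitions: the partition count caps how many consumers in a group can do useful work at the same time.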

Now that both our consumers are up and running, let’s publish another batch of messages by executing the command node mediumProducer.js. After done is printed on the console, head over to the consumers to examine the output:

Screenshots: terminal output of Consumer #1 and Consumer #2

The output on your terminals may differ from the above, depending on how Kafka assigned the partitions. The important thing to note is that the load has been divided between the 2 consumers, and each consumer processes only the messages sent to the partitions assigned to it. Because a partition is owned by a single consumer within a group, each message is handled by only one instance of a service. And this is how we achieve “at most once” processing with 2 or more consumers!

Achieving “at least once” message processing with 2 consumers

Our 2nd objective as mentioned under the use case is that we want our messages to be processed at least once i.e. if any consumer fails to process a message due to any reason, the message should be retried until it is successful.

To simulate a failing consumer, let's modify our consumer’s code:

Most of the code is the same except for lines 42–48 where, before logging our message, we throw an error. This way, the consumer can never process the message successfully, so the message will be retried.
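As a rough idea of what such a failing handler might look like, here is a sketch. The helper name makeHandler and its shouldFail flag are hypothetical and the actual modified file may be structured differently; the error text is the one that appears in the logs further down.

```javascript
const FAILURE = 'some error got in the way which didnt let the message be consumed successfully';

// Builds an eachMessage handler; makeHandler and shouldFail are hypothetical names.
// Intended use (assumption): consumer.run({ eachMessage: makeHandler('notifications', 2, true) })
function makeHandler(groupId, consumerNumber, shouldFail = false) {
  return async ({ partition, message }) => {
    if (shouldFail) {
      // Throwing before the log line means the message is never processed.
      throw new Error(FAILURE);
    }
    console.log(`[${groupId} #${consumerNumber}] partition ${partition}: ${message.value.toString()}`);
  };
}
```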

To try this out, stop Consumer#2 in the 2nd terminal, modify the consumer’s code as described above, and restart it with node mediumConsumer.js 2. Again, we will see a lot of logs from Kafka indicating that it is rebalancing the group and dividing the partitions equally between the 2 consumers.

Publish another message by executing the command node mediumProducer.js. After done is printed on the console, check Consumer#2’s output:

{"level":"ERROR","timestamp":"2020-08-17T16:29:21.569Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"orderCreated","partition":1,"offset":"3","stack":"Error: some error got in the way which didnt let the message be consumed successfully\n ....

We should see this message 6 times: once for the initial attempt and once for each of the 5 retries. After this:

{"level":"ERROR","timestamp":"2020-08-17T16:29:31.309Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNumberOfRetriesExceeded: some error got in the way which didnt let the message be consumed successfully","groupId":"notifications","retryCount":5,"stack":"KafkaJSNonRetriableError\n  Caused by: Error: some error got in the way which didnt let the message be consumed successfully\n ...
{"level":"INFO","timestamp":"2020-08-17T16:29:31.318Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"notifications"}

This indicates that our consumer has crashed and stopped. Had the consumer processed the message successfully during any of the 5 retries, it would not have crashed (which is not possible in our case). By default, a crashed consumer will try to rejoin the consumer group and, sure enough, we see the log of the consumer rejoining the group:

{"level":"INFO","timestamp":"2020-08-17T16:29:41.602Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"notifications","memberId":"my-app-bde3d21a-4c83-4c03-9b04-0a26ee227c74","leaderId":"my-app-2d62f453-edd1-4b3f-a2ac-b926abcd9e12","isLeader":false,"memberAssignment":{"orderCreated":[1]},"groupProtocol":"RoundRobinAssigner","duration":1933}
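The "1 initial attempt plus 5 retries, then crash" behaviour seen in the logs above can be modelled with a few lines of plain JavaScript. This is only a model of the counting, not the client’s internals: runWithRetries is a hypothetical helper, and a real client would also wait between attempts.

```javascript
// Toy model of the retry loop: 1 initial attempt + `retries` retries.
// No delay between attempts here; this only illustrates the counting.
async function runWithRetries(handler, retries = 5) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await handler(attempt);
    } catch (err) {
      lastError = err;
    }
  }
  // Every attempt failed: surface a single error, like the crash in the logs.
  const error = new Error(`retries exceeded: ${lastError.message}`);
  error.retryCount = retries;
  throw error;
}
```

A handler that always throws is therefore called 6 times before the error escapes, which matches the 6 error logs described above.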

But where did the message go if it wasn’t processed by Consumer#2? For that, let's look at Consumer#1’s logs:

{"level":"ERROR","timestamp":"2020-08-17T16:29:31.573Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"localhost:9092","clientId":"my-app","error":"The group is rebalancing, so a rejoin is needed","correlationId":2325,"size":10}
{"level":"ERROR","timestamp":"2020-08-17T16:29:31.574Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"notifications","memberId":"my-app-2d62f453-edd1-4b3f-a2ac-b926abcd9e12","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":258}
{"level":"INFO","timestamp":"2020-08-17T16:29:31.579Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"notifications","memberId":"my-app-2d62f453-edd1-4b3f-a2ac-b926abcd9e12","leaderId":"my-app-2d62f453-edd1-4b3f-a2ac-b926abcd9e12","isLeader":true,"memberAssignment":{"orderCreated":[0,1]},"groupProtocol":"RoundRobinAssigner","duration":5}

Once the Notifications Consumer in our Consumer#2 crashed, Kafka initiated a rebalancing of the group and reassigned the partition that was left by Consumer#2 to Consumer#1. And sure enough, we can see the message processed by Consumer#1:

Remaining Message on Consumer#1

This way we can ensure “at least once” message processing for all our messages.

However, there is one important caveat!

Our 2 consumers ran two different versions of the code, which is why one consumer was able to process the message while the other could not. This is a highly unlikely scenario. In the real world, the codebase would be the same across all the consumers.

This means that if there is a bug in the code that prevents the message from being consumed successfully, the message will keep circling between the consumers and never be processed successfully!

To handle this, we will need to implement a custom retry strategy that skips a message, or moves it to a dead letter queue, after a certain number of retries. But that is a topic for another day.
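Just to give a flavour of the idea, here is one possible shape such a strategy could take. It is a rough sketch under assumptions and not the approach this series settles on: withDeadLetter, the orderCreated.dlq topic name, and the maxAttempts default are all hypothetical, and the producer is assumed to be an already connected kafkajs producer.

```javascript
// Wraps an eachMessage handler: after maxAttempts failures, the message is
// published to a dead letter topic instead of being retried forever.
// withDeadLetter, 'orderCreated.dlq' and maxAttempts are hypothetical.
function withDeadLetter(handler, producer, { maxAttempts = 3, dlqTopic = 'orderCreated.dlq' } = {}) {
  return async (payload) => {
    let lastError;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return await handler(payload);
      } catch (err) {
        lastError = err;
      }
    }
    // Give up on this message and park it so the partition can move on.
    const { message } = payload;
    await producer.send({
      topic: dlqTopic,
      messages: [{
        key: message.key,
        value: message.value,
        headers: { error: lastError.message },
      }],
    });
  };
}
```

Because the wrapper returns normally after parking the message, the consumer treats it as handled and carries on with the next one; whether silently moving on is acceptable depends on the service.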

For now, what’s important is that Kafka ensures that all our messages are processed at least once and will keep being retried until they succeed.

Conclusion

And that’s all there is to it! I have tried to keep the implementation simple but these basic concepts can be tweaked easily depending on the use cases! Please provide feedback if there’s anything in the article that’s incorrect or that can be improved. Let me know if you run into any issues while following this and I’ll gladly help!

As mentioned above, we still need to implement a retry mechanism that handles messages which cannot be consumed by the consumers. We will look into that in the next part!
