How to implement a simple yet bulletproof Kafka Producer/Consumer cycle with Node.js


If you are reading this article, I am assuming you probably know what Kafka is and which role it plays in a system architecture, but if not, here is a simple and straightforward definition, just in case.

Apache Kafka is a community distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log.

Source: https://www.confluent.io/what-is-apache-kafka

Simple, right? And it is!

The beauty of Kafka is that you can abstract away complex controls, implementing an agnostic layer between your systems, and rely on a highly available, scalable, and fault-tolerant solution.

Okay, so let's dive in

For the sake of simplicity, we are going to use Node.js to implement our code, but all the concepts explained here are basically the same in every other supported language.

First things first: let's walk through the main building blocks of the Kafka architecture, starting with the producer.

What is a Producer?

As the name suggests, it's a message producer, or, in other words, the source of the data stream.

More detail: https://docs.confluent.io/current/clients/producer.html

In order to produce/insert messages into Kafka topics (yes, topic is another concept you need to know, but in plain/simple English, think of a queue where your messages are inserted and wait to be consumed), we need to perform a sequence of actions:

  1. Connect the producer to a Kafka cluster;
  2. Specify one or more topics where the message will be published;
  3. Send the message.

Let's code step-by-step;

Disclaimer 1: This is for educational purposes only. My goal is to guide you through the knowledge using simple, easy-to-learn snippets in order to empower you to understand and implement your own version of this architecture.

Disclaimer 2: This isn't a copy-and-paste guide, so I am assuming you have basic knowledge of Node.js (e.g., installing npm libraries, initiating a project, etc.) and that you're working with a local test Kafka environment.

1.a) Create a specific Kafka connection config file.

module.exports = {
  clientId: 'nodeJS-medium-kafka-example',
  // Topic name used by the producer/consumer snippets below.
  kafka_topic: 'medium-topic-01',
  brokers: ['broker:9092'],
  connectionTimeout: 3000,
  authenticationTimeout: 1000,
  reauthenticationThreshold: 10000,
};

1.b) Create a producer file.

const { Kafka } = require('kafkajs');
const config = require('./src/config/kafkaConnection');

// 1. Instantiating kafka
const kafka = new Kafka(config);

// 2. Creating the Kafka producer
const producer = kafka.producer();

const runProducer = async () => {
  const message = {
    nTransOrderID: 1000,
    sTransOrderCode: 'TO-101212',
  };

  // 3. Connecting the producer to the Kafka broker and sending the message
  await producer.connect();
  await producer.send({
    topic: 'medium-topic-01',
    messages: [{ value: JSON.stringify(message) }],
  });
  await producer.disconnect();
};

runProducer().catch(console.error);

With these three easy steps, we are able to send messages to our Kafka test environment already.

If you want to simulate sending more than one message (boring), feel free to implement a loop, as sketched below, and voilà.
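For illustration only, here is a minimal sketch of such a loop; it reuses the producer and topic from the snippets above, and the counter and payload values are arbitrary:

const runProducerLoop = async () => {
  await producer.connect();

  // Send ten test messages, each with a slightly different payload.
  for (let i = 0; i < 10; i++) {
    const message = {
      nTransOrderID: 1000 + i,
      sTransOrderCode: `TO-${101212 + i}`,
    };

    await producer.send({
      topic: 'medium-topic-01',
      messages: [{ value: JSON.stringify(message) }],
    });
  }

  await producer.disconnect();
};

runProducerLoop().catch(console.error);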

Important producer’s config parameters.

First

acks: The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:

  • acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
  • acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgment from all followers. In this case, if the leader fails immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
  • acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

Second

retries: Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error.

Third

request.timeout.ms: The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.
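Note that acks, retries, and request.timeout.ms are the names used by the Java client and the broker documentation. In kafkajs they map, roughly, to the acks option of producer.send(), the retry option of the client, and the requestTimeout client option. A minimal sketch, assuming the broker address and the values below are just examples:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'nodeJS-medium-kafka-example',
  brokers: ['broker:9092'],
  requestTimeout: 30000, // roughly equivalent to request.timeout.ms
  retry: { retries: 5 }, // roughly equivalent to retries
});

const producer = kafka.producer();

const sendDurably = async (message) => {
  await producer.connect();
  await producer.send({
    topic: 'medium-topic-01',
    acks: -1, // equivalent to acks=all: wait for all in-sync replicas
    messages: [{ value: JSON.stringify(message) }],
  });
  await producer.disconnect();
};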

Now, let's consume these messages!

What is a Consumer?

Guess what?! Consumers read messages from Kafka topics by subscribing to topic partitions.

Don't blame me; I told you Kafka handles the complexity to offer us simplicity. :)

Let’s implement our consumer, creating the following JS file.

const { Kafka } = require('kafkajs');
const config = require('./src/config/kafkaConnection');

// 1. Instantiating kafka
const kafka = new Kafka(config);

// 2. Creating the Kafka consumer and passing a group ID
const consumer = kafka.consumer({ groupId: 'medium-group-01' });

const runConsumer = async () => {
  // 3. Connecting the consumer to the Kafka broker
  await consumer.connect();

  // 4. Subscribing to a topic in order to receive messages/data
  await consumer.subscribe({ topic: 'medium-topic-01', fromBeginning: true });

  // 5. Registering an action to be executed for each message received
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({ 'Doing something with the message': topic, partition, message });
    },
  });
};

runConsumer().catch(console.error);

Again, five easy steps, and we are receiving messages from our Kafka Topics.

Important consumer’s config parameters.

First and only (in our context).

enable.auto.commit: If true (default) the consumer’s offset will be periodically committed in the background.

This is the most important parameter in our example, and it needs to be false.

Why? Every time a consumer takes a message from the topic, the offset is committed (besides other factors). The problem is that if, for some reason, a consumer crashes and needs to re-process the message, the offset has already moved forward, so the message will not be delivered again.

enable.auto.commit: false

Now, with this parameter changed, we can commit our offset manually after the processing has taken place. If a crash occurs while a consumer is processing a message, it will resume consuming from that same offset, and no messages are lost.
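With kafkajs, a minimal sketch of this looks like the following: autoCommit is disabled on consumer.run() and the offset is committed only after processing succeeds. It assumes the consumer instance from the snippet above, and handleMessage is a hypothetical placeholder for your own processing logic.

await consumer.run({
  autoCommit: false, // the kafkajs equivalent of enable.auto.commit: false
  eachMessage: async ({ topic, partition, message }) => {
    await handleMessage(message); // hypothetical: your own processing logic

    // Commit only after processing succeeds. The committed offset is the NEXT
    // offset to read, i.e. the offset of the message just processed + 1.
    await consumer.commitOffsets([
      { topic, partition, offset: (Number(message.offset) + 1).toString() },
    ]);
  },
});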

So, is there any other place where the magic happens?

Yes, indeed. Implement a) topic partitions and b) consumer groups, and monitor your lag.

Topic partition: Topics are divided into partitions, and each message is given an offset. Each partition is typically replicated at least once or twice. Each partition has a leader and one or more replicas (copies of the data) that exist on followers, providing protection against a broker failure. All brokers in the cluster are both leaders and followers, but a broker has at most one replica of a topic partition. The leader is used for all reads and writes.
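For a local test, a sketch of creating a topic with more than one partition using the kafkajs admin client could look like this; it assumes the kafka client instance from the earlier snippets, the topic name and counts are just examples, and a single-broker test environment can only support replicationFactor: 1.

const admin = kafka.admin();

const createPartitionedTopic = async () => {
  await admin.connect();
  await admin.createTopics({
    topics: [
      // 3 partitions; raise replicationFactor on a multi-broker cluster.
      { topic: 'medium-topic-01', numPartitions: 3, replicationFactor: 1 },
    ],
  });
  await admin.disconnect();
};

createPartitionedTopic().catch(console.error);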

Consumer group: Consumers can be organized into logical consumer groups. Topic partitions are assigned to balance the assignments among all consumers in the group. Within a consumer group, all consumers work in a load-balanced mode; in other words, each message will be seen by one consumer in the group. If a consumer goes away, the partition is assigned to another consumer in the group. This is referred to as a rebalance. If there are more consumers in a group than partitions, some consumers will be idle. If there are fewer consumers in a group than partitions, some consumers will consume messages from more than one partition.

Lag: A consumer is lagging when it’s unable to read from a partition as fast as messages are produced to it. Lag is expressed as the number of offsets that are behind the head of the partition. The time required to recover from lag (to “catch up”) depends on how quickly the consumer is able to consume messages per second.
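As a rough sketch, lag can also be checked from code with the kafkajs admin client; it again assumes the kafka client, topic, and group ID from the earlier snippets, and the exact fetchOffsets signature varies a bit between kafkajs versions, so treat this as an illustration rather than a drop-in monitor.

const reportLag = async () => {
  const admin = kafka.admin();
  await admin.connect();

  // Latest offsets (the "head" of each partition)...
  const latest = await admin.fetchTopicOffsets('medium-topic-01');
  // ...and the offsets our consumer group has committed so far.
  const committed = await admin.fetchOffsets({
    groupId: 'medium-group-01',
    topics: ['medium-topic-01'],
  });

  for (const { partition, offset } of committed[0].partitions) {
    const head = latest.find((p) => p.partition === partition);
    // A committed offset of -1 means the group hasn't committed anything yet.
    const lag = Number(head.offset) - Math.max(Number(offset), 0);
    console.log(`partition ${partition}: lag ${lag}`);
  }

  await admin.disconnect();
};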

Lastly, a quick recap.

  1. Producers guarantee message delivery through acks, retries, and request.timeout.ms.
  2. Consumers guarantee that they only update the offset when they finish processing a message, through enable.auto.commit = false.
  3. Topic partitions guarantee parallelism and fault tolerance.
  4. Consumer groups guarantee parallelism, high throughput, and application isolation, so that a bottleneck in one application doesn't impact another.
  5. Lag: one of the most important KPIs, not only to prevent system stress but also to use as a baseline for tuning other parameters and getting to know your data better.

What do you think?

What do you think about this architecture? Feel free to share your perspective and ideas in the comments section below.

If you found this article useful, feel free to smash that clap button 👏 many times to help others find it.
