Integrating Kafka with Node.js: Building Real-time Applications

Raheel Butt
4 min read · May 6, 2024


Kafka with Node.js

In today’s fast-paced world, time is everything. Real-time data streaming has become incredibly important for modern applications. Just imagine being able to process large volumes of data instantly, gaining valuable insights in the blink of an eye! Thanks to powerful tools like Apache Kafka and frameworks like Node.js, developers can now build robust applications that handle real-time data seamlessly.

Kafka allows applications to send and receive real-time data feeds, while Node.js offers exceptional performance and scalability, making them a perfect pair. The possibilities are truly limitless, opening doors for innovation like never before. Get ready for an exciting journey into the world of real-time data streaming with Kafka and Node.js!

By the end of this tutorial, you’ll have a solid understanding of how to harness the power of Kafka and Node.js to create real-time data streaming applications that cater to the needs of modern users. Let’s dive in!

Prerequisites: Before we dive into the exciting world of real-time data streaming with Kafka and Node.js, let’s make sure your environment is ready. You’ll need Node.js (with npm) installed, plus a Kafka broker running locally and reachable at localhost:9092, which is the address we’ll use throughout this tutorial.
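If you don’t have a broker yet, one option is the official apache/kafka Docker image, which runs a single-node broker listening on port 9092 (this assumes Docker is installed; the image tag and flags below are just one way to do it, and Docker Compose or a local installation work just as well):

docker run -d --name kafka -p 9092:9092 apache/kafka:latest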

Project Setup:

To start, we have to initialize a Node.js project.

npm init --yes

where the --yes flag accepts all of the default template values.

Next, install the dependencies we’ll use to work with Kafka.

npm i body-parser express kafkajs nodemon

and yes, those are the only dependencies we need for our server.

Next, we’ll create two files, producer.js and consumer.js, that wrap our Kafka producer and consumer logic.

Before that, in the package.json file, we add a start script and set "type": "module" so Node.js treats our files as ES modules (we use import/export syntax throughout).

"scripts": {
"start": "node index.js"
}

Constants.js

export const TOPIC_NAME = "my-topic"
export const GROUP_ID = "test-group"
export const CLIENT_ID = "nodejs-kafka"
export const BROKERS = ["localhost:9092"]

Config.js

import { Kafka } from "kafkajs";
import { BROKERS, CLIENT_ID } from "./constants.js";

const kafkaConfig = {
  clientId: CLIENT_ID,
  brokers: BROKERS,
};

const kafka = new Kafka(kafkaConfig);

export default kafka;
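One thing worth covering before we produce anything: the topic itself has to exist, unless your broker is configured to auto-create topics. The original project doesn’t include this step, so here is a minimal sketch of how it could be done with the KafkaJS admin client; the file name admin.js and running it as a one-off script are my assumptions, not part of the original setup:

import kafka from "./config.js";
import { TOPIC_NAME } from "./constants.js";

// Hypothetical one-off script (admin.js): creates the topic if it doesn't exist yet.
const createTopic = async () => {
  const admin = kafka.admin();
  await admin.connect();
  await admin.createTopics({
    topics: [{ topic: TOPIC_NAME, numPartitions: 1, replicationFactor: 1 }],
  });
  await admin.disconnect();
};

createTopic().catch(console.error);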

Producer.js

import kafka from "./config.js";

class KafkaProducer {
  constructor() {
    this.producer = kafka.producer();
  }

  // Connects, sends the given messages to the topic, and always disconnects afterwards.
  async produce(topic, messages) {
    try {
      await this.producer.connect();
      await this.producer.send({
        topic: topic,
        messages: messages,
      });
    } catch (error) {
      console.error(error);
    } finally {
      await this.producer.disconnect();
    }
  }
}

export default KafkaProducer;
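Note that produce() connects and disconnects on every call, which keeps the example self-contained but adds a connection round-trip to each HTTP request. A possible refinement, shown here only as a sketch using the same KafkaJS API (the ReusableKafkaProducer name is mine, not part of the original project), is to connect once at startup and reuse the connection:

import kafka from "./config.js";

// Sketch: a long-lived producer that connects once and is reused across requests.
class ReusableKafkaProducer {
  constructor() {
    this.producer = kafka.producer();
  }

  async connect() {
    await this.producer.connect();
  }

  async produce(topic, messages) {
    // No connect/disconnect here; the connection stays open for the process lifetime.
    await this.producer.send({ topic, messages });
  }

  async disconnect() {
    await this.producer.disconnect();
  }
}

export default ReusableKafkaProducer;

With this variant you would create a single instance in index.js, await connect() once before the server starts accepting requests, and call disconnect() on shutdown.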

Consumer.js

import kafka from "./config.js";

class KafkaConsumer {
  constructor(groupId) {
    this.consumer = kafka.consumer({ groupId });
  }

  // Subscribes to the topic and invokes the callback for every message received.
  async consume(topic, callback) {
    try {
      await this.consumer.connect();
      await this.consumer.subscribe({ topic, fromBeginning: true });
      await this.consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          const value = message.value.toString();
          callback(topic, partition, value);
        },
      });
    } catch (error) {
      console.error(error);
    }
  }
}

export default KafkaConsumer;

fromBeginning is set to true, which means that when our consumer group reads a topic for the first time, it starts from the earliest available message. If it were false (the default), the consumer would only receive messages produced after it started running.

Controller.js

import KafkaProducer from "./producer.js";
import { TOPIC_NAME } from "./constants.js";

const sendMessageToKafka = async (req, res) => {
  try {
    const { message } = req.body;
    const kafkaProducer = new KafkaProducer();
    const messages = [{ key: "key1", value: message }];
    await kafkaProducer.produce(TOPIC_NAME, messages);

    res.status(200).json({
      status: "Ok!",
      message: "Message successfully sent!",
    });
  } catch (error) {
    console.log(error);
    res.status(500).json({ status: "Error", message: "Failed to send message." });
  }
};

const controllers = { sendMessageToKafka };

export default controllers;

Index.js

import express from "express";
import bodyParser from "body-parser";
import controllers from "./controller.js";
import KafkaConsumer from "./consumer.js";
import { GROUP_ID, TOPIC_NAME } from "./constants.js";

const app = express();
const jsonParser = bodyParser.json();

app.get("/", (req, res, next) => {
  console.log("Hello to Node Express and Kafkajs");
  return res.send("<h1>Hello to Node Express and Kafkajs</h1>");
});

app.post("/api/send", jsonParser, controllers.sendMessageToKafka);

// Consume from the topic defined in constants.js ("my-topic").
const kafkaConsumer = new KafkaConsumer(GROUP_ID);
kafkaConsumer.consume(TOPIC_NAME, (topic, partition, value) => {
  console.log("Topic: ", topic, "Partition: ", partition);
  console.log("📨 Received message: ", value);
});

app.listen(8080, () => {
  console.log("Server is running on port 8080.");
});
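The example also never disconnects the consumer; it stays connected until the process is killed. A graceful-shutdown sketch could look like the following, assuming we add a small helper to KafkaConsumer such as async disconnect() { await this.consumer.disconnect(); } (that helper is my addition, not part of the original class):

// Sketch: disconnect the consumer cleanly when the process is asked to stop.
const shutdown = async () => {
  try {
    await kafkaConsumer.disconnect();
  } finally {
    process.exit(0);
  }
};

process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);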

To interact with Kafka and test our producer and consumer functionality, we’ll use Postman.

NOTE: Make sure your Kafka server and Node.js application are up and running. If not, start them using Docker Compose or any other method you prefer.

To run the Node.js application, run the following command.

npm start
(Screenshots: the Postman request and the resulting console output.)
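If you prefer the command line over Postman, the same request can be sent with curl (the message text here is just an example):

curl -X POST http://localhost:8080/api/send \
  -H "Content-Type: application/json" \
  -d '{"message": "Hello from Kafka and Node.js!"}'

You should get the "Message successfully sent!" response from Express, and shortly after, the consumer will log the received message in the server’s console.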

To explore the full code, visit the node-kafka-docker repository. Follow me on GitHub for additional insightful content and updates.

Conclusion:

In this tutorial, we’ve explored how to integrate Kafka with Node.js to build real-time applications. We’ve learned how to create a Kafka producer to send messages and create a Kafka consumer to receive messages. With Kafka and Node.js, developers can build scalable, fault-tolerant, and real-time applications that process data streams efficiently.

That is all for this post. Apache Kafka is far too extensive a topic to cover in a single article, so I’m excited to see you in upcoming posts on the following topics:

  1. Installation of Apache Kafka and Zookeeper.
  2. Kafka vs RabbitMQ
  3. Many more….

If you found this blog post useful then clap, comment, and follow.

🤝 Let’s connect on LinkedIn: Raheel Butt
