Beginner’s Guide to Node.js and Kafka Integration
I’ll talk about how I set up Kafka to provide website analytics and event tracking for a web application.
What is Kafka?
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Some common use cases of Kafka include:
✨Real-time stream processing
✨Messaging system
✨Log Aggregation
✨Event sourcing
✨Data pipeline
Here’s a high-level architecture diagram of how Kafka works:
To understand Kafka, let’s first list some of the basics:
Applications that send data to Kafka are known as producers. Producers publish records to Kafka topics, and each record consists of a key, value, and timestamp.
Applications that read data from Kafka are referred to as consumers. Consumers subscribe to specific topics and receive a continuous stream of records. They can also be notified whenever a new record is sent to the subscribed topic.
Producers send records to Kafka, and each record is associated with a topic. Topics represent a particular stream of data.
Kafka maintains an offset for each consumer within a topic. The offset represents the position of a consumer within the topic’s log.
Kafka guarantees the order of messages within a partition, regardless of the number of consumers or producers. (For a single-partition topic like the one we’ll create below, that means total ordering for the whole topic.)
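To make these terms concrete, here’s what a single analytics record might look like when a producer publishes it. The field values are purely illustrative, not a fixed schema:
// The key determines which partition the record lands on, the value carries
// the payload, and the timestamp marks when the record was produced.
const record = {
  key: "user-42",
  value: JSON.stringify({ type: "page_view", page: "/pricing" }),
  timestamp: Date.now()
};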
Setting up Kafka locally
Follow steps 1 to 4 of the official Kafka Quickstart Guide available on the Apache Kafka website.
$ tar -xzf /path/to/kafka_2.11-1.0.0.tgz
$ cd kafka_2.11-1.0.0
Kafka relies on ZooKeeper, so before starting Kafka, you need to set up a ZooKeeper server if you don’t already have one. Kafka provides a convenient script that allows you to quickly set up a single-node ZooKeeper instance for testing purposes.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Then start the Kafka server:
$ bin/kafka-server-start.sh config/server.properties
Let’s create a topic named events.dev with a single partition and only one replica:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic events.dev
Then run the list topic command to see the topic:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
events.dev
Now Kafka is set up and running, with the broker listening on localhost:9092 and ZooKeeper on localhost:2181.
Kafka producer in JavaScript
You can use the kafka-node npm module to write a Kafka producer in JavaScript. Depending on your use case, you can either have the producer live on its own server or integrate it with your existing web application.
// kafka-producer.js
const kafka = require('kafka-node');
const uuid = require('uuid');

// kafka-node's Client connects through ZooKeeper, which the quickstart runs on localhost:2181.
const client = new kafka.Client("localhost:2181", "my-client-id", {
  sessionTimeout: 300,
  spinDelay: 100,
  retries: 2
});

const producer = new kafka.HighLevelProducer(client);

producer.on("ready", function() {
  console.log("Kafka Producer is connected and ready.");
});

// For this demo we just log producer errors to the console.
producer.on("error", function(error) {
  console.error(error);
});

const KafkaService = {
  sendRecord: ({ type, userId, sessionId, data }, callback = () => {}) => {
    if (!userId) {
      return callback(new Error("A userId must be provided."));
    }

    const event = {
      id: uuid.v4(),
      timestamp: Date.now(),
      userId: userId,
      sessionId: sessionId,
      type: type,
      data: data
    };

    const buffer = Buffer.from(JSON.stringify(event));

    // Create a new payload.
    const record = [
      {
        topic: "events.dev",
        messages: buffer,
        attributes: 1 // Use GZip compression for the payload.
      }
    ];

    // Send the record to Kafka; the callback receives any result/error.
    producer.send(record, callback);
  }
};

module.exports = KafkaService;
The KafkaService object is defined with a sendRecord method, which accepts a record object and an optional callback. Records are published to the events.dev topic.
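To give a sense of how the producer might integrate with an existing web application, here’s a minimal sketch of an Express route that forwards tracking events to it. The /track path and the request body shape are assumptions for illustration:
const express = require('express');
const KafkaService = require('./kafka-producer');

const app = express();
app.use(express.json());

// Hypothetical tracking endpoint: forwards the request body to Kafka.
app.post('/track', (req, res) => {
  const { type, userId, sessionId, data } = req.body;
  KafkaService.sendRecord({ type, userId, sessionId, data }, (err) => {
    if (err) {
      return res.status(500).json({ error: err.message });
    }
    res.status(204).end();
  });
});

app.listen(3000, () => console.log('Tracking API listening on port 3000'));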
Kafka consumer in JavaScript
You can use the kafka-node npm module to write a Kafka consumer in JavaScript as well. Here’s an example of a consumer that runs as a separate Node.js service, listening for events and storing them in a database:
// kafka-consumer.js
const kafka = require('kafka-node');
// Events is a Sequelize model; the path is hypothetical (see the sketch below).
const Events = require('./models/events');

const client = new kafka.Client("localhost:2181");

const topics = [
  {
    topic: "events.dev"
  }
];

const options = {
  autoCommit: true,
  fetchMaxWaitMs: 1000,
  fetchMaxBytes: 1024 * 1024,
  encoding: "buffer"
};

const consumer = new kafka.HighLevelConsumer(client, topics, options);

consumer.on("message", function(message) {
  // message.value arrives as a raw buffer; decode it back into a JSON object.
  const buf = Buffer.from(message.value, "binary");
  const decodedMessage = JSON.parse(buf.toString());

  // Persist the event through the Sequelize model.
  return Events.create({
    id: decodedMessage.id,
    type: decodedMessage.type,
    userId: decodedMessage.userId,
    sessionId: decodedMessage.sessionId,
    data: JSON.stringify(decodedMessage.data),
    createdAt: new Date()
  });
});

consumer.on("error", function(err) {
  console.log("error", err);
});

// Commit outstanding offsets and shut down cleanly on Ctrl+C.
process.on("SIGINT", function() {
  consumer.close(true, function() {
    process.exit();
  });
});
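The Events model referenced above isn’t part of the consumer file itself. For completeness, here’s a minimal sketch of what it might look like; the database name, credentials, dialect, and the ./models/events path are all assumptions for illustration:
// models/events.js — a hypothetical definition of the Events model used by the consumer.
const Sequelize = require('sequelize');

// Placeholder connection details; point this at your own database.
const sequelize = new Sequelize('analytics', 'user', 'password', {
  host: 'localhost',
  dialect: 'postgres'
});

// Columns mirror the fields the consumer writes for each decoded event.
const Events = sequelize.define('Event', {
  id: { type: Sequelize.UUID, primaryKey: true },
  type: { type: Sequelize.STRING },
  userId: { type: Sequelize.STRING },
  sessionId: { type: Sequelize.STRING },
  data: { type: Sequelize.TEXT },
  createdAt: { type: Sequelize.DATE }
}, {
  timestamps: false // The consumer sets createdAt explicitly.
});

module.exports = Events;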
Once you have written the Kafka consumer code, you can start it by running the following command in your terminal or command prompt:
node kafka-consumer.js
This command will execute the consumer script and start consuming events from the Kafka topic.
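To sanity-check the whole pipeline, you can also tail the topic with the console consumer that ships with Kafka. Consumers decompress the GZip-compressed messages transparently, so the JSON payloads print as plain text:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events.dev --from-beginning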
I hope this guide has given you a solid understanding of how to get started with Kafka and Node.js. If you have any further questions or feedback, please don’t hesitate to reach out. Thank you!