Live Streaming Data Using Kafka, Node JS, WebSocket and Chart JS

Aprajita
5 min readSep 20, 2019

--

Yeah well the title of the story pretty much define what we were trying to achieve. This was a small POC that we started as a stepping stone to handle a huge data coming from our client and provide a live streaming after performing some processing.

I wanted to share my experience and give a walk through to anyone who would want to create a simple Live streaming of data using Kafka, Node JS, WebSocket and Chart JS :)

The reason for writing down this story is not to define something which might sound quite obvious but to allow reader to be able to follow the steps and have a working example. I found articles related to this topic on google but hardly found a step to step instruction that i could follow. And being new to Kafka, Node JS it could be daunting.

Follow the guide below and you should be able to have something like this up and running in minutes.

  1. Start with the basic Kafka installation on your machine using https://kafka.apache.org/quickstart

Use the steps mentioned here to set-up and start Kafka server, create a topic and then use the Kafka console producer to send messages to the topic.

That mean do as said from Step 1 to Step 3 as in the Kafka quick-start document. Further we will use Step 4 to send some messages.

I have created a topic say — test1 and have my producer ready as below

2. For creating a consumer using Node JS, we will be using https://www.npmjs.com/package/kafka-node#consumer . The purpose is straightforward — read messages from the Kafka topic. Following is a small snippet of code to create a consumer for the topic test1 that we created earlier in step 1. The topic I created is available on localhost:9092, modify your code according to your settings. (Just keep reference of this code in mind, we will be using it further)

var kafka = require(‘kafka-node’);
var Consumer = kafka.Consumer,
client = new kafka.KafkaClient(“localhost:9092”),
consumer = new Consumer(
client, [ { topic: ‘test1’, partition: 0 } ], { autoCommit: false });
consumer.on(‘message’, function (message) {
console.log(message);
});

3. On the Node JS service side we will be creating an WebSocket API using https://www.npmjs.com/package/websocket, which will be returning back data to the client (browser). A simple example would be as following:

var WebSocketServer = require(‘websocket’).server;
var http = require(‘http’);

var server = http.createServer(function(request, response) {
console.log(‘ Request recieved : ‘ + request.url);
response.writeHead(404);
response.end();
});
server.listen(8080, function() {
console.log(‘Listening on port : 8080’);
});

webSocketServer = new WebSocketServer({
httpServer: server,
autoAcceptConnections: false
});

function iSOriginAllowed(origin) {
return true;
}

webSocketServer.on(‘request’, function(request) {
if (!iSOriginAllowed(request.origin)) {
request.reject();
console.log(‘ Connection from : ‘ + request.origin + ‘ rejected.’);
return;
}

var connection = request.accept(‘echo-protocol’, request.origin);
console.log(‘ Connection accepted : ‘ + request.origin);
connection.on(‘message’, function(message) {
if (message.type === ‘utf8’) {
console.log(‘Received Message: ‘ + message.utf8Data);
}
});
connection.on(‘close’, function(reasonCode, description) {
console.log(‘Connection ‘ + connection.remoteAddress + ‘ disconnected.’);
});
});

Step 3 Continued :

The purpose is to combine the Kafka consumer and the WebSocket API to play together and return data coming from Kafka producer. So combining the above two steps will result in something like this, let’s call it consumer.js.

var WebSocketServer = require(‘websocket’).server;
var http = require(‘http’);

var kafka = require(‘kafka-node’);
var Consumer = kafka.Consumer,
client = new kafka.KafkaClient(“localhost:9092”),
consumer = new Consumer(
client, [ { topic: ‘test1’, partition: 0 } ], { autoCommit: false });


var server = http.createServer(function(request, response) {
console.log(‘ Request recieved : ‘ + request.url);
response.writeHead(404);
response.end();
});
server.listen(8080, function() {
console.log(‘Listening on port : 8080’);
});

webSocketServer = new WebSocketServer({
httpServer: server,
autoAcceptConnections: false
});

function iSOriginAllowed(origin) {
return true;
}

webSocketServer.on(‘request’, function(request) {
if (!iSOriginAllowed(request.origin)) {
request.reject();
console.log(‘ Connection from : ‘ + request.origin + ‘ rejected.’);
return;
}

var connection = request.accept(‘echo-protocol’, request.origin);
console.log(‘ Connection accepted : ‘ + request.origin);
connection.on(‘message’, function(message) {
if (message.type === ‘utf8’) {
console.log(‘Received Message: ‘ + message.utf8Data);
}
});
consumer.on(‘message’, function (message) {
console.log(message);
connection.sendUTF(message.value);
});

connection.on(‘close’, function(reasonCode, description) {
console.log(‘Connection ‘ + connection.remoteAddress + ‘ disconnected.’);
});
});

Use command- node consumer.js to keep everything up and running

4. At the web browser (our client) we would invoke the WebSocket API and display the data as using Chart JS.

Whip out a index.html that would just include a placeholder for the chart and some basic script references:

<html>
<head>
<script src=”https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.slim.min.js"></script>
<script src=”https://cdn.jsdelivr.net/npm/chart.js@2.8.0"></script>
<script type=”text/javascript” src=”index.js”></script>
</head>
<body>
<canvas id=”myChart” width=”400" height=”400"></canvas>
</body>
</html>

In the index.js we would create a basic line chart and invoke the WebSocket API as shown below:

$(document).ready(function(){
var count = 10;
var data = {
labels : [“0”,”1",”2",”3",”4",”5", “6”, “7”, “8”, “9”],
datasets : [
{
fillColor : “rgba(220,220,220,0.5)”,
strokeColor : “rgba(220,220,220,1)”,
pointColor : “rgba(220,220,220,1)”,
pointStrokeColor : “#fff”,
data : [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
}
]
}
var updateData = function(oldVal, newVal){
var labels = oldVal[“labels”];
var dataSetInitial = oldVal[“datasets”][0][“data”];
labels.shift();
count++;
labels.push(count.toString());
var newData = Math.floor(newVal);
dataSetInitial.push(newData);
dataSetInitial.shift();
};
var ctx = document.getElementById(“myChart”).getContext(“2d”);
var chart = new Chart(ctx);
chart.Line(data, {animation : false});

function webSocketInvoke() {

if (“WebSocket” in window) {
console.log(“WebSocket is supported by your Browser!”);
var ws = new WebSocket(“ws://localhost:8080/”,”echo-protocol”);

ws.onopen = function() {
console.log(“Connection created”);
};

ws.onmessage = function (evt) {
var received_msg = evt.data;
updateData(data, evt.data);
chart.Line(data, {animation : false})
console.log(received_msg );
};

ws.onclose = function() {
console.log(“Connection closed”);
};
} else {
alert(“WebSocket NOT supported by your Browser!”);
}
}
webSocketInvoke();
});

Open the index.html on any browser, We are all ready to check the live streaming of data by sending messages using Kafka console producer. The data would be shown as a line graph which will keep on updating as the messages are send.

See no rocket science involved.

This is the most basic example and in the upcoming stories I will be doing refactoring and make a more robust system. Hope this current draft allow you to create a simple example and understand end to end flow.

--

--