Implementing RabbitMQ with Node.js

This article is the 8th in a series that will help you grasp different concepts behind Node.js and empower you to create production-ready applications. This article expects the reader to know Babel and how to set it up. Please read this article if you need to know the same.

Using a message queue is a no-brainer if you need to throttle the requests going to your API or workers, or you want things to happen asynchronously. If you still want more reasons, please go through the following article.

Some of the popular message queues are as below:

RabbitMQ
Kafka
IBM MQ
ActiveMQ
RocketMQ
Qpid

RabbitMQ is the most popular open source message broker. RabbitMQ is lightweight and easy to deploy. It supports multiple messaging protocols. RabbitMQ has decent performance and a strong community. If your requirement is to process a few thousand messages per second, I would suggest going for something like RabbitMQ. I have seen people jump to Kafka without going through their requirements properly. No doubt Kafka gives better performance and more features than messaging queues like RabbitMQ, but it is quite complex to set up. The Kafka server uses ZooKeeper for cluster membership and routing. Setting up both servers will be unnecessary if the load is not that high.

In this article we will be exploring RabbitMQ and how to use it with Node.js/Express APIs.

To be honest, the RabbitMQ documentation is so well written that you don’t need anything else. But we need a RabbitMQ tutorial as part of this series. So I will summarise the documentation to some extent and show how we can build a REST API that works with the RabbitMQ driver.

A normal work queue setup in RabbitMQ looks like below:

Work Queues setup in RabbitMQ

There are 3 important parts in a RabbitMQ setup:

  1. Producer : A program that sends messages is a producer. (represented by P in the diagram).
  2. Queue : Messages are stored inside a queue. It’s essentially a large message buffer. A queue is only bound by the host’s memory & disk limits. (represented by Red Rectangular boxes in the diagram).
  3. Consumer : A consumer is a program that mostly waits to receive messages. (represented by C1 & C2 in the diagram).

There can be multiple producers and consumers for a queue. But in normal use cases, we don’t need multiple producers, as putting messages into a queue is quite simple and fast. Consuming a message from the queue and performing a task based on the message, however, normally takes time. Thus we tend to implement multiple consumers for a queue, so that when one consumer is busy, another consumer can read from the queue and perform the task.

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. In this case, if the worker crashes, we will lose the message it was just processing. In order to make sure a message is never lost, RabbitMQ supports message acknowledgements. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it. If a consumer dies without sending an ack, RabbitMQ will understand that the message wasn’t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer.

But what if the RabbitMQ server crashes? Will the messages in the queue get lost?

By default, yes. But there are steps to ensure your queue and messages are persisted to disk, so that in case of a server crash, you can restart your server and process the messages.

First, we have to ensure our queue is declared as durable. We can do that either in the RabbitMQ admin console or through code. Then we should ensure the messages in the queue are persistent by setting the persistent field to true.

We will check both of them when we code.
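As a quick sketch of the code route (using amqplib’s callback API, which we install later in this article), a durable queue can be declared with assertQueue. The channel and queue name here are placeholders:

```javascript
// Sketch: declare a durable queue from code instead of the admin console.
// assertQueue creates the queue if it does not exist, and simply verifies
// the settings if it already does.
function declareDurableQueue(channel, queueName) {
  channel.assertQueue(queueName, { durable: true }); // survive broker restarts
}
```

Note that durability only covers the queue itself; the messages inside it additionally need the persistent flag, which we will set on the producer side.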

Now we can set up RabbitMQ on our local machine by following this link. But to spice things up, we will use a remote RabbitMQ server, like we do in production environments.

There is a company, CloudAMQP, which is generous enough to provide a free plan for developers.

Their free plan is enough for learning purposes. Create an account on their website and create an instance.

Once you create an instance, click on the RabbitMQ Manager button.

This should open the standard rabbitmq admin console. Let us create a durable queue in the same console.

We can also create a queue from code. The queue should be ready in seconds.

Now we need to connect to the RabbitMQ instance from our code, and for that we need the connection URL. Click on the instance name/row to get the connection and other instance details.


You might see a lot of simple programs just putting a message into the queue. But in a real-life example, it is more likely that a REST API is called, which in turn puts the message into the queue.

Let us start with a boilerplate I have made earlier. If you have followed the series from the start, the boilerplate should be very clear. You can find the code here:

https://github.com/pankaj805/medium-00_boiler_with_db

First, install the following library:

npm install amqplib --save

In the 5th article, we saw one way to reuse the MongoDB connection by injecting it into the request object. In this article we will see another way to achieve the same thing.

Create the following file services/MQService.js

Import the library and keep the connection URL in a constant variable.

import amqp from 'amqplib/callback_api';
const CONN_URL = 'amqp://gsgmnvnl:NITe9ThLkXQvKVLl7L6gEtMllb6obQmw@dinosaur.rmq.cloudamqp.com/gsgmnvnl';

We can use the connect() method provided by amqp to create a connection. In order to send or receive messages from a queue, we have to use the channel object. The way to do this is as below:

let ch = null;
amqp.connect(CONN_URL, function (err, conn) {
  conn.createChannel(function (err, channel) {
    ch = channel;
  });
});

Then let’s create a method to send messages to the queue, using the channel’s sendToQueue method.

export const publishToQueue = async (queueName, data) => {
  ch.sendToQueue(queueName, Buffer.from(data));
};

The MQService file looks like below:

import amqp from 'amqplib/callback_api';
const CONN_URL = 'amqp://gsgmnvnl:NITe9ThLkXQvKVLl7L6gEtMllb6obQmw@dinosaur.rmq.cloudamqp.com/gsgmnvnl';
let ch = null;
amqp.connect(CONN_URL, function (err, conn) {
  conn.createChannel(function (err, channel) {
    ch = channel;
  });
});
export const publishToQueue = async (queueName, data) => {
  ch.sendToQueue(queueName, Buffer.from(data));
};
process.on('exit', (code) => {
  ch.close();
  console.log(`Closing rabbitmq channel`);
});

We also added a process listener to close the RabbitMQ channel when we kill the process.
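One caveat with this setup: connect() is asynchronous, so ch stays null for a moment after the module loads, and a call to publishToQueue in that window would throw. A simple guard (my sketch, not part of the original repo) is to buffer messages until the channel is ready:

```javascript
// Sketch: hold messages published before the channel exists, flush them later.
let channel = null;
const pending = [];

function safePublish(queueName, data) {
  if (channel) {
    channel.sendToQueue(queueName, Buffer.from(data));
  } else {
    pending.push([queueName, data]); // channel not ready yet, hold the message
  }
}

// Call this from createChannel's callback once the channel is available.
function onChannelReady(ch) {
  channel = ch;
  // flush anything queued while we were still connecting
  for (const [queueName, data] of pending.splice(0)) {
    channel.sendToQueue(queueName, Buffer.from(data));
  }
}
```

For a tutorial this is a detail you can ignore, since the server will usually have connected long before the first request arrives.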

Now let’s add another route in our routes/User.js file. First, let us import the publishToQueue method we created in the service.

import {publishToQueue} from '../services/MQService';

and then add the following route method

router.post('/msg', async (req, res, next) => {
  let { queueName, payload } = req.body;
  await publishToQueue(queueName, payload);
  res.statusCode = 200;
  res.data = { "message-sent": true };
  next();
});

We are just sending the message to the queue and returning a success response.

Now let us work on the consumer. Create a new project where we will define our workers/consumers.

Create a file like worker-1.js in the project.

It is a best practice to keep the workers lean and simple, as you might need to spawn multiple instances of the same worker to handle the traffic.

For simplicity, I am not using Babel here, so you won’t see any ES6 in the worker file.

Write the following code in the worker file.

var amqp = require('amqplib/callback_api');
const CONN_URL = 'amqp://gsgmnvnl:NITe9ThLkXQvKVLl7L6gEtMllb6obQmw@dinosaur.rmq.cloudamqp.com/gsgmnvnl';
amqp.connect(CONN_URL, function (err, conn) {
  conn.createChannel(function (err, ch) {
    ch.consume('user-messages', function (msg) {
      console.log('.....');
      setTimeout(function () {
        console.log("Message:", msg.content.toString());
      }, 4000);
    }, { noAck: true });
  });
});

Here we first import our dependency and then do the same connection initialisation we did earlier. Once we have the channel, we use the consume method, which takes 3 parameters:

consume(queueName, callback, options)

The first parameter is the queue name, the second is the callback invoked whenever a message arrives from the queue, and the third is an options object that carries the acknowledgement settings.

noAck:true

If we set the “noAck” field to true, the queue will delete the message the moment it is read from the queue.

In the callback function, we print the message after a delay of 4 seconds just to mock an I/O operation.

function (msg) {
  console.log('.....');
  setTimeout(function () {
    console.log("Message:", msg.content.toString());
  }, 4000);
}

Now let us first run the producer only.

Hit the REST API like we have done in the earlier articles. Mention the queue name and the payload as shown in the diagram.

You should get a response immediately, as here we don’t wait for the consumer to perform the task.

As we have not started our worker yet, we should see the message sitting in the queue.

Now let us start the worker.

We should see the message after 4 seconds.

Let us hit the api again with a different payload.

The worker reads the message from queue and prints it after 4 seconds. 
Perfect !

But in a real-life scenario, the consumer might crash while doing some operation. In that scenario, we want the message to go back to the queue, so that it can be consumed by another worker, or by the same worker when we spawn it again.

To achieve this, we have to do a couple of things:

The first thing to do is to change the third parameter in the consume method to noAck: false.

Now that we have set noAck to false, we have to explicitly call the channel’s ack() method. So now our consume call looks like below:

ch.consume('user-messages', function (msg) {
  console.log('.....');
  setTimeout(function () {
    console.log("Message:", msg.content.toString());
    ch.ack(msg);
  }, 8000);
}, { noAck: false });

We also increased the sleep time so that it is easier for us to reproduce the scenario.
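A related point worth knowing: a crash is not the only failure mode. If the handler itself hits an error (say, a downstream service rejects the payload), the consumer can hand the message back explicitly with the channel’s nack method. This is a sketch, and processMessage is a hypothetical stand-in for whatever work your worker does:

```javascript
// Sketch: ack on success, nack (with requeue) when the handler fails.
function handleMessage(channel, msg, processMessage) {
  try {
    processMessage(msg.content.toString());
    channel.ack(msg);               // done; RabbitMQ can delete the message
  } catch (err) {
    channel.nack(msg, false, true); // failed; put the message back on the queue
  }
}
```

The third argument to nack is the requeue flag; passing false instead would discard the message (or dead-letter it, if the queue is configured for that).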

Now let us push another message to the queue and then kill the worker before the 8-second wait is over.

In the above screenshot, we killed the consumer while it was waiting for the timeout, emulating a consumer crash in the middle of a task.

The message is back in the queue.

Let us restart the worker and see if it reads the message again.

But what if our RabbitMQ instance goes down? How do we ensure that the payload is not lost?

We have to declare our queues as durable, which we have already done in the RabbitMQ admin console. Secondly, we have to add a third parameter to the sendToQueue call in our producer code.

{persistent: true}

So the function looks as below:

export const publishToQueue = async (queueName, data) => {
  ch.sendToQueue(queueName, Buffer.from(data), { persistent: true });
};

This was a very basic usage of RabbitMQ. You can do a lot more with it. You can run multiple consumers and decide the strategy of dispatching messages to them. You can also design a pub/sub service using RabbitMQ, and you can do topic-based routing.
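One such dispatch strategy is fair dispatch. By default RabbitMQ round-robins messages to consumers regardless of how busy each one is; amqplib’s channel.prefetch(1) tells the broker not to hand a consumer a new message until it has acked the previous one. A minimal sketch, using the same queue name as our worker:

```javascript
// Sketch: fair dispatch — at most one unacknowledged message per consumer,
// so a slow worker doesn't pile up messages while an idle one waits.
function consumeFairly(channel, queueName, handler) {
  channel.prefetch(1); // no new message until the last one is acked
  channel.consume(queueName, handler, { noAck: false });
}
```

This only makes sense together with noAck: false, since the broker tracks outstanding work through acknowledgements.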

You can find the code here:
https://github.com/pankaj805/medium-08_mq
https://github.com/pankaj805/medium-08_mq_workers

If you liked the article, you can 👏 the story and share it with others. You can find the entire article series here.

Done for the day !