YLD
YLD Blog
Published in
6 min read · Oct 3, 2016


Using RabbitMQ and AMQP for Distributed Work Queues in Node.js

This is the last article in the Work Queue Patterns series. In the previous article, we examined how to manage distributed work queues using Redis as the queue service. In this article I explain how you can replace that Redis implementation with a RabbitMQ one.

Besides Redis, RabbitMQ is another open-source server that implements persistent queues. Unlike Redis, RabbitMQ’s sole purpose is to provide a reliable and scalable messaging solution, with many features that are absent or hard to implement in Redis. RabbitMQ can be used for more than just work queues (it can serve as a generic messaging platform), but for the purposes of this article we’re just going to use it to coordinate work between work producers and work consumers.

RabbitMQ is a server that runs locally or on some node on the network. The clients can be work producers, work consumers or both, and they talk to the server using a protocol named Advanced Message Queuing Protocol (AMQP). For Node.js, there are several libraries that implement this protocol. We chose amqplib because it is the friendliest to use.

Installing RabbitMQ

To install RabbitMQ, head to the official website (http://www.rabbitmq.com) and follow the installation instructions for your specific platform.

Once you have installed Rabbit, you need to start it. The way to start it depends on your operating system and the way you installed it. For Homebrew on MacOS:

Installing your dependencies

Now you should create a project directory where the code for your RabbitMQ clients will reside. Inside this directory, create a package.json manifest where you state your dependencies:

package.json:

{
  "name": "05-amqp_queue",
  "version": "0.1.0",
  "dependencies": {
    "amqplib": "^0.3.0"
  }
}

For now, the only dependency that we need is amqplib. Let's install it then:

$ npm install
amqplib@0.3.0 node_modules/amqplib
├── buffer-more-ints@0.0.2
├── bitsyntax@0.0.4
├── when@3.2.3
└── readable-stream@1.1.13 (inherits@2.0.1, isarray@0.0.1, string_decoder@0.10.31, core-util-is@1.0.1)

Connect

In order to send or receive messages, we need to establish a connection and a channel into the RabbitMQ server. We’re going to put the code that is common between the work producers and the work consumers into a common file named channel.js.

To produce messages, you first need to connect to a given RabbitMQ server, specifying a URL.

channel.js:

var amqp = require('amqplib/callback_api');

var url = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672';

module.exports = createQueueChannel;

function createQueueChannel(queue, cb) {
  amqp.connect(url, onceConnected);

  function onceConnected(err, conn) {
    if (err) {
      cb(err);
    }
    else {
      console.log('connected');
    }
  }
}

First we’re requiring the amqplib library. Here we're specifically requiring the callback-based API (instead of the promise-based one).

Then we’re fetching the AMQP connection URL from the process environment. If the AMQP_URL environment variable is not present, the default is to connect to localhost on the default port, using the guest account with the password guest (the RabbitMQ default). This default is only meant for development on your own computer. In other environments you should change the RabbitMQ account username and password and make sure they are correctly reflected in the AMQP_URL environment variable.
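
As a small illustration of the fallback described above, the sketch below resolves the connection URL from an environment object and produces a copy that is safe to write to a log. The helper names resolveAmqpUrl and redactAmqpUrl are hypothetical and not part of amqplib; they rely only on the URL class built into recent Node.js versions.

```javascript
// Hypothetical helpers (not part of amqplib): resolve the AMQP URL and
// hide the password before logging it.
var DEFAULT_AMQP_URL = 'amqp://guest:guest@localhost:5672';

function resolveAmqpUrl(env) {
  // Fall back to the local development default when AMQP_URL is not set.
  return env.AMQP_URL || DEFAULT_AMQP_URL;
}

function redactAmqpUrl(url) {
  var parsed = new URL(url);
  if (parsed.password) {
    // Replace the real password so it never reaches the logs.
    parsed.password = '***';
  }
  return parsed.toString();
}

console.log('connecting to', redactAmqpUrl(resolveAmqpUrl(process.env)));
```

Passing the environment in as an argument, rather than reading process.env inside the helper, makes the fallback easy to check without touching real environment variables.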

Next, we connect using that URL by calling amqp.connect, passing in a callback function that is invoked once the connection attempt completes. The callback gets an error as its first argument if it can't connect to the server. If it can, it gets a connection object as the second argument.

The next step is to create a channel once you get connected:

channel.js:

// ...

function onceConnected(err, conn) {
  if (err) {
    // Report connection failures to the caller, as in the first version.
    cb(err);
  }
  else {
    console.log('connected');
    conn.createChannel(onceChannelCreated);
  }

  function onceChannelCreated(err, channel) {
    if (err) {
      cb(err);
    }
    else {
      console.log('channel created');
    }
  }
}

Now we need to make sure that the queue is created on RabbitMQ. For that we can use channel.assertQueue():

// ...

function onceChannelCreated(err, channel) {
  if (err) {
    cb(err);
  }
  else {
    channel.assertQueue(queue, {durable: true}, onceQueueCreated);
  }

  function onceQueueCreated(err) {
    if (err) {
      cb(err);
    }
    else {
      cb(null, channel, conn);
    }
  }
}

When we have made sure that a queue with such a name exists, we finally call back with no error, passing in the channel and the connection objects.

Produce messages

Now that you have a channel to your RabbitMQ server and have made sure that the queue exists, you can finally publish a message to that queue:

producer.js:

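var Channel = require('./channel');

var queue = 'queue';

Channel(queue, function(err, channel, conn) {
  if (err) {
    console.error(err.stack);
  }
  else {
    console.log('channel and queue created');
    var work = 'make me a sandwich';
    // persistent: true asks RabbitMQ to keep the message across restarts.
    channel.sendToQueue(queue, encode(work), {
      persistent: true
    });
    // Defer closing so the channel can flush the message to the network.
    setImmediate(function() {
      channel.close();
      conn.close();
    });
  }
});

function encode(doc) {
  // Buffer.from replaces the deprecated new Buffer() constructor.
  return Buffer.from(JSON.stringify(doc));
}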

Here we’re sending a message into the queue using channel.sendToQueue(). This function accepts the queue name as the first argument. The second argument expects a buffer, which we produce using the encode function. You can send any binary message to RabbitMQ; here we're choosing to encode an arbitrary JavaScript value as a JSON string and then create a binary buffer from it. The third argument to channel.sendToQueue() contains some options. Here we're telling RabbitMQ to save the message to persistent storage, allowing it to survive crashes and restarts of the RabbitMQ process.
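
The producer's encode function has a natural counterpart on the consuming side. The sketch below pairs the two so you can see the round trip; decode is a hypothetical helper that does not appear in the scripts above, and it assumes every message in the queue was produced as JSON.

```javascript
// Encode a JavaScript value as a JSON buffer, as the producer does.
function encode(doc) {
  return Buffer.from(JSON.stringify(doc));
}

// Hypothetical counterpart for the consuming side: parse the bytes that
// amqplib exposes as `msg.content` back into a JavaScript value.
function decode(content) {
  return JSON.parse(content.toString('utf8'));
}

var work = { type: 'sandwich', bread: 'rye' };
console.log(decode(encode(work)));
```

In a worker you would call decode(msg.content) before doing the work, instead of printing the raw string.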

Next, we’re closing the connection. We’re not closing it immediately, in order to give the channel a chance to send those messages into the network.

Typically you don’t want to open and close a connection for every message, since you will probably be creating these messages inside a server that handles many requests. In that case, you can keep one shared connection, send all messages through it, and close it only when the server shuts down.
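
One way to share a connection is to open the channel lazily and hand the same one to every caller. This is only a sketch under that assumption: sharedChannel is a hypothetical helper, and its createChannel argument stands in for the function exported by channel.js.

```javascript
// Hypothetical helper: open the channel once and share it between callers.
// `createChannel(queue, cb)` stands in for the createQueueChannel function
// exported by channel.js.
function sharedChannel(createChannel, queue) {
  var state = null;   // { channel, conn } once the channel is ready
  var waiting = null; // callbacks queued while the channel is being opened

  return function getChannel(cb) {
    if (state) {
      return cb(null, state.channel, state.conn);
    }
    if (waiting) {
      // Someone is already connecting: wait for that attempt to finish.
      return waiting.push(cb);
    }
    waiting = [cb];
    createChannel(queue, function(err, channel, conn) {
      var callbacks = waiting;
      waiting = null;
      if (!err) {
        state = { channel: channel, conn: conn };
      }
      callbacks.forEach(function(callback) {
        callback(err, channel, conn);
      });
    });
  };
}

// Possible usage inside a server (not executed here):
//   var getChannel = sharedChannel(require('./channel'), 'queue');
//   getChannel(function(err, channel) { /* channel.sendToQueue(...) */ });
```

A failed attempt is not cached, so the next caller triggers a fresh connection attempt. Whether that is the right retry policy depends on your server.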

You can now use our producer.js script to produce work requests from the command line:

$ node producer

Each time you run the producer.js script, you insert a work order ("make me a sandwich") into a queue named "queue".

Using different queue names, one single RabbitMQ server or cluster can support several different work types. For instance, you can have a queue named “send-email” for sending emails out, and another named “profile-update” for updating a third-party service with user profile updates.

Consume messages

Now that we have work waiting on our queue, we need to have workers consuming this work:

worker.js:

var Channel = require('./channel');

var queue = 'queue';

Channel(queue, function(err, channel, conn) {
  if (err) {
    console.error(err.stack);
  }
  else {
    console.log('channel and queue created');
    consume();
  }

  function consume() {
    channel.get(queue, {}, onConsume);

    function onConsume(err, msg) {
      if (err) {
        console.warn(err.message);
      }
      else if (msg) {
        console.log('consuming %j', msg.content.toString());
        setTimeout(function() {
          channel.ack(msg);
          consume();
        }, 1e3);
      }
      else {
        console.log('no message, waiting...');
        setTimeout(consume, 1e3);
      }
    }
  }
});

Here we start by creating a channel. Once we have one, we can start consuming messages inside the consume() function. There we try to get a message using channel.get, passing in the queue name. The second argument expects some options; in our case, we don't need to pass any. The final argument is a callback function (onConsume), which is invoked whether fetching a message succeeds or fails. If there is an error, we log a warning. If there is no error, we check whether there was a message waiting for us. If there was none, we wait for a bit (one second, in our case) and then try to fetch again.
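
Polling with channel.get keeps the example simple, but amqplib's callback API also offers channel.consume, which has RabbitMQ push messages to the worker as they arrive. The sketch below is an alternative to the polling loop, not a rewrite of worker.js. startWorker and handleWork are hypothetical names, and it assumes messages were produced as JSON, as producer.js does.

```javascript
// Hypothetical alternative to polling: let RabbitMQ push messages.
// `channel` is an amqplib channel such as the one channel.js calls back with.
function startWorker(channel, queue, handleWork) {
  // Deliver at most one unacknowledged message to this worker at a time.
  channel.prefetch(1);

  channel.consume(queue, function onMessage(msg) {
    if (msg === null) {
      // amqplib passes null when RabbitMQ cancels the consumer.
      console.warn('consumer cancelled');
      return;
    }
    var work = JSON.parse(msg.content.toString());
    handleWork(work, function done() {
      // Acknowledge only after the work has finished.
      channel.ack(msg);
    });
  }, { noAck: false });
}
```

With prefetch set to 1, a slow worker does not hoard messages that an idle worker could be processing.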

If there is a message waiting, we process it, doing whatever the worker’s job is (sending an email, contacting a remote server, etc.; here simulated by a one-second timeout), then acknowledge the message and try to consume the next one. By acknowledging the message, we tell RabbitMQ that it has been processed, which removes it from the list of messages pending acknowledgement.

If your worker process dies, its channel will be disconnected. RabbitMQ keeps track of the unacknowledged messages on each channel and puts them back on the queue when the channel closes, so each of those messages will be delivered again once another worker process fetches it.
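
A worker that stays alive but fails to process a message has to hand the message back explicitly. Here is a minimal sketch, assuming you want to retry a failed message once and then give up. settle is a hypothetical helper, while channel.ack, channel.nack and the msg.fields.redelivered flag come from amqplib.

```javascript
// Hypothetical helper: acknowledge on success, otherwise retry once.
function settle(channel, msg, err) {
  if (!err) {
    channel.ack(msg);
    return 'acked';
  }
  // RabbitMQ sets `redelivered` on messages that were delivered before.
  var requeue = !msg.fields.redelivered;
  // nack(message, allUpTo, requeue): reject only this one message.
  channel.nack(msg, false, requeue);
  return requeue ? 'requeued' : 'dropped';
}
```

A message rejected with requeue set to false is discarded by RabbitMQ unless the queue has a dead-letter exchange configured, so dropping after one retry is a policy choice rather than a recommendation.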

This was the last article on the subject of Work Queues — published for YLD.
You can find all the previous posts on this topic here:
