Roger, Roger

Node.js + RabbitMQ + Acknowledgements

Following on from my simple getting you started post on RabbitMQ with Node.js, this post looks at a more real world aspect of using queues: acknowledgements.

Typically you want to know when a message has been successfully placed on a queue. There are application scenarios where that is not the case, but in most cases you care that the queue has received the message so your application can safely move on.

RabbitMQ achieves this by using acknowledgements and the AMQP module models those acknowledges in the form of a callback. Here’s some code to illustrate the point:

var amqp = require('amqp');
// Make a connection
var connection = amqp.createConnection();
...
// Create | Connect to an exchange
connection.exchange(..., function(ex) {
 // Publish message
ex.publish(..., function() {
   // ACK
connection.end(); // ← end
});
});

The thing I want to point out with this code is that the publish method accepts as the last parameter a function, a callback, which the AMQP module calls when the message has been successfully queued.

The other key difference from the sample code is we need a reference to an exchange. If you recall, an exchange is where we really send the messages to enable the routing and semantics provided by RabbitMQ, you don’t publish messages to queues, you publish them to exchanges, which in turn route the messages to a queue or queues as necessary.

In this case, the reason I need a reference to an exchange is because I need to control some of the properties of that exchange, namely the message confirmation flag. Here is how I created and configured the exchange to make acknowledgements work:

// When the connection is open
connection.on('ready', function() {
 // Create the exchange
connection.exchange(
'acks-ex', // Name of the exchange
{ confirm: true }, // ** Options for the exchange
function(ex) { // Called when created
// Publish message
}
);
});

The confirm:true setting states that I want the exchange to confirm when a message as been queued; this is the magic configuration required to make the AMQP module invoke the callback; otherwise your code will just hang—there is one other setting we need, and that’s all to do with how we publish the message.

// Publish message
ex.publish(
'spike', // The routing info (queue name)
'hello', // The message
{ deliveryMode: 2 }, // Message delivery mode, 2 means persist
function() {
// ACK
connection.end(); // <-- end
}
);

The secret source here is that little deliveryMode: 2, add that to the confirm:true on the exchange and you’re all set.

In theory. In practice… not so much.

I read the docs and wrote code every similar to this—but the callback was never called, my program just hung, despite what the docs said. I scratched my head for a good long while before I discovered that the AMQP module in the NPM registry is broken and subsequently the fix on Stack Overflow.

To save you time if you don’t want to read those posts, the solution is to use the tip version of the GitHub repo for the AMQP module in your project and all will be well. To pull in the tip version into your project you can use the following NPM command:

$ npm install https://github.com/postwait/node-amqp/tarball/master

With the magic two switches and the updated AMQP module source you’re all set for acknowledgements in RabbitMQ with Node.js.

Finally, I’ve created a gist of the complete code sample used in this post for your information and entertainment.