Using a work queue

This is the third article in a new series about Node.js. In the last article we saw how we can use the async package to control asynchronous callback flow. In this article we'll be using async again, this time to provide a queue, which allows even tighter control over the flow of asynchronous operations. I hope you enjoy it!
You can find some of the code samples from this article in this GitHub repo.
- Pedro Teixeira, CTO, YLD!

Using a work queue

If an external service requires a global maximum of parallel requests, you can concentrate all the work behind a work queue using async. You may want to do this for several reasons:

  • To reduce the pressure on a given external system
  • To perform asynchronous work

Async lets you define a queue, given a work function and a maximum number of outstanding requests. For instance, you can define a Singleton logger module like this:

var async = require('async');

var parallelLimit = 5;
var q = async.queue(sendLog, parallelLimit);

function sendLog(entry, cb) {
  sendLogEntryToExternalSystem(entry, cb);
}

module.exports = push;

function push(entry, cb) {
  q.push(entry, cb);
}

Here we're creating a queue, passing in a worker function (sendLog) and the maximum number of concurrent operations (five in this case). This singleton module then exports a function (push) that pushes a log entry onto the queue using q.push. This method accepts the payload as the first argument, and an optional callback as the second.
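To make those semantics concrete, here is a simplified, dependency-free sketch of what a queue like this does internally: a worker function, a concurrency limit, and pushed tasks that wait their turn. The names (makeQueue, waiting, running) are illustrative, not part of the async API:

```javascript
// Minimal work-queue sketch: runs `worker` on each pushed task,
// never exceeding `limit` concurrent operations.
function makeQueue(worker, limit) {
  var waiting = []; // tasks not yet started
  var running = 0;  // tasks currently in flight

  function maybeNext() {
    while (running < limit && waiting.length > 0) {
      var task = waiting.shift();
      running++;
      worker(task.payload, function(err) {
        running--;
        if (task.cb) task.cb(err); // the callback is optional
        maybeNext();               // start the next waiting task
      });
    }
  }

  return {
    push: function(payload, cb) {
      waiting.push({ payload: payload, cb: cb });
      maybeNext();
    },
    length: function() {
      return waiting.length; // tasks still waiting to start
    }
  };
}
```

The real async.queue does considerably more (events, pausing, batching), but the core idea is the same: work beyond the concurrency limit simply sits in an array until a slot frees up.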

Since the callback is optional, clients of this module can ignore the outcome of the operation entirely:

var log = require('./logger');

log('payload 1');
log('payload 2');

Alternatively, clients that care about the outcome may pass in a callback function to be notified when each operation completes:

var log = require('./logger');

log('payload 1', handleLogDone);
log('payload 2', handleLogDone);

function handleLogDone(err) {
  if (err) {
    console.error('error logging: ' + err.stack);
  } else {
    console.log('logging finished well');
  }
}

This last feature can be used to provide a transparent queue in front of a service. Let's say, for instance, that you have a legacy database that you must use to insert documents. This legacy database is not very powerful and not particularly scalable, and it may start failing during peak traffic. You can absorb peak traffic by putting this fragile service behind an async queue:

document_inserter.js:

var async = require('async');

var q = async.queue(insertDocument, 1);

function insertDocument(doc, cb) {
  fragileRemoteSystem.insertDocument(doc, cb);
}

module.exports = function push(doc, cb) {
  q.push(doc, cb);
};

Bear in mind that every object pushed into the queue occupies memory until it is processed. An in-memory queue like this one is great for absorbing peaks, but if the consuming rate stays slower than the producing rate for long, your Node.js process will eventually collapse due to memory exhaustion. To address this you can either a) increase the maximum number of parallel operations, so the queue drains faster, or b) use a persistent queue.
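A lighter-weight mitigation, before reaching for a persistent queue, is to shed load once too much work is pending. The sketch below wraps any push function with a cap on outstanding items; boundedPush and maxPending are illustrative names, not part of the async API:

```javascript
// Sketch: cap the number of pushed-but-unfinished items so an
// in-memory queue cannot grow without bound under sustained overload.
function boundedPush(queuePush, maxPending) {
  var pending = 0;
  return function(payload, cb) {
    if (pending >= maxPending) {
      // Shed load instead of exhausting memory.
      if (cb) cb(new Error('queue full, try again later'));
      return false;
    }
    pending++;
    queuePush(payload, function(err) {
      pending--; // a slot freed up
      if (cb) cb(err);
    });
    return true;
  };
}
```

With async you could pass something like q.push.bind(q) as queuePush; when the wrapper returns false, the caller can retry later, drop the item, or spill it to disk.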

Next article

The callback pattern works well for simple operations that have a start and an end state; but if you’re interested in state changes, a callback is not enough. In the next chapter we’ll cover the Event Emitter pattern and how we can use it to observe state changes. Stay tuned!


This article was extracted from the Flow Control Patterns, a book from the Node Patterns series.

Written by: Pedro Teixeira


Originally published at blog.yld.io on November 13, 2015.
