Implementing a Job Queue with Node.js

In this post, we will go through the step-by-step implementation of a simple job queue using Kue and Redis.


At the simplest level, a message queue is a way for applications and discrete components to send messages between one another in order to reliably communicate.
Message queues are typically (but not always) ‘brokers’ that facilitate message passing by providing a protocol or interface that other services can access. This interface connects producers, which create messages, with consumers, which then process them.

A job queue is similar to a message queue and contains an ordered list of jobs to be performed by a separate subsystem.
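To make the producer/consumer idea concrete before bringing in Kue, here is a minimal in-memory sketch. It is not Kue and has none of its persistence or concurrency features; the class name InMemoryJobQueue, its methods, and the example email addresses are all made up for illustration.

```javascript
// A toy in-memory job queue (not Kue): shows a producer adding jobs
// and a consumer processing them in the order they were added.
class InMemoryJobQueue {
  constructor() {
    this.jobs = [];   // pending jobs, oldest first
    this.nextId = 1;  // ids are assigned in creation order
  }

  // Producer side: append a job of the given type to the end of the queue.
  enqueue(type, data) {
    const job = { id: this.nextId++, type, data };
    this.jobs.push(job);
    return job;
  }

  // Consumer side: hand every pending job to the handler, oldest first,
  // and collect whatever the handler returns.
  drain(handler) {
    const results = [];
    while (this.jobs.length > 0) {
      const job = this.jobs.shift();
      results.push(handler(job));
    }
    return results;
  }
}

const queue = new InMemoryJobQueue();
queue.enqueue('send_email', { to: 'alice@example.com' });
queue.enqueue('send_email', { to: 'bob@example.com' });

const processed = queue.drain((job) => `Job ${job.id} for ${job.data.to} is done`);
console.log(processed.join('\n'));
```

A real job queue adds what this sketch lacks: jobs survive a process restart, and producers and consumers can run in separate processes.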

In this post, we will use Kue, a Node.js module for creating a job queue and processing jobs in the background. Kue is backed by Redis and it is designed to be simple to use.

Before moving on, ensure that Redis is installed and a redis-server is running on your system.

Add Kue as a dependency in the package.json file in the root directory of your application:

/* package.json */
{
  "name": "sample-app",
  "author": {
    "name": "Prateek Bhatt"
  },
  "version": "0.1.0",
  "dependencies": {
    "kue": "~0.6.2"
  },
  "devDependencies": {}
}

Then run the following command to install all dependencies into the node_modules directory:

npm install

Let's create a simple job queue

Create a new file, simple-job-queue.js, and require the kue module:

/* simple-job-queue.js */
var kue = require('kue');
var jobs = kue.createQueue();

Let's add a function, newJob, that will be used to add a new job to the “new_job” queue:

/* simple-job-queue.js */
...
function newJob () {
  var job = jobs.create('new_job');
  job.save();
}

Now, we need to start processing each job in the “new_job” queue. To do that, we call the jobs.process function as shown below:

/* simple-job-queue.js */
...
jobs.process('new_job', function (job, done) {
  console.log('Job', job.id, 'is done');
  done && done();
});

To test our “new_job” queue, let's use JavaScript’s setInterval method to add a new job to the queue every 3000 milliseconds:

/* simple-job-queue.js */
...
setInterval(newJob, 3000);

Now, go to your command line and run the following to test your first job queue!

node simple-job-queue.js

You will get an output in the format below:

Job 1 is done
Job 2 is done
Job 3 is done
...

In a real-world scenario, we will need to attach additional parameters when queuing a job. We can do that by passing an object containing job-specific params as the second argument to the jobs.create function. For example, here we pass a name param to each job:

/* simple-job-queue.js */
...
function newJob (name) {
  var job = jobs.create('new_job', {
    name: name
  });
  job.save();
}

While processing a job, we need to access the params that were assigned to it when it was queued. These job-specific params are available on the job.data object, e.g.

job.data.name
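As a rough sketch of how this might look inside a handler, the function below reads job.data.name and falls back to a default when it is missing. The function name describeJob and the stand-in job object are hypothetical; they are there only so the snippet runs without Kue or Redis.

```javascript
// Hypothetical handler (describeJob is not part of Kue).
function describeJob(job, done) {
  // job.data holds the object that was passed as the second argument to jobs.create
  var name = (job.data && job.data.name) || 'Default_Name';
  var message = 'Job ' + job.id + ' with name ' + name + ' is done';
  console.log(message);
  done && done();
  return message;
}

// Stand-in for a Kue job, so the sketch runs on its own.
var fakeJob = { id: 1, data: { name: 'Send_Email' } };
var described = describeJob(fakeJob, function () {});
```

With Kue, the same function could be registered as the handler by passing it to jobs.process in place of the inline callback.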

We can also add job-specific events to trigger callbacks when a specific job has completed or failed, using the job's on method:

/* simple-job-queue.js */
var kue = require('kue');
var jobs = kue.createQueue();

function newJob (name) {
  name = name || 'Default_Name';
  var job = jobs.create('new_job', {
    name: name
  });

  // job-specific events
  job
    .on('complete', function () {
      console.log('Job', job.id, 'with name', job.data.name, 'is done');
    })
    .on('failed', function () {
      console.log('Job', job.id, 'with name', job.data.name, 'has failed');
    });

  job.save();
}

jobs.process('new_job', function (job, done) {
  /* carry out all the job functions here */
  done && done();
});

setInterval(function () {
  newJob('Send_Email');
}, 3000);
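The handler above always calls done with no arguments, so the failed callback never runs. In Kue, passing an error to done reports the job as failed. The sketch below shows one way a handler might do that; the function name processNamedJob, the record helper, and the stand-in job objects are hypothetical, and the snippet deliberately avoids Kue and Redis so it can be run on its own.

```javascript
// Hypothetical handler: fails any job that was queued without a name.
function processNamedJob(job, done) {
  if (!job.data || !job.data.name) {
    // Passing an Error to done reports the job as failed
    return done(new Error('Job ' + job.id + ' has no name'));
  }
  // Calling done with no arguments reports success
  done();
}

// Stand-in jobs and callbacks (not Kue objects) to exercise the handler.
var outcomes = [];
function record(job) {
  return function (err) {
    outcomes.push('Job ' + job.id + (err ? ' has failed' : ' is done'));
  };
}

var goodJob = { id: 1, data: { name: 'Send_Email' } };
var badJob = { id: 2, data: {} };
processNamedJob(goodJob, record(goodJob));
processNamedJob(badJob, record(badJob));
console.log(outcomes.join('\n'));
```

Registered with jobs.process, a handler like this should cause the failed callback from the listing above to fire for jobs that are missing a name.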

The code for this project is hosted on GitHub at: github.com/prateekbhatt/node-job-queue

In the next post, we will see the implementation of a nested task queue, where “parent” tasks are completed only when a set of “children” tasks is completed.