Heroku Workers on Amazon SQS with Node

Adam Neary
4 min read · Mar 2, 2015


My first app on Heroku was a Rails app, and like the rest of the civilized world, we used Resque to manage the queue.

But if you’re working on an Express app, the choice of platform for queueing background work is less clear-cut. We tried Kue backed by Redis, but ran into some challenges. Heroku posted an article on using RabbitMQ, but ultimately we decided to use Amazon’s SQS, and so far we’ve been quite pleased.

We have responsibilities isolated neatly into four services: the worker, the queue, the handler, and the mediator.

The Worker

When Foreman launches a Heroku worker dyno, it runs worker.coffee.

The worker is responsible for many of the same things as app.coffee: loading environment variables, connecting to the database, registering performance measurement with NewRelic, and registering exceptions with BugSnag.

Beyond that, it passes control to the queueing service, which is where the SQS logic sits.
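A rough sketch of that hand-off, in plain JavaScript rather than CoffeeScript. The names `startWorker`, `connectDatabase`, and `queue.listen` are assumptions made for illustration, and the monitoring setup (NewRelic, BugSnag) is left out because it depends on each library's own initialization.

```javascript
// Hypothetical worker entry point: do the shared bootstrapping first,
// then hand control to the queue service.
function startWorker(deps, done) {
  const { connectDatabase, queue, processTask } = deps;

  connectDatabase((err) => {
    // Without a database connection there is no point in polling for jobs.
    if (err) return done(err);

    // From here on, the queue service owns the process: it polls for
    // messages and passes each one to the job handler.
    queue.listen(processTask);
    done(null);
  });
}
```

Passing the dependencies in as arguments keeps the entry point small and makes it easy to start the worker against stubs in a test.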

The Queue

The first thing to notice is that everything in the queue service relates to SQS, which means we could swap out Amazon’s service for another’s in one place.

The service has a function for enqueuing new work, a function for removing items once processed, and a listener. The first two are relatively straightforward, but the listener is a bit less intuitive.
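To make the two simpler functions concrete, here is a minimal sketch. It assumes `sqs` is an object with `sendMessage` and `deleteMessage` methods in the callback style of the AWS SDK for JavaScript (v2). The `createQueue` wrapper and the `{ taskName, body }` message format are assumptions for illustration.

```javascript
// Sketch of the enqueue and remove halves of a queue service.
function createQueue(sqs, queueUrl) {
  return {
    // Serialize the task name and payload into a single message body.
    enqueue(taskName, body, callback) {
      const params = {
        QueueUrl: queueUrl,
        MessageBody: JSON.stringify({ taskName, body })
      };
      sqs.sendMessage(params, callback);
    },

    // Delete a processed message, identified by the receipt handle
    // that SQS attached to it when it was received.
    remove(message, callback) {
      const params = {
        QueueUrl: queueUrl,
        ReceiptHandle: message.ReceiptHandle
      };
      sqs.deleteMessage(params, callback);
    }
  };
}
```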

SQS uses long polling, which means that we want to create a function `readMessage` to wrap their `receiveMessage` API call. We call this function immediately after defining it, and we ensure that when the function has completed, it calls itself again no matter what — and only once.
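A minimal sketch of such a self-rescheduling loop might look like the following. It is written in plain JavaScript and assumes `sqs.receiveMessage(params, callback)` behaves as in the AWS SDK for JavaScript (v2). The names `listen`, `handleBatch`, and `shouldContinue` are illustrative, and `shouldContinue` exists only so the loop can be stopped in tests.

```javascript
// Long-poll loop: each poll schedules the next one exactly once.
function listen(sqs, params, handleBatch, shouldContinue = () => true) {
  function readMessage() {
    if (!shouldContinue()) return;

    // Guard so that a handler calling back twice cannot start a second loop.
    let rescheduled = false;
    const next = () => {
      if (rescheduled) return;
      rescheduled = true;
      setImmediate(readMessage);
    };

    sqs.receiveMessage(params, (err, data) => {
      if (err) {
        // A failed poll must not kill the loop; log it and try again.
        console.error('receiveMessage failed:', err.message);
        return next();
      }
      const messages = (data && data.Messages) || [];
      if (messages.length === 0) return next();

      // handleBatch is expected to call next() once the batch is done,
      // whether or not individual jobs failed.
      handleBatch(messages, next);
    });
  }

  // Kick off the first poll immediately.
  readMessage();
}
```

Using `setImmediate` rather than a direct recursive call keeps the stack flat even if a stubbed client invokes its callback synchronously.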

Settings

  • MaxNumberOfMessages defines how many messages are retrieved at once (SQS allows up to 10 per request). Once a batch of messages is retrieved, async.each will process them in parallel, calling `readMessage` again upon completion whether any fail or not.
  • VisibilityTimeout is the amount of time, in seconds, given to handlers for executing the job. During this window, the item will be unavailable to other message requests (these items will be marked “In Flight”). This is important because an item is removed only upon success (see below).
  • WaitTimeSeconds is a setting for the long polling process. As long as there are messages to process, each request returns promptly with a batch for our worker(s). If the queue is empty, this is the period of time Amazon will hold the request open waiting for a message to arrive (up to 20 seconds) before sending a response with no messages, at which point we immediately call `readMessage` again.
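Put together, the parameters passed to `receiveMessage` could look something like this. The values are illustrative rather than recommendations, and the `SQS_QUEUE_URL` environment variable and the fallback URL are placeholders.

```javascript
// Example receiveMessage parameters (illustrative values only).
const receiveParams = {
  // Placeholder: a real app would read its own queue URL from configuration.
  QueueUrl: process.env.SQS_QUEUE_URL ||
    'https://sqs.us-east-1.amazonaws.com/123456789012/example-queue',
  // Fetch up to 10 messages per request (the SQS maximum).
  MaxNumberOfMessages: 10,
  // Give handlers 60 seconds before the message becomes visible again.
  VisibilityTimeout: 60,
  // Long polling: hold the request open for up to 20 seconds (the SQS maximum).
  WaitTimeSeconds: 20,
  // Ask SQS to include the time each message was sent.
  AttributeNames: ['SentTimestamp']
};
```

The VisibilityTimeout is the one to tune most carefully: it should comfortably exceed the slowest job, otherwise a message can be handed to a second worker while the first is still processing it.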

Error Handling and Removing Items from the Queue

The listener takes a handler function as its sole argument, and in our case the worker passes JobService.processTask, which consumes a task name, timestamp, message body, and a callback.

If any of our jobs loses track of its callback, for example by returning without ever invoking it, the `readMessage` loop is broken and Odin’s valkyries will encircle our workers, raining their sinewy punishment down upon our hopeless shades, and it will be as the twilight of the gods.

Don’t lose track of your callbacks.

But if you funnel errors diligently back to the queueing service, it can manage them in a central and marginally less apocalyptic manner.

If an enqueued item is resolved successfully, we remove it from the queue explicitly. Otherwise, we report the error and continue our work. In such a case, our worker moves immediately to the next item in the queue, and once the visibility timeout expires, SQS will automatically make the failed job available to the next request.
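The delete-on-success rule can be sketched as follows. This version uses a small counter in place of async.each so that it has no dependencies, and it assumes each message body is JSON of the form `{ taskName, body }`. That format and the name `processBatch` are assumptions for illustration.

```javascript
// Process a batch in parallel; delete only the messages whose job succeeded.
function processBatch(sqs, queueUrl, handler, messages, done) {
  let pending = messages.length;
  if (pending === 0) return done();
  const finishOne = () => { if (--pending === 0) done(); };

  messages.forEach((message) => {
    // Guard against a job calling back twice, or calling back and then throwing.
    let settled = false;
    const onResult = (err) => {
      if (settled) return;
      settled = true;
      if (err) {
        // Leave the message in place: it becomes visible again once the
        // visibility timeout expires, and will be retried.
        console.error('job failed:', err.message);
        return finishOne();
      }
      const params = { QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle };
      sqs.deleteMessage(params, (deleteErr) => {
        if (deleteErr) console.error('deleteMessage failed:', deleteErr.message);
        finishOne();
      });
    };

    try {
      const task = JSON.parse(message.Body);
      // SentTimestamp is only present if it was requested in AttributeNames.
      const sentStamp = message.Attributes && message.Attributes.SentTimestamp;
      handler(task.taskName, sentStamp, task.body, onResult);
    } catch (thrown) {
      // Malformed bodies and synchronous throws are treated as job failures.
      onResult(thrown);
    }
  });
}
```

Note that `done` fires exactly once per batch regardless of how many jobs fail, which is what lets the polling loop keep going.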

This would cause an infinite cycle for failing jobs were it not for dead-letter queues, the final resting place where failing jobs go to die. You simply specify the maximum number of retries a given item will suffer before being shepherded aside, where it can be dissected…and where critical information can be salvaged.
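In SQS terms, that limit lives in the source queue's `RedrivePolicy` attribute, a JSON string naming the dead-letter queue's ARN and a `maxReceiveCount`. A small sketch of building it follows; the helper name, the ARN, and the count of 5 are placeholders.

```javascript
// Build the Attributes object for attaching a dead-letter queue.
// It would be passed to sqs.setQueueAttributes({ QueueUrl, Attributes }).
function redrivePolicyAttributes(deadLetterQueueArn, maxReceiveCount) {
  return {
    // SQS expects the policy itself as a JSON-encoded string.
    RedrivePolicy: JSON.stringify({
      deadLetterTargetArn: deadLetterQueueArn,
      maxReceiveCount: maxReceiveCount
    })
  };
}

// Placeholder ARN: after 5 failed receives, a message moves to the dead-letter queue.
const attributes = redrivePolicyAttributes(
  'arn:aws:sqs:us-east-1:123456789012:example-dead-letter-queue',
  5
);
```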

The Handler

Then we have the job handler itself, which we have isolated into its own service.

Ultimately, it is none of the queue’s business how jobs are processed, and yet it seems outrageous to have job-related logic strewn all about.

So we use a job handler to act as an intermediary, consuming tasks through the consistent API we have defined (taskName, sentStamp, body, and callback) and calling the corresponding services.

The handlers object has one entry per unique job, each key being the name of a task and each value being a function to execute.
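A minimal sketch of that dispatch table, with made-up task names (`sendWelcomeEmail` and `recalculateScore` are hypothetical, as are their bodies):

```javascript
// One entry per job: task name -> function(sentStamp, body, callback).
const handlers = {
  sendWelcomeEmail(sentStamp, body, callback) {
    if (!body || !body.userId) return callback(new Error('userId is required'));
    // A real handler would call the relevant service here.
    callback(null);
  },
  recalculateScore(sentStamp, body, callback) {
    callback(null);
  }
};

// The single entry point the queue listener calls for every message.
function processTask(taskName, sentStamp, body, callback) {
  // Only look at the object's own keys, so names such as "toString" don't match.
  const known = Object.prototype.hasOwnProperty.call(handlers, taskName);
  if (!known) {
    // Unknown tasks are reported as errors so they end up in the dead-letter queue.
    return callback(new Error('No handler registered for task: ' + taskName));
  }
  handlers[taskName](sentStamp, body, callback);
}
```

Returning an error for unknown task names, rather than silently succeeding, means a typo in a task name surfaces as a failing job instead of a deleted message.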

The Mediator

This setup would be enough if we only wanted to push items into the queue for processing, and this is the pattern for many of the jobs we run. But we also want jobs enqueued in response to events, and this is where the Mediator comes in.

Our app can publish events to the mediator, which then enqueues whichever jobs are associated with the event. Common use cases for us are pre-save hooks on key models and scoring algorithm events.

Just as the job service maintains a mapping of task names to functions, our mediator maintains a mapping of events to arrays of tasks.

The result is not really a pubsub model, but it behaves a bit like one, isolating the relationships between events and the tasks kicked off in response to them.
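As a rough illustration, a mediator along these lines could be as small as the sketch below. The event and task names are invented, and `enqueue(taskName, body, callback)` stands in for whatever the queue service exposes.

```javascript
// Hypothetical mapping of events to the tasks they trigger.
const eventTasks = {
  'user:saved': ['sendWelcomeEmail', 'recalculateScore'],
  'score:changed': ['recalculateScore']
};

function createMediator(enqueue, mapping) {
  return {
    // Enqueue every task registered for the event, then report how many.
    publish(eventName, payload, callback) {
      const known = Object.prototype.hasOwnProperty.call(mapping, eventName);
      const tasks = known ? mapping[eventName] : [];
      let pending = tasks.length;
      // Events nobody listens for are not an error; they enqueue nothing.
      if (pending === 0) return callback(null, 0);

      let failed = false;
      tasks.forEach((taskName) => {
        enqueue(taskName, payload, (err) => {
          if (failed) return; // the first error has already been reported
          if (err) {
            failed = true;
            return callback(err);
          }
          if (--pending === 0) callback(null, tasks.length);
        });
      });
    }
  };
}
```

The publisher only knows the event name, so adding or removing a job in response to an event is a one-line change to the mapping.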

That’s it. Hope this helps!
