Rails ActiveJob and Sneakers

A Rails (4.2) project has a RabbitMQ setup where background jobs are enqueued with bunny and consumed by non-ruby-rails worker processes. The goal here is to bring the workers into Rails project as ActiveJobs but keep using the RabbitMQ as the message broker. I have used sidekiq before and I’d love to use it instead but thought this would be a great exercise. Enter sneakers.

First thing to do is to tell Rails to use sneakers as background job processor by setting the configuration attribute.

config.active_job.queue_adapter = :sneakers

And write a job.

class FirstJob < ActiveJob::Base
queue_as :primary
  def perform(*args)
Sneakers.logger.info "Hello World!"
end
end

And run it.

# start sneakers workers
> bundle exec rake sneakers:run
# then in rails console enqueue a job
> FirstJob.perform_later()
Enqueued FirstJob (Job ID: 9f20e4e6-5ebb-4801-b265-f7b5632257a6) to Sneakers(primary)

Log message says it was enqueued but nothing happened. And there is no queue named ‘primary’ created in the RabbitMQ (there is a ‘default’ queue instead). So what’s going on? The sneakers log file showed it was booted with ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper as a lone worker. Inside that code the queue was hard-coded to ‘default’. So if I drop the `queue_as` from my job, it would work but that is not what I want as I’d like to setup multiple queues. This meant that I have to write a worker for each queue I define as below.

class PrimaryWorker
include Sneakers::Worker
from_queue :primary
  def work(msg)
job_data = ActiveSupport::JSON.decode(msg)
ActiveJob::Base.execute job_data
ack!
end
end

Rebooting the sneakers process, the PrimaryWorker is now loaded and ‘primary’ queue is created. Run the FirstJob again and it is processed as expected.

Note: ActionMailer has its own ActiveJob called ActionMailer::DeliveryJob that puts jobs into a ‘mailers’ queue. So a worker for that queue is needed as well.

Now onto retry implementation. There is a Maxretry handler in sneakers so let’s put that in.

class PrimaryWorker
include Sneakers::Worker
from_queue :primary, {
handler: Sneakers::Handlers::Maxretry,
arguments: { :'x-dead-letter-exchange' => "primary-retry" }
}
  def work(msg)
begin
job_data = ActiveSupport::JSON.decode(msg)
ActiveJob::Base.execute job_data
ack!
rescue
reject!
end
end
end

And make a job that will raise error on perform. Running the error job shows the job was executed 5 times (retry_max_times) in 6 second intervals (retry_timeout). This is done by the handler creating ‘{queue-name}-retry’ and ‘{queue-name}-error’ queue. It also created ‘{queue-name}-retry’, ‘{queue-name}-retry-requeue’, and ‘{queue-name}-error’ exchanges. All with ‘#’ as the routing key.

Just one problem with that. This means it will create a set of those queues and exchanges for each queue/worker I define. This is where I started to miss sidekiq. I found the config options to define retry queues and exchanges so I set those as follows.

Sneakers.configure {
retry_exchange: 'activejob-retry',
retry_error_exchange: 'activejob-error',
retry_requeue_exchange: 'activejob-retry-requeue'
}

And change all workers to send retry to ‘activejob-retry’ queue.

class PrimaryWorker
include Sneakers::Worker
from_queue :primary, {
handler: Sneakers::Handlers::Maxretry,
arguments: { :'x-dead-letter-exchange' => "activejob-retry" }
}
def work(msg)
begin
job_data = ActiveSupport::JSON.decode(msg)
ActiveJob::Base.execute job_data
ack!
rescue
reject!
end
end
end

This created only one set of retry/error queues and exchanges. Looks good so I tried my test with multiple queue workers and jobs. Something wasn’t right. All the retry jobs were going to only one of the worker queues. And it happened to be the mailers queue where I was doing an extra work within the worker to handle Devise::Mailer jobs before executing the job. What gives? The answer was in the bindings within the activejob-retry-requeue channel. The worker queues are listed there with routing key all set to ‘#’. So it just went to the first one which was the mailer queue. Not good. There is currently no way to configure it. The routing key needs to be the queue name for worker queue and requeue exchange binding in this case. That way , failed jobs will get requeued and routed back to origin worker queues. So I forked the sneakers project to handle that for now. And the final version of the worker code is:

class PrimaryWorker
include Sneakers::Worker
from_queue :primary, {
handler: Sneakers::Handlers::Maxretry,
retry_routing_key: 'primary',
arguments: { :'x-dead-letter-exchange' => "activejob-retry" }
}
def work(msg)
begin
job_data = ActiveSupport::JSON.decode(msg)
ActiveJob::Base.execute job_data
ack!
rescue
reject!
end
end
end

And it works!

Now where’s that exponential backoff stuff…

UPDATE: The fix is merged and available as of version 2.5.

UPDATE: Exponential backoff is posted here.

UPDATE: Delay feature is posted here.

Alternatively, you can give PikaQue a try.