Delayed feature for Rails ActiveJob and Sneakers
One day my colleague blurted out “S#!t!!!, Sneakers don’t support delay!” and pointed me to a comparison chart for ActiveJob’s QueueAdapters. Delayed feature is not a supported feature as shown in the chart. Nor is Retries but that was covered in previous two posts (1, 2). Attempting to enqueue a job with delay results in NotImplementedError. Searching for answers led to a solution like this. But that solution required installation of a plugin that doesn’t seem to be mature enough and may not be available depending on service provider. Basically the plugin is handling the delay in exchange level using the local storage.
To make the Sneakers support delayed feature, it needs to handle two types of delay. One is a constant time delay (:wait option) and the other is variable time delay which is derived given the target time of execution (:wait_until option). So let’s try to implement it without the plugin. From Rails ActiveJob side, only thing needed is to implement enqueue_at method in SneakersAdapter. Rest is up to what I do with Sneakers.
The approach I took is very similar to exponential backoff retries. One constraint you have with RabbitMQ is you can only setup a constant time delay queue using the dead letter policy. And I wanted to make it work for various time delays without creating too many queues. The solution is to setup cascading time delay loops. First setup queues with ttl of differing scales like this: [86400, 3600, 600, 60]. The values represent 24 hrs, 1 hr (60 mins), 10 mins, 1 min. So when a job comes in with delay of 3 days 12 hrs 15 mins, it will be routed to 86400 queue 3 times, routed to 3600 queue 12 times, routed to 600 queue one time, and routed to 60 queue 5 times.
The step down routing logic is implemented in a custom Sneakers handler. The worker for the handler only calls reject! to defer the work to the handler. And the job is enqueued with custom headers parameters :work_at and :work_queue as shown in this gist. Running Sneakers with these classes will create following exchanges and queues:
treadmill-delay type: headers
treadmill-delay-requeue type: topic
treadmill-delay-60 arguments: delay: 60
treadmill-delay-600 arguments: delay: 600
treadmill-delay-3600 arguments: delay: 3600
treadmill-delay-86400 arguments: delay: 86400
After going thru these queues past delay period, the job is published to main exchange (:sneakers as default) with routing_key set to :work_queue value. Then a different worker for that queue will pick up the job and process it and we are done.
I’m not making a pull request for the delay handler to Sneakers as it requires more work (worker and adapter) to make it work and this is just one way of tackling that problem. Best solution would be to support plugin for RabbitMQ at some point. Hopefully there will be a sneakers-rails gem one day that will support all the features.