RabbitMQ Scheduling Messages with Sneakers

In any project, there are always ways to improve the performance of your application. One common pattern is to move synchronous work into background jobs that run asynchronously. Tools that I’ve used for this in my Rails applications include DelayedJob, Resque, and Sidekiq. If you don’t want to use one of these solutions, an alternative is RabbitMQ’s scheduled (delayed) messaging.

RabbitMQ has a plugin where you can enable delayed messaging. You will need to have a fairly recent version of RabbitMQ in order for this delayed message plugin to work. First, we have to download the plugin since it does not come with RabbitMQ by default.

I’m downloading the plugin into my Homebrew installation. If you need to install it to a different path, the plugin installation guide ( http://www.rabbitmq.com/installing-plugins.html ) explains where the file should go.

Downloading it to my local path using wget:

wget http://www.rabbitmq.com/community-plugins/v3.6.x/rabbitmq_delayed_message_exchange-0.0.1.ez

Then enable the plugin:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

You should then see the plugin enabled when running the following command: rabbitmq-plugins list

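Now that the plugin has been enabled, we need to create an exchange of type x-delayed-message. Its x-delayed-type argument tells the plugin how to route messages once the delay has elapsed (direct, in this case). I create the exchange in the RabbitMQ management UI, but you can also create it with the rabbitmqadmin command-line tool.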

I then bind this exchange to a queue, so that once the delay has elapsed the message is routed to the queue matching the routing key I’ve defined.
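If you would rather declare the exchange and binding from code than click through the admin UI, the same setup can be sketched in Ruby. This is only a sketch: the method name declare_delayed_topology is made up for illustration, the channel argument is assumed to behave like a Bunny::Channel, and making the queue durable is my assumption (if the queue already exists with different settings, RabbitMQ will reject the declaration).

```ruby
# Hypothetical helper: declares the delayed exchange and binds a queue to it.
# `channel` is assumed to respond to #exchange and #queue like Bunny::Channel.
def declare_delayed_topology(channel,
                             exchange_name: "delayed.exchange",
                             queue_name: "development_delayed")
  exchange = channel.exchange(
    exchange_name,
    type: "x-delayed-message",                   # exchange type provided by the plugin
    durable: true,
    auto_delete: false,
    arguments: { "x-delayed-type" => "direct" }  # routing behaviour after the delay
  )

  # Assumption: a durable queue named after the routing key.
  queue = channel.queue(queue_name, durable: true)
  queue.bind(exchange, routing_key: queue_name)

  [exchange, queue]
end

# Usage against a real broker (requires the bunny gem and a running RabbitMQ):
#   require "bunny"
#   connection = Bunny.new
#   connection.start
#   declare_delayed_topology(connection.create_channel)
```

Passing the channel in, instead of opening a connection inside the method, keeps the helper easy to exercise without a broker.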

Now we are able to publish a delayed message. There are several ways to publish a message in Ruby. A common client is the Bunny gem ( https://github.com/ruby-amqp/bunny ), but we’ll be using the Sneakers gem ( https://github.com/jondot/sneakers ), which uses Bunny under the hood.

$pub = Sneakers::Publisher.new(exchange: "delayed.exchange", exchange_options: { type: "x-delayed-message", arguments: { "x-delayed-type" => "direct" }, durable: true, auto_delete: false })
$pub.publish("Go Dubs!", headers: { "x-delay" => 20000 }, routing_key: "development_delayed")

In this example, we are initializing a publisher with the exchange that we created and specifying a few exchange options. When we publish, we have to specify in the headers how long we want to delay the message before it is delivered to the queue. The delay is given in milliseconds, so in our example the “x-delay” header of 20000 means 20 seconds. Now you may be wondering where the message is stored in the meantime, since it’s not sitting in a queue. The plugin keeps it in Mnesia, the Erlang database that RabbitMQ uses as its internal data store, until the delay expires.
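Since the header is in milliseconds, it is easy to be off by a factor of 1000. A small helper can do the conversion for you. This is a minimal sketch; delayed_publish_options is a name I made up, not part of Sneakers or Bunny.

```ruby
# Hypothetical helper: builds the publish options for a delay given in seconds.
def delayed_publish_options(delay_seconds, routing_key)
  raise ArgumentError, "delay must not be negative" if delay_seconds.negative?

  {
    headers: { "x-delay" => (delay_seconds * 1000).round }, # plugin expects milliseconds
    routing_key: routing_key
  }
end

options = delayed_publish_options(20, "development_delayed")
# options[:headers] is { "x-delay" => 20000 }

# With the publisher from the example above:
#   $pub.publish("Go Dubs!", options)
```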

One caveat with this example: make sure you initialize the publisher once and reuse that same connection for every message, rather than opening a new connection each time you publish.
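One way to get that reuse without a global variable is to wrap the publisher in a small object that builds it lazily and then keeps hold of it. The sketch below is only an illustration: DelayedPublisher is a hypothetical class, and the block you pass in is assumed to return something with a publish(payload, options) method, such as a Sneakers::Publisher.

```ruby
# Hypothetical wrapper: builds the underlying publisher once and reuses it.
class DelayedPublisher
  def initialize(&factory)
    raise ArgumentError, "a block that builds the publisher is required" unless factory

    @factory = factory
    @mutex = Mutex.new
    @publisher = nil
  end

  # Publishes `payload` so that it reaches the queue after `delay_ms` milliseconds.
  def publish(payload, delay_ms:, routing_key:)
    publisher.publish(payload, headers: { "x-delay" => delay_ms }, routing_key: routing_key)
  end

  private

  # Memoized, so the factory block (and its connection) runs only once,
  # even if several threads publish at the same time.
  def publisher
    @mutex.synchronize { @publisher ||= @factory.call }
  end
end

# Usage with Sneakers (requires the sneakers gem and a running RabbitMQ):
#   DELAYED = DelayedPublisher.new do
#     Sneakers::Publisher.new(exchange: "delayed.exchange",
#                             exchange_options: { type: "x-delayed-message",
#                                                 arguments: { "x-delayed-type" => "direct" },
#                                                 durable: true, auto_delete: false })
#   end
#   DELAYED.publish("Go Dubs!", delay_ms: 20000, routing_key: "development_delayed")
```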

Even though this RabbitMQ feature is great, I feel that there is still room for improvement. The major thing it lacks compared to tools like Sidekiq or DelayedJob is visibility into the queue. With DelayedJob you can look up pending messages by running MySQL queries, and with Sidekiq you can inspect the queue in its admin tool. Once there is an admin interface that lets you view, manage, and remove the delayed messages, I would definitely use this feature more.