Replaying DLX messages in RabbitMQ with an admin tool

We’ve been using RabbitMQ for more than a year now in production and I want to share how we tackled replaying messages that get sent into the dead-letter exchange (DLX). If you are new to RabbitMQ and want to try it out as a messaging broker, I’d highly recommend just playing around with http://tryrabbitmq.com to design your system.

One of the first questions when setting up RabbitMQ is how to deal with failures. If your consumer can't process a message, for example because it raises an exception, how do you make sure the message isn't lost and the issue gets fixed? This is where the dead-letter exchange comes into play. When a message is rejected by its consumer without being requeued, or sits in its queue longer than the queue's TTL, RabbitMQ republishes it to the queue's dead-letter exchange, which routes it into the DLX queue. There you can decide what to do with the message: should you replay it or discard it? I'll show you how we tackled this question by building an admin tool in Rails.
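For reference, here is what the consumer side of that failure path might look like. This is a minimal sketch, assuming a Bunny consumer with manual acknowledgements; `process_delivery` and `run_consumer` are hypothetical helper names, and the "empty payload" failure is an illustrative stand-in for real processing errors. The key call is `reject` with `requeue` set to `false`, which is what hands the message to the dead-letter exchange. An exception alone, without an explicit reject, leaves the message unacknowledged until the channel closes, and RabbitMQ then requeues it rather than dead-lettering it.

```ruby
# Ack on success; on failure, reject WITHOUT requeueing so RabbitMQ
# dead-letters the message to the queue's x-dead-letter-exchange.
# `channel` must respond to #ack and #reject, as Bunny::Channel does.
def process_delivery(channel, delivery_tag, payload)
  begin
    yield payload
  rescue StandardError => e
    warn "processing failed: #{e.message}"
    channel.reject(delivery_tag, false) # requeue = false => dead-letter
    return :dead_lettered
  end
  channel.ack(delivery_tag)
  :acked
end

# Hypothetical worker entry point (not called here). Bunny is required
# lazily so process_delivery can be loaded without the gem installed.
def run_consumer(queue_name = "development_blog")
  require "bunny"
  conn = Bunny.new # localhost:5672, guest/guest by default
  conn.start
  ch = conn.create_channel
  # passive: true only checks that the queue exists (with its DLX settings)
  queue = ch.queue(queue_name, passive: true)
  queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, payload|
    process_delivery(ch, delivery_info.delivery_tag, payload) do |body|
      raise ArgumentError, "cannot handle an empty payload" if body.empty?
      puts "processed #{body}"
    end
  end
ensure
  conn&.close
end
```

Passing the channel in, rather than reaching for a global, keeps `process_delivery` easy to exercise with a stand-in object.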

Let's set up a scenario where this happens and look at the queue. All of the examples are in Ruby, using Bunny, the Ruby client for RabbitMQ ( https://github.com/ruby-amqp/bunny ). We'll be building queues to support a blog application.

First, let's set up the queues

We need two queues:

  1. Where our normal blog messages will go
  2. Where our dead-letter messages will go.

In your development environment, open the RabbitMQ management UI at http://localhost:15672/ (assuming RabbitMQ and its management plugin are installed), or create the queues from the command line with the rabbitmqadmin tool.

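Set a message TTL on the blog queue: if a message is not consumed within the TTL period (1 second in this case), it is dead-lettered. The blog queue also needs its dead-letter exchange argument (x-dead-letter-exchange) pointing at dlx.failure, the fanout exchange created below, so that expired messages are routed to the DLX queue.
Setting up our DLX queue for the development environment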
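The same queue setup can also be declared from code instead of the admin UI. This is a minimal sketch, assuming Bunny and the names used in this post (`development_blog`, `development_dlx_queue`, `dlx.failure`); the helper names are hypothetical. The two arguments that matter are `x-message-ttl` (in milliseconds) and `x-dead-letter-exchange`.

```ruby
# Arguments for the blog queue: messages that stay unconsumed for longer
# than ttl_ms are dead-lettered to dlx_exchange.
def blog_queue_arguments(ttl_ms: 1_000, dlx_exchange: "dlx.failure")
  {
    "x-message-ttl" => ttl_ms,
    "x-dead-letter-exchange" => dlx_exchange
  }
end

# Declare both queues. `channel` is a Bunny::Channel (or anything with #queue).
def declare_queues(channel)
  blog = channel.queue("development_blog", durable: true, arguments: blog_queue_arguments)
  dlx  = channel.queue("development_dlx_queue", durable: true)
  [blog, dlx]
end
```

Declaring a queue that already exists with different arguments fails with a `PRECONDITION_FAILED` channel error, so if you created the queues in the admin UI, keep the arguments identical or delete the queues first.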

Next, let's build the exchanges

We need to build two different exchanges:

  1. The exchange where we publish our blog messages; this one is a direct exchange.
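  2. The exchange that funnels dead-lettered messages into our DLX queue. We use a fanout exchange here: a dead-lettered message keeps its original routing key, and a fanout exchange routes it regardless of that key, whereas a direct exchange only delivers it if the key matches the binding (or if you set x-dead-letter-routing-key on the source queue).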
Blog direct exchange
DLX exchange using fanout
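Why fanout is the safe choice: unless the source queue sets `x-dead-letter-routing-key`, a dead-lettered message keeps the routing key it was originally published with. A fanout exchange ignores routing keys, while a direct exchange only delivers when the key exactly matches a binding key. The toy model below is plain Ruby, not a RabbitMQ API; `route` and the routing key `blog.created` are hypothetical, chosen only to illustrate the matching rules.

```ruby
# Toy model of AMQP routing for two exchange types (not the real broker).
# A binding is { queue:, key: }; fanout ignores keys, direct needs an exact match.
def route(exchange_type, routing_key, bindings)
  matched =
    case exchange_type
    when :fanout then bindings
    when :direct then bindings.select { |b| b[:key] == routing_key }
    else raise ArgumentError, "unsupported exchange type: #{exchange_type}"
    end
  matched.map { |b| b[:queue] }
end

dlx_bindings = [{ queue: "development_dlx_queue", key: "" }]

# A message originally published with routing key "blog.created" is
# dead-lettered with that same key:
route(:fanout, "blog.created", dlx_bindings) # => ["development_dlx_queue"]
route(:direct, "blog.created", dlx_bindings) # => []
```

A direct DLX exchange would still work for messages published with an empty routing key, but fanout keeps working no matter which keys your publishers use.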

Bind the exchanges to the queues

Then go to the development_blog queue in the RabbitMQ admin and bind it to the blog exchange we just created, leaving the routing key empty. Do the same with the DLX queue and the dlx.failure exchange.
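The exchanges and bindings can likewise be declared with Bunny. This is a sketch using the names from this post (`blog`, `dlx.failure`, `development_blog`, `development_dlx_queue`); the helper name is hypothetical, and it assumes both queues already exist. It binds with an empty routing key, the same as leaving the routing key field blank in the admin UI.

```ruby
# Declare the blog (direct) and dlx.failure (fanout) exchanges and bind
# each queue to its exchange. `channel` is a Bunny::Channel (or anything
# with #direct, #fanout and #queue_bind). Assumes both queues already exist.
def declare_exchanges_and_bindings(channel)
  blog = channel.direct("blog", durable: true)
  dlx  = channel.fanout("dlx.failure", durable: true)

  # Empty binding keys: the direct binding then matches messages published
  # with an empty routing key; the fanout binding ignores keys entirely.
  channel.queue_bind("development_blog", blog.name, routing_key: "")
  channel.queue_bind("development_dlx_queue", dlx.name, routing_key: "")

  [blog, dlx]
end
```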

Publish some messages to the blog exchange

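require "bunny"

puts "=> Direct exchange routing"
conn = Bunny.new # connects to localhost:5672 as guest/guest by default
conn.start
ch = conn.create_channel
exchange = ch.direct("blog", durable: true) # must match the exchange created in the admin UI
5.times do
  # Empty routing key, matching the binding created in the admin UI
  exchange.publish("hello world", routing_key: "")
end
sleep 0.5
puts "Disconnecting..."
conn.close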

Since nothing is consuming the development_blog queue, wait a second or so for the messages to “die” (exceed their 1-second TTL) and be routed to the DLX queue.

Decision making time

Now that the messages are in the DLX queue, we can decide what to do with these messages. Some questions that might arise are:

  • Can we replay the messages automatically?
  • Can we manually decide what to do with the failures?

Both of these can be solved in various ways. I tried looking for existing solutions that would solve both of these problems, but didn’t really find anything good out there. If anyone knows any good solutions please comment below.

To replay messages automatically, you need a worker that consumes the DLX queue and republishes each message to the exchange and routing key it was originally published with. You can also add a layer of business logic that decides whether a given message should be replayed at all.
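A minimal sketch of the decision step such a worker could run for each DLX message. The assumptions here are illustrative, not from the original tool: the replay target is read from the most recent `x-death` entry, and the business rule is a cap on replays tracked in a hypothetical `x-replay-attempts` header that the worker sets itself, so a message that keeps failing is eventually parked for a human instead of looping forever.

```ruby
REPLAY_HEADER = "x-replay-attempts" # hypothetical header owned by this worker

# Decide what to do with one DLX message. `headers` is the message's AMQP
# headers hash with string keys (as Bunny exposes it); x-death lists
# dead-letter events, most recent first.
def replay_decision(headers, max_attempts: 3)
  headers ||= {}
  death = Array(headers["x-death"]).first
  return { action: :park, reason: "no x-death header" } unless death

  attempts = headers.fetch(REPLAY_HEADER, 0).to_i
  if attempts >= max_attempts
    { action: :park, reason: "already replayed #{attempts} times" }
  else
    {
      action: :replay,
      exchange: death["exchange"],
      routing_key: Array(death["routing-keys"]).first.to_s,
      # Drop x-death (the broker adds a fresh one if it dies again) and
      # bump our own attempt counter.
      headers: headers.reject { |key, _| key == "x-death" }.merge(REPLAY_HEADER => attempts + 1)
    }
  end
end
```

In a Bunny consumer on the DLX queue, a `:replay` decision would translate to `ch.basic_publish(payload, decision[:exchange], decision[:routing_key], headers: decision[:headers])` followed by an ack; a `:park` decision could store the message for the admin tool described below and then ack it.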

The other case is where the human factor comes into play: we want to decide manually what to do with each message, so the solution is an admin tool for this queue. We'll build it with Rails and fetch the messages using the RabbitMQ management HTTP API http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html

The tool needs three pieces: a model to store each failed message; a controller to fetch, delete, or replay a message; and a view that displays the message, the queue it came from, timestamps, and action buttons. I won't go into detail on every piece; I mainly want to show the model, since the controller and view are pretty straightforward.

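class FailedMessage < ActiveRecord::Base
  # attr_accessible needs the protected_attributes gem on Rails 4
  attr_accessible :queue, :exchange, :routing_key, :vhost, :payload, :message_failed_at, :deleted_at
  # Soft delete: rows with deleted_at set are hidden from the admin tool
  default_scope { where(deleted_at: nil) }

  ########## Begin of API Class ###########
  class Api
    include HTTParty

    AMQP_CONFIG = YAML.load_file(Rails.root.join("config", "amqp.yml"))[Rails.env]
    base_uri "#{AMQP_CONFIG["host"]}:#{AMQP_CONFIG["api_port"]}"

    class << self
      # Fetch up to 100 messages from the DLX queue via the management HTTP API
      # and store each one as a FailedMessage. The messages are NOT requeued,
      # so they are removed from RabbitMQ once fetched.
      def purge_and_load
        options = {
          # Newer RabbitMQ versions expect "ackmode"; older ones took requeue: false
          body: { count: 100, ackmode: "ack_requeue_false", encoding: "auto" }.to_json,
          headers: { "Content-Type" => "application/json" }
        }.merge(auth)
        response = post("/api/queues/#{CGI.escape(vhost)}/development_dlx_queue/get", options)
        raise "DLX fetch failed: HTTP #{response.code}" unless response.success?

        response.parsed_response.each do |msg|
          FailedMessage.create(generate_payload(msg))
        end
      end

      private

      def auth
        { basic_auth: { username: AMQP_CONFIG["username"], password: AMQP_CONFIG["password"] } }
      end

      # Assumes amqp.yml may define "vhost"; falls back to the default vhost "/"
      def vhost
        AMQP_CONFIG.fetch("vhost", "/")
      end

      def generate_payload(msg)
        # x-death lists dead-letter events, most recent first
        death = msg.dig("properties", "headers", "x-death")&.first || {}
        {
          queue: death["queue"],
          exchange: death["exchange"],
          routing_key: Array(death["routing-keys"]).first,
          vhost: vhost,
          message_failed_at: death["time"] && Time.at(death["time"]),
          payload: msg["payload"]
        }
      end
    end
  end
  ########## End of API Class ###########
end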

We fetch up to 100 messages at a time from the API; if the queue holds fewer, the API just returns what is there. For each message we parse out the data we need and persist it in our model. Because the request does not requeue the messages, they are removed from the DLX queue as they are fetched, so the FailedMessage rows are now the only copy. With that data persisted, replaying a message just means having your Bunny publisher republish the stored payload to the stored exchange with the stored routing key. You can monitor this queue and have your engineers work through failures periodically. This is a simple, quick solution that works for us, and hopefully you find it useful too.
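Replaying a stored message could then look like the sketch below. It assumes a Bunny channel and a `FailedMessage` record as defined above; `replay!` is a hypothetical helper, for example one your admin controller's replay action calls. It republishes to the stored exchange and routing key, then soft-deletes the row so the `default_scope` hides it.

```ruby
# Republish a stored failed message, then soft-delete it.
# `channel` is a Bunny::Channel (or anything with #basic_publish);
# `message` is a FailedMessage (or anything with #payload, #exchange,
# #routing_key and #update!).
def replay!(channel, message, now: Time.now)
  # Bunny::Channel#basic_publish(payload, exchange_name, routing_key, opts = {})
  channel.basic_publish(message.payload, message.exchange.to_s, message.routing_key.to_s)
  # deleted_at hides the row via FailedMessage's default_scope
  # (update! assumes Rails 4 or later)
  message.update!(deleted_at: now)
end
```

Two caveats. Republishing to the original exchange delivers the message to every queue bound with a matching key, not just the one where it failed; publishing to the default exchange (`""`) with the stored queue name as the routing key targets that queue alone. And with `encoding: "auto"` the management API returns non-UTF-8 payloads base64-encoded (flagged by `payload_encoding`), so binary messages would need that field stored and decoded before replay.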

Written by Charles Wang.