Dead Letter Exchange RabbitMq, python implementation and use cases.

Deepak Joseph
4 min readApr 22, 2019

A guide on how to use RabbitMq to temporarily move messages into queue and retrieve later.

Scenario :

I’m building a pipeline, in which there are actions that cannot be performed unless certain artifacts are generated ( in cat language, unless i have get food, i won’t move a bit).

A bit more elaborate explanation for the problem i’m facing.

I have published a message to a queue. My worker is not able to to consume the message due to the unavailability of a dependency. I cannot lose the message, and nack’ing the message would put my worker into an infinite loop, i.e as soon as I negatively ack the message, the message is put back into the queue and any other worker listening to the queue will take the message.This push and pull will go on, enough to choke the network.

So, my first instinct was, I would ack the message, wait for a while and then re-enqueue the message into the main queue.

Well, the message was now getting re-enqueued less frequently, but i had to sleep my worker on a message for a while, which reduced the efficiency of the worker.

Imagine a situation where i have 10 workers, and all of them would be in a limbo for a specified time and the queue would get growing. Once i re-enqueue the message, a free worker would pick it and if the dependencies are not in place it would attempt to re-enqueue and this would go on.

So, i tried to use a re-enqueue counter, set to 25 and a delay ( i.e how long my worker would sleep before i re-enqueue the message. The re-enqueue function was completely written afresh, to perform a publish after an ack to the original message.

Method 2, would be to enqueue the message to a temporary queue and then have another worker look at the delay value or ttl and push the message to the main queue when it hits the threshold. Too much of software engineering to be put to make it work.

The Hero to the Rescue

Dead-Letter-Exchange (DLX) to the rescue!

You can specify a DLX for every queue to deliver the messages that were nacked or rejected . This is the RabbitMQ implementation of Method 2.

Lets see how it works.

I’m using RabbitMQ 3.7.8. and pika==0.12.0 , python 2.7 ( yeah sad).

Steps

  1. Establish Connection
  2. Create a channel
  3. declare main queue ( lets name it hello)
  4. declare delay queue ( lets name it hello_delay) with ttl and DLX parameters.
  5. Publish the message to the delay-queue.
  6. Wait for 5 secs and viola, the message reaches the hello queue.

Here i’ll show you 2 ways to do it.

  1. By Specifying an exchange(amq.direct), and a particular channel for delay and main queue. (Code Courtesy from https://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq)
  2. By using the basic exchange and the same channel.

The first Method can be taken from the link https://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq.

The second method, is written below. Using a single channel and basic exchange. Although this is not recommended, in case you are curious, this is also possible.

import pika,os
from dotenv import load_dotenv
dotenv_path = os.path.abspath(".env.test")
load_dotenv(dotenv_path)


credentials = pika.PlainCredentials(RABBIT_MQ_USER, RABBITMQ_DEFAULT_PASS)
parameters = pika.ConnectionParameters(RABBITMQ_HOSTNAME,
RABBITMQ_PORT,
'/',
credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True) # Main Queue
channel.queue_declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl' : 5500, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange' : '', # Basic Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})
# Published to the hello-delay queue via basic exchange.
channel.basic_publish(exchange='',
routing_key='hello_delay',
body="asd",
properties=pika.BasicProperties(delivery_mode=2))

I hope you have your RabbitMQ user name and password. The above method of establishing a connection using environmental variables can be used in production.

Or If you are running on your local use this

connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))

So, coming back to our problem.

We had 2 problems

  1. Problem of consumers falling into the endless loop of queuing and re-queuing. (This is problem needs to be solved from a client side implementation of having re-try counters, and sending back and error message to your logging system or the caller. The re-try method is provided by RabbitMQ, which i’ll cover later.)
  2. Consumers falling into limbo by processing messages that cannot be processed. ( Consumers are liberated from managing when to push the message back to the main queue, and they can take over tasks that can be worked on, the work of waiting for a specified time is taken care by the RabbitMQ broker, simplifying a lot of our work)

You can read through some of these excellent material on DLX.

  1. https://stackoverflow.com/questions/41185039/delayed-messages-loop-with-rabbitmq
  2. https://medium.com/@dotbox/delayed-requeuing-with-rabbitmq-dcbdf0026bf0
  3. https://www.cloudamqp.com/docs/delayed-messages.html
  4. https://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq

--

--