Re-routing messages with delay in RabbitMQ

Eran Amrani
Nielsen-TLV-Tech-Blog
4 min read · Nov 24, 2019

Our company’s architecture is based on the microservices pattern, a variant of service-oriented architecture (SOA) that structures an application as a collection of loosely coupled services. For messaging between the services (e.g. triggering an action) we use RabbitMQ as the message broker.

The problem

On several occasions a service consumer couldn’t complete its logic because of an unexpected error or an unavailable dependent resource. The consumer then rejected the message, which automatically moved it to the DLX (Dead Letter Exchange), and we had to handle the rejected messages manually.

If we had instead rejected the message with requeue set to true, it would have been redelivered to the consumer immediately, over and over, putting a very high load on the system.

What we needed was a way to retry the action by rerouting the message back to its original queue, but with a delay.

The Possible Solutions

I took on the task of researching a solution that fits our company’s flow and needs. These were the options:

  • Combining message TTL with dead-lettering: Put the queue and a dead-letter queue under the same exchange with appropriate routing keys. A rejected message moves from the queue to the dead-letter queue, and once its TTL expires it is routed back to the queue. This repeats until the message is consumed successfully. A guide for that can be found here.
  • Delayed message plugin: A rejected message is moved to a delayed exchange and held there until its delay expires. A delay consumer then checks the retry attempt count: if it is within the limit, the message goes back to the original queue; otherwise it goes to the main DLX queue for manual action.
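For comparison, here is a minimal sketch of what the first option could look like as queue declarations. The `retryTopology` helper, the queue names and the exchange name are hypothetical, and the sketch assumes a direct exchange where each queue is bound with its own name as the routing key. `x-message-ttl`, `x-dead-letter-exchange` and `x-dead-letter-routing-key` are standard RabbitMQ queue arguments.

```javascript
// Sketch of option 1 (TTL + dead-lettering). All names are illustrative.
// The work queue dead-letters rejected messages into a "wait" queue;
// the wait queue has no consumer, so messages expire after ttlMs and
// are dead-lettered back to the work queue.
function retryTopology(exchange, workQueue, ttlMs) {
  const waitQueue = `${workQueue}.wait`
  return {
    [workQueue]: {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': exchange,
        'x-dead-letter-routing-key': waitQueue
      }
    },
    [waitQueue]: {
      durable: true,
      arguments: {
        'x-message-ttl': ttlMs,
        'x-dead-letter-exchange': exchange,
        'x-dead-letter-routing-key': workQueue
      }
    }
  }
}

const topology = retryTopology('app.exchange', 'orders', 30000)
console.log(JSON.stringify(topology, null, 2))
```

Note that the delay in this setup is fixed per queue, and nothing stops a message from cycling forever unless the consumer counts attempts itself.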

The chosen one

The RabbitMQ Delayed Message Plugin (RabbitMQ 3.5.3 and later versions) adds a new exchange type to RabbitMQ where messages routed by that exchange can be delayed if the publisher adds a delay header to a message.
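To make the delay header concrete, here is a small sketch of building publish options that carry it. The `delayedPublishOptions` helper is hypothetical, and the validation rule (non-negative integer) is my own choice for the example rather than something the plugin requires in this form.

```javascript
// Build publish options carrying the plugin's x-delay header.
// Hypothetical helper, not part of any library.
function delayedPublishOptions(delayMs, extraHeaders = {}) {
  if (!Number.isInteger(delayMs) || delayMs < 0) {
    throw new TypeError('delayMs must be a non-negative integer (milliseconds)')
  }
  return {
    persistent: true,
    headers: { ...extraHeaders, 'x-delay': delayMs }
  }
}

const options = delayedPublishOptions(5000, { 'x-retry': 1 })
console.log(options)
```

With the amqplib client, an object like this could be passed as the options argument of `channel.publish(exchange, routingKey, content, options)`.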

Steps to implement:

  • Install and enable the “rabbitmq_delayed_message_exchange” plugin
  • Declare the “DELAY” exchange and use it as the dead-letter exchange (DLX) for rejected messages
channel.exchangeDeclare("DELAY", "x-delayed-message", {
  autoDelete: false,
  durable: true,
  arguments: {
    // how the exchange routes messages once the delay has passed
    'x-delayed-type': "fanout",
    'dead-letter-exchange': "DLX.DEAD.LETTERS"
  }
})
  • Add the following headers to the rejected message’s metadata:
'x-retry': counter of retry attempts
'x-retry-limit': maximum number of retry attempts
'x-delay': number of milliseconds the message should be delayed

To delay a message, the publisher must set the special x-delay header, which takes an integer representing the number of milliseconds RabbitMQ should delay the message.

  • The delay consumer checks whether the retry attempt is within the retry limit. If it is, it increments the “x-retry” header counter and reroutes the message to the original exchange with the same routing key for another attempt.
  • If the retry limit is exceeded, the delay consumer rejects the message and it is passed to the DLX queue for manual action and debugging
  • Add a monitor on the DLX queue that sends a notification once a message arrives there after exceeding its retry attempts
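The steps above can be sketched as two small pieces: the options for a work queue that dead-letters into “DELAY”, and the decision the delay consumer makes. This is a sketch under assumptions: `workQueueOptions` and `nextStep` are hypothetical names, and falling back to a default limit when 'x-retry-limit' is missing is my own addition. `x-dead-letter-exchange` is the standard RabbitMQ queue argument for dead-lettering.

```javascript
const DELAY_EXCHANGE = 'DELAY'
const DLX_EXCHANGE = 'DLX.DEAD.LETTERS'

// Options for a work queue whose rejected messages go to the DELAY exchange.
const workQueueOptions = {
  durable: true,
  arguments: { 'x-dead-letter-exchange': DELAY_EXCHANGE }
}

// Decide what the delay consumer should do with a message, based on its headers.
// defaultLimit is an assumption used only when 'x-retry-limit' is absent.
function nextStep(headers = {}, defaultLimit = 5) {
  const retries = Number(headers['x-retry'] || 0)
  const limit = Number(headers['x-retry-limit'] || defaultLimit)
  if (retries <= limit) {
    // Still within the limit: send back for another attempt with the counter bumped.
    return { action: 'retry', headers: { ...headers, 'x-retry': retries + 1 } }
  }
  // Limit exceeded: hand over to the DLX queue for manual handling.
  return { action: 'dead-letter', exchange: DLX_EXCHANGE }
}

console.log(nextStep({ 'x-retry': 2, 'x-retry-limit': 5 }))
console.log(nextStep({ 'x-retry': 6, 'x-retry-limit': 5 }))
```

Keeping the decision in a pure function like this makes the retry rule easy to unit test without a running broker.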

Flow Diagram:

Code example

// BaseConsumer, publisherService and Logger are helpers from our own code base.
const RETRY_LIMIT = 5
const DELAY_DEFAULT = 5000 // base delay in milliseconds
const RETRY_ATTEMPT = 'x-retry'
const DELAY_INTERVAL = 'x-delay'
const X_DEATH = 'x-death'
const EXCHANGE = 'exchange'
const ROUTING_KEY = 'routing-keys'

class RetriesConsumer extends BaseConsumer {
  constructor() {
    super('retries.queue')
    this.setHandler(this.retry.bind(this))
  }

  // Retry only while the message carries a retry counter within the limit.
  shouldRetry(headers) {
    return Boolean(headers) && RETRY_ATTEMPT in headers && headers[RETRY_ATTEMPT] <= RETRY_LIMIT
  }

  // Republish the message to the exchange it originally came from.
  async shovelMsg(exchange, routingKey, data, headers) {
    await publisherService.publish(exchange, routingKey, data, headers)
  }

  // Exponential backoff: the delay doubles with every attempt.
  getDelayRetryInterval(retryCount) {
    return Math.pow(2, retryCount - 1) * DELAY_DEFAULT
  }

  async retry(msg) {
    // Destructure outside the try block so `data` is available for logging.
    const {metadata, data} = msg
    try {
      const {headers} = metadata
      if (!this.shouldRetry(headers)) {
        // Rejecting here passes the message on to the DLX queue.
        throw new Error('Retry limit exceeded')
      }
      // x-death[0] holds the exchange and routing keys the message was published with.
      const causeOfDeath = headers[X_DEATH][0]
      const exchange = causeOfDeath[EXCHANGE]
      const routingKey = causeOfDeath[ROUTING_KEY][0]
      const retriesAttempt = Number(headers[RETRY_ATTEMPT] || 0)
      const retryConfig = {
        [RETRY_ATTEMPT]: retriesAttempt + 1,
        [DELAY_INTERVAL]: this.getDelayRetryInterval(retriesAttempt)
      }
      await this.shovelMsg(exchange, routingKey, data, retryConfig)
    } catch (e) {
      Logger.error(`Failed to re-process operation. Message: ${JSON.stringify(data)} Error: ${e}`)
      throw e
    }
  }
}
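To see what the exponential backoff means in practice, here is a small worked example that reuses the constants and the delay formula from the consumer. It assumes the 'x-retry' counter starts at 1, which the code above does not enforce.

```javascript
const DELAY_DEFAULT = 5000 // base delay in milliseconds
const RETRY_LIMIT = 5

// Same formula as RetriesConsumer.getDelayRetryInterval.
function getDelayRetryInterval(retryCount) {
  return Math.pow(2, retryCount - 1) * DELAY_DEFAULT
}

// Delay applied for each attempt, assuming attempts are numbered 1..RETRY_LIMIT.
const schedule = []
for (let attempt = 1; attempt <= RETRY_LIMIT; attempt++) {
  schedule.push(getDelayRetryInterval(attempt))
}
const totalMs = schedule.reduce((sum, delay) => sum + delay, 0)

console.log(schedule) // [ 5000, 10000, 20000, 40000, 80000 ]
console.log(totalMs)  // 155000
```

Under that assumption, a message that keeps failing spends a bit over two and a half minutes in delays before it lands in the DLX queue.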

Summary

Manually handling rejected messages does not scale well. Once we saw it happening more and more often, we decided we needed automation. We chose the delayed exchange mechanism because it offers an infrastructure-level solution that meets our requirements well and is easy to add and maintain.

On a personal note, I love working with RabbitMQ, and I enjoyed investigating and finding the right solution for our needs.

As always, please comment if you have any suggestions or feedback.

Thanks for reading!
