Handling message retries in RabbitMQ with delay

Harshit Khandelwal
Published in SAFE Engineering · 6 min read · Jun 25, 2024

In our journey towards building a more efficient and responsive system, we recently moved to an event-based architecture to achieve near real-time processing. Multiple microservices now communicate with each other asynchronously via a common RabbitMQ (RMQ) event bus. This shift has significantly improved our system’s responsiveness, but it also introduced challenges in handling message failures. To address this, we needed a robust retry mechanism that reprocesses failed messages after a delay, giving transient issues time to resolve.

Why Implement Retry Mechanisms?

In an event-driven architecture, handling message failures gracefully is essential for maintaining system reliability and efficiency. Tracking and managing failures can become a daunting task without a proper retry mechanism. When a message processing fails due to a transient issue, it’s crucial to have a system that can handle these failures effectively, ensuring they do not lead to data loss or system inconsistency.

A robust retry mechanism provides better handling of unexpected errors by giving failed messages another chance to be processed. This ensures that temporary glitches do not result in permanent issues. One of the core advantages of implementing retries is enhancing system resiliency by ensuring that messages are retried instead of directly being pushed into the dead letter queue.

Moreover, re-queuing failed messages without any delay can quickly overload the system. If a message fails and is immediately re-queued, it can lead to a vicious cycle of failures, increasing the load on the system and potentially causing further issues. Introducing a delay between retries helps to smooth out the load, preventing system overload and ensuring a more stable processing environment.

By incorporating a well-thought-out retry mechanism, including delayed retries and exponential backoff, we can significantly enhance the fault tolerance and reliability of our RabbitMQ-based event-driven architecture. This ensures that our microservices communicate smoothly and our system remains robust even in the face of transient errors.
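
To make the idea of exponential backoff concrete, here is a minimal sketch of one way to compute the delay for each retry. The names BASE_DELAY_MS, MAX_DELAY_MS and backoffDelayMs are illustrative assumptions, not part of our codebase or of RabbitMQ, and the specific values are placeholders you would tune for your own workload.

```typescript
// Hypothetical constants for illustration only.
const BASE_DELAY_MS = 1000; // delay before the first retry
const MAX_DELAY_MS = 60000; // cap so that delays do not grow without bound

/**
 * Delay before retry number `attempt` (1-based):
 * BASE_DELAY_MS * 2^(attempt - 1), capped at MAX_DELAY_MS,
 * plus up to 10% random jitter to avoid bursts of simultaneous retries.
 * `random` is injectable so the function can be tested deterministically.
 */
function backoffDelayMs(attempt: number, random: () => number = Math.random): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError("attempt must be a positive integer");
  }
  const exponential = Math.min(BASE_DELAY_MS * 2 ** (attempt - 1), MAX_DELAY_MS);
  const jitter = Math.floor(random() * (exponential / 10));
  return exponential + jitter;
}
```

With these placeholder values, the base delays for retries 1, 2 and 3 are 1, 2 and 4 seconds before jitter. A fixed lookup table indexed by retry count, as used later in this post, is an equally valid choice.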

Chosen Solution:

Using the RabbitMQ Delayed Message Plugin

To handle retries effectively, we will use the RabbitMQ Delayed Message Plugin, available in RabbitMQ 3.5.3 and later versions. This plugin allows us to manage delays at the exchange level, ensuring that delay logic is handled within RabbitMQ itself rather than in the consumer code. By doing this, we can simplify our consumer logic and leverage RabbitMQ’s powerful message-handling capabilities.

The plugin adds a new exchange type, x-delayed-message. A message published to such an exchange with an x-delay header (in milliseconds) is held by the exchange and routed to the bound queues only after the delay has elapsed. Because the delay is set per message, we get precise control over retries, including exponential delays based on the retry count. When message processing fails, the message is re-routed to a delay exchange with a specified delay. After the delay period, it is delivered to a delay retry queue, from which it is published back to its original exchange for reprocessing. This solution allows us to handle transient errors efficiently, avoid system overload, and ensure that messages are either processed successfully or flagged for further investigation.
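
As a rough illustration of the per-message delay, the sketch below builds the publish options for a delayed message. The helper name delayedPublishOptions and its interface are assumptions made for this example. Only the "x-delay" header name comes from the plugin, and persistent is a standard amqplib publish option.

```typescript
// Hypothetical helper: builds publish options for a delayed message.
interface DelayedPublishOptions {
  headers: Record<string, unknown>;
  persistent: boolean;
}

function delayedPublishOptions(
  delayMs: number,
  extraHeaders: Record<string, unknown> = {}
): DelayedPublishOptions {
  if (!Number.isInteger(delayMs) || delayMs < 0) {
    throw new RangeError("delayMs must be a non-negative integer");
  }
  return {
    // "x-delay" is written last so it overrides any stale value in extraHeaders
    headers: { ...extraHeaders, "x-delay": delayMs },
    persistent: true
  };
}

// Usage with an amqplib channel (not executed here):
//   channel.publish("delay-retry-exchange", "", Buffer.from(body), delayedPublishOptions(5000));
```

The exchange name in the usage comment matches the one declared in the implementation steps that follow.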

Enough talk! Let’s jump into the implementation.

Implementation steps:

Now we’ll walk through the implementation in detail, with example code written in TypeScript using the amqplib library. You can apply the same logic in your own language or framework.

  • Install and enable the RabbitMQ Delayed Message Exchange Plugin (rabbitmq-plugins enable rabbitmq_delayed_message_exchange) for handling delayed message processing.
  • Declare a delayed message exchange in RabbitMQ, ensuring messages are routed with specified delays before delivery to their respective queues.
await channel.assertExchange("delay-retry-exchange", "x-delayed-message", {
  arguments: {
    // Routing behaviour applied once the delay has elapsed
    "x-delayed-type": "direct"
  }
});

The “x-delayed-message” type declares the exchange as a delayed message exchange, and “x-delayed-type” sets how it routes messages once their delay has elapsed.

  • Create and bind a delay retry queue within RabbitMQ, responsible for managing messages rerouted back to the original exchange after a designated delay period.
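await channel.assertQueue("delay-retry-queue", { /* queue config */ });
// bindQueue(queue, source, pattern): the empty binding key matches messages
// published to the delay exchange with an empty routing key
await channel.bindQueue("delay-retry-queue", "delay-retry-exchange", "");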
  • Implement error handling by creating a common consumer error class that extends the built-in Error with an isRetriable flag. This identifies the errors that are eligible for retry attempts.
export class ConsumerError extends Error {
  isRetriable: boolean;

  constructor({
    isRetriable,
    message,
    stack
  }: {
    isRetriable: boolean;
    message: string;
    stack?: string;
  }) {
    super(message);
    this.isRetriable = isRetriable;
    this.name = "ConsumerError";
    // Keep the stack generated by super() unless the caller supplies one
    if (stack) {
      this.stack = stack;
    }
  }
}
  • Write a centralized error handler wrapper for consumers. It first checks whether the error is retriable. If the error is not an instance of ConsumerError, you can treat it as retriable by default or define your own rule (the implementation below retries everything except 4xx errors).
  • Add the following headers to the message metadata and publish it to the delayed exchange.
"x-delay": number of milliseconds the message should be delayed
"x-retry-count": number of retry attempts made so far, checked against the maximum allowed
"x-target-exchange": current exchange of the message, used for rerouting
"x-target-routing-key": current routing key of the message, used for rerouting
  • If the message is not retriable or the retry count is exhausted, push it to DLX (dead letter exchange) or nack directly as per your specified configuration.
  • Full implementation of error handler wrapper:
import { ConsumeMessage } from "amqplib";

// amqp, publishMessageToExchange, is4xxError, MAX_DELAY_RETRY_COUNT and
// retryDelayInMS are application-level helpers and constants defined elsewhere.
type MessageHandler = (message: ConsumeMessage) => Promise<void> | void;

class ConsumerWrapper {
  private handler: MessageHandler;

  constructor(handler: MessageHandler) {
    this.handler = handler;
  }

  public async consume(message: ConsumeMessage): Promise<void> {
    try {
      await this.handler(message);
      amqp.ack(message);
    } catch (err) {
      const error = this.wrapConsumerError(err as Error);
      if (error.isRetriable) {
        await this.publishMessageToDelayExchange(message);
      } else {
        // Assumes nack does not requeue, so the broker dead-letters the message
        amqp.nack(message);
      }
    }
  }

  private async publishMessageToDelayExchange(message: ConsumeMessage): Promise<void> {
    const {
      fields: { exchange, routingKey },
      properties: { headers }
    } = message;

    // Retries attempted so far; 0 when the header is absent (first failure)
    const currentRetryCount = Number(headers?.["x-retry-count"] ?? 0);

    // Check if retry counts are exhausted
    if (currentRetryCount >= MAX_DELAY_RETRY_COUNT) {
      amqp.nack(message);
      return;
    }

    const nextRetry = currentRetryCount + 1;

    /**
     * Look up the predefined delay for this retry count (your own exponential
     * backoff table, which must cover 1..MAX_DELAY_RETRY_COUNT), then add up
     * to 10% random jitter to avoid message bursts.
     */
    const baseDelay = retryDelayInMS[nextRetry];
    const nextDelay = baseDelay + Math.floor(Math.random() * (baseDelay / 10));

    const updatedHeaders = {
      ...headers, // original headers
      "x-delay": nextDelay,
      "x-retry-count": nextRetry,
      "x-target-exchange": exchange,
      "x-target-routing-key": routingKey
    };

    // Publish the message to the delay exchange
    await publishMessageToExchange({
      exchange: "delay-retry-exchange",
      message: message.content.toString(),
      options: {
        headers: updatedHeaders
      }
    });

    // The retry copy is now published, so the original delivery can be acked
    amqp.ack(message);
  }

  private wrapConsumerError(error: Error): ConsumerError {
    if (error instanceof ConsumerError) {
      return error;
    }

    /**
     * The error was not thrown as a ConsumerError, so decide here whether it
     * is retriable: everything except 4xx errors is retried.
     */
    return new ConsumerError({
      isRetriable: !is4xxError(error),
      message: error.message,
      stack: error.stack
    });
  }
}
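
The wrapper above relies on an is4xxError helper that is not shown. Purely as an illustration, here is one way such a check could look. It assumes that HTTP-style errors expose a numeric status as status, statusCode, or response.status, which is common among HTTP clients but is an assumption you should verify against the client you use.

```typescript
// Hypothetical sketch of an is4xxError helper; the property names checked
// here are assumptions about how your HTTP client reports status codes.
function is4xxError(error: unknown): boolean {
  if (typeof error !== "object" || error === null) {
    return false;
  }
  const e = error as {
    status?: unknown;
    statusCode?: unknown;
    response?: { status?: unknown };
  };
  const status = e.status ?? e.statusCode ?? e.response?.status;
  // Client errors (400-499) are unlikely to succeed on retry
  return typeof status === "number" && status >= 400 && status < 500;
}
```

Errors without a recognizable status code return false here, so the wrapper treats them as retriable.
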
  • Implement a dedicated consumer for the delay retry queue. It receives each message once its delay has elapsed and republishes it to its source exchange, using the x-target-exchange and x-target-routing-key headers.
class DelayRetryConsumer {
  public async consume(message: ConsumeMessage): Promise<void> {
    const headers = message.properties.headers ?? {};

    // Republish to the exchange and routing key recorded when the message failed
    await publishMessageToExchange({
      exchange: headers["x-target-exchange"],
      routingKey: headers["x-target-routing-key"],
      message: message.content.toString(),
      options: {
        headers: headers
      }
    });

    amqp.ack(message);
  }
}
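
For the dead-letter step mentioned in the list above, the work queue itself needs a dead letter exchange configured. The following is a minimal sketch of the queue options involved. The helper name workQueueOptions and the exchange and queue names in the usage comment are hypothetical, while durable, deadLetterExchange and deadLetterRoutingKey are amqplib assertQueue options.

```typescript
// Hypothetical helper: options for a work queue whose rejected messages
// are dead-lettered to the given exchange.
interface WorkQueueOptions {
  durable: boolean;
  deadLetterExchange: string;
  deadLetterRoutingKey?: string;
}

function workQueueOptions(deadLetterExchange: string, deadLetterRoutingKey?: string): WorkQueueOptions {
  const options: WorkQueueOptions = { durable: true, deadLetterExchange };
  if (deadLetterRoutingKey !== undefined) {
    // When omitted, the broker keeps the message's original routing key
    options.deadLetterRoutingKey = deadLetterRoutingKey;
  }
  return options;
}

// Usage with an amqplib channel (not executed here; names are placeholders):
//   await channel.assertExchange("events-dlx", "fanout", { durable: true });
//   await channel.assertQueue("events-queue", workQueueOptions("events-dlx"));
//   channel.nack(message, false, false); // requeue = false, so the message is dead-lettered
```

Note that amqplib's nack requeues by default, so the third argument must be false for a rejected message to reach the dead letter exchange.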

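Flow diagram: the consumer fails with a retriable error → the message is published to delay-retry-exchange with an x-delay header → after the delay it lands in delay-retry-queue → DelayRetryConsumer republishes it to x-target-exchange with x-target-routing-key → the original consumer processes it again. Non-retriable messages and messages with exhausted retries go to the DLX.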

Conclusion

Moving to an event-driven architecture with RabbitMQ has improved our system’s agility and real-time capabilities. However, managing message failures posed challenges. Implementing RabbitMQ’s Delayed Message Plugin provided a robust solution. It streamlined retry logic, enhanced reliability, and ensured smooth communication between microservices. This approach not only simplifies error handling but also optimizes system performance, making our architecture more resilient and efficient.

In the future, we plan to persist failed messages alongside the dead letter queue for further debugging and analysis. This will enhance our ability to diagnose issues.

Thanks for reading :)

Harshit Khandelwal is a Software Development Engineer 2 at Safe Security.