Schedule Messages in RabbitMQ
Some business scenarios require a message to be delivered after a delay or at a particular time, for example:
- A smart water heater in the house needs to be switched on after 30 minutes.
- An unpaid order needs to be closed after 15 minutes.
- SMS, email, and push notifications need to be sent for a sale going live at 2:00 PM.
However, the AMQP protocol has no native delayed queue feature. If you search for “how to delay/schedule messages in RabbitMQ”, you’ll most likely run into two possible solutions:
- The first solution combines message TTL with dead-lettering to emulate a delay.
- The second solution uses the official RabbitMQ delayed message exchange plugin.
Both solutions are valid, but the second is simpler than the one based on Dead Letter Exchanges and message TTL.
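For comparison, here is a minimal sketch of how the first solution (message TTL plus dead-lettering) could be wired up with amqplib. It assumes `channel` is an amqplib channel using the promise API; the exchange, queue, and routing key names are hypothetical and exist only for illustration.

```javascript
// Sketch of the TTL + dead-letter approach. All names below are hypothetical.

// Options for a "waiting" queue that nobody consumes from: a message sits in
// it for ttlMs, expires, and is then dead-lettered to targetExchange.
function waitingQueueOptions(ttlMs, targetExchange, targetRoutingKey) {
  return {
    durable: true,
    messageTtl: ttlMs,                      // per-queue message TTL in ms
    deadLetterExchange: targetExchange,     // where expired messages go
    deadLetterRoutingKey: targetRoutingKey, // routing key used on expiry
  };
}

// `channel` is assumed to be an amqplib channel (promise API).
async function setupTtlDelay(channel, ttlMs) {
  // Exchange and queue that receive messages once their delay has elapsed.
  await channel.assertExchange("orders.due", "direct", { durable: true });
  await channel.assertQueue("orders.process", { durable: true });
  await channel.bindQueue("orders.process", "orders.due", "close-order");
  // Waiting queue: expired messages are re-published to "orders.due".
  await channel.assertQueue(
    "orders.wait",
    waitingQueueOptions(ttlMs, "orders.due", "close-order")
  );
}
```

A producer would then send messages to the waiting queue, for example with `channel.sendToQueue("orders.wait", Buffer.from(body))`, and consumers would read from `orders.process`. Because the TTL is set on the queue, every message in it shares the same delay, so each distinct delay needs its own waiting queue. That extra wiring is what the plugin avoids.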
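The RabbitMQ delayed message exchange plugin adds a new exchange type to RabbitMQ. Messages published to an exchange of this type are stored internally, in a Mnesia table, until they are due for delivery. Because they are stored in Mnesia rather than held only in memory, messages that are still waiting are not lost if the server goes down and restarts.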
Let’s implement the second solution, starting with installing the plugin. Before that, check its prerequisites:
- RabbitMQ 3.5.8 or later.
- Erlang/OTP 18.0 or later.
Installing the Plugin
To install the plugin, go to the Community Plugins page, search for “rabbitmq_delayed_message_exchange”, and download the .ez file (a zip archive with metadata) that matches your RabbitMQ version. Copy the file into RabbitMQ’s plugins directory.
The plugins directory location is determined by the RABBITMQ_PLUGINS_DIR environment variable. Its default location depends on how RabbitMQ was installed. Some common values are:
Once the file has been copied to the correct directory, enable the plugin by running the following command:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Declaring Exchange
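To delay messages, declare an exchange with the type x-delayed-message:
// No `passive` flag here: the exchange should be created if it does not exist yet
channel.assertExchange(exchange, "x-delayed-message", {
    autoDelete: false, // keep the exchange when no queues are bound to it
    durable: true,     // the exchange survives a broker restart
    arguments: {
        // routing behaviour applied once the delay has elapsed
        "x-delayed-type": "direct"
    }
})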
In the declaration above, the x-delayed-type argument is set to “direct”, so once the delay has elapsed the exchange routes messages the way a direct exchange does. We could instead pass topic, fanout, or a custom exchange type provided by another plugin.
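Declaring the exchange is not enough on its own, because a message only reaches a consumer if a queue is bound to the exchange. The sketch below shows one way to put the pieces together. It assumes `channel` is an amqplib channel using the promise API; the helper names `delayedExchangeOptions` and `setupDelayedRouting` are hypothetical.

```javascript
// Hypothetical helper: builds the options for an "x-delayed-message" exchange,
// following the declaration above.
function delayedExchangeOptions(delayedType) {
  return {
    durable: true,
    autoDelete: false,
    arguments: { "x-delayed-type": delayedType },
  };
}

// Hypothetical helper: declares the delayed exchange, a durable queue, and the
// binding between them. `channel` is assumed to be an amqplib channel.
async function setupDelayedRouting(channel, exchange, queue, routingKey) {
  await channel.assertExchange(
    exchange,
    "x-delayed-message",
    delayedExchangeOptions("direct")
  );
  await channel.assertQueue(queue, { durable: true });
  // With a "direct" delayed type, messages reach the queue only when their
  // routing key equals this binding key.
  await channel.bindQueue(queue, exchange, routingKey);
}
```

With amqplib, a channel is typically obtained through `require("amqplib").connect(url)` followed by `connection.createChannel()`. The URL and the names passed to `setupDelayedRouting` depend on your setup.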
Publishing Messages
To delay a message, publish it with the custom header x-delay, whose value is the delay in milliseconds. The plugin keeps the message in its Mnesia table and routes it to the bound queues once x-delay milliseconds have passed.
If the x-delay header is not present, the plugin routes the message without any delay.
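// The second argument is the routing key; with a "direct" delayed type it
// must match the binding key of the target queue
channel.publish(exchange, queue, Buffer.from(params), {
    headers: {
        "x-delay": delayInMilliSeconds // delay in milliseconds
    }
})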
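The scenarios at the top of the article include both relative delays (close an order after 15 minutes) and absolute times (a sale going live at 2:00 PM). A minimal sketch of handling both is shown below. The helper names are hypothetical, `channel` is assumed to be an amqplib channel, and the upper bound of 2^32 - 1 milliseconds is an assumption based on the plugin's documentation that should be checked against the version you install.

```javascript
// Assumed upper bound for x-delay; verify against the plugin README.
const MAX_DELAY_MS = 2 ** 32 - 1;

// Hypothetical helper: milliseconds from `now` until `target`, never negative.
function delayUntil(target, now = new Date()) {
  return Math.max(0, target.getTime() - now.getTime());
}

// Hypothetical helper: publishes a JSON payload with an x-delay header.
// `channel` is assumed to be an amqplib channel.
function publishDelayed(channel, exchange, routingKey, payload, delayMs) {
  if (!Number.isInteger(delayMs) || delayMs < 0 || delayMs > MAX_DELAY_MS) {
    throw new RangeError(`x-delay out of range: ${delayMs}`);
  }
  return channel.publish(
    exchange,
    routingKey,
    Buffer.from(JSON.stringify(payload)),
    { headers: { "x-delay": delayMs } }
  );
}
```

Closing an unpaid order after 15 minutes would then be `publishDelayed(channel, exchange, routingKey, { orderId }, 15 * 60 * 1000)`, and a notification for a fixed time would pass `delayUntil(saleStart)` as the delay, where `orderId` and `saleStart` come from your application.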
Checking if a Message was Delayed
To check whether a message was delayed, inspect the x-delay header of the received message. For a delayed message, the header holds the original delay negated.
For example, if you published a message with a delay of 10000 milliseconds, the consumer receiving that message will find the x-delay header set to -10000.
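On the consumer side, this check can be wrapped in a small helper. The sketch below assumes the message has the shape amqplib passes to a `consume` callback, with headers under `msg.properties.headers`; the function name `originalDelay` is hypothetical.

```javascript
// Hypothetical helper: returns the delay (in ms) the message was published
// with, or null if the x-delay header does not indicate a delayed message.
function originalDelay(msg) {
  const headers = (msg.properties && msg.properties.headers) || {};
  const value = headers["x-delay"];
  // A delayed message carries the negated delay, e.g. -10000 for 10000 ms.
  return typeof value === "number" && value < 0 ? -value : null;
}
```

Inside a consumer, this could be used as `channel.consume(queue, (msg) => { const delay = originalDelay(msg); /* ... */ channel.ack(msg); })`.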