Communicating using RabbitMQ in Node.js
What will we be building?
In this article, we will build two simple Node.js apps, that will communicate via RabbitMQ. First app, let’s call it sender, will provide an API to send something to the second app. Second app, let’s call it receiver, will be receiving messages from the sender app and printing them to the console.
In this article, we will use RabbitMQ to build communication between these apps. RabbitMQ is a message broker written in Erlang and using multiple protocols to communicate with a user (we will use AMQP).
Communication strategy is deadly simple. First app, aka sender, will check if a queue exists, create it if it doesn’t, and send a message to it. Second app, aka receiver, will subscribe for new messages from this queue and print received ones.
I will skip the routing setup part, full code is available here. Actually, all we need to do is write an interface that will allow us to work with RabbitMQ easily. This interface will be a singleton class, that will have methods like
subscribe to consume messages from a given queue,
send to send messages to the given queue, and a static
getInstance method to get singleton instance.
Let’s start with the
getInstance method. This method should return a class instance (I called this class MessageBroker) with established RabbitMQ connection and create a channel. To communicate with RabbitMQ using AMQP protocol we will use the amqplib package. The connection method looks like this:
On the line 6 we create a connection and one the line 7 we create a channel to communicate with RabbitMQ.
Next method that we will add is the
getInstance method, that will return singleton:
On the line 7 we declared an instance variable, that will contain a singleton instance. On the line 27 we check if instance variable already contains the singleton instance. If so, the method return instance variable, that may hold pending Promise, which will subsequently return us MessageBroker instance. If not, we will initialise created MessageBroker instance on the line 29, and assign returned pending Promise to instance value. This way we can avoid double connection.
Let’s add the
send method to the MessageBroker class.
On the line 10, we use
assertQueue method, that check a queue existence, and if it doesn’t,
assertQueue will create the queue. This operation is idempotent given identical arguments. We set durable option to true, if true, the queue will survive broker restarts. On the line 11, we use
sendToQueue method, that sends a single message with the
content given as a buffer to the specific
To make subscription method the right way, we need to understand how RabbitMQ sending messages to its subscribers.
When an app subscribes for queue’s messages, RabbitMQ treats it as a worker. RabbitMQ will send new messages to the workers using round-robin algorithm.
So, our app should subscribe to a queue only once, but be able to register as many handlers as needed.
Firstly, we created
queue class member and defined as an empty object. This member will store queues’ handlers, where a key is a queue name and a value is an array of registered handlers. On the line 23, we check if
queues variable holds something. If true, we check if given handler already registered, if so, return unsubscribe function, if it’s not, push the handler to the array of handlers
this.queues[queue].push(handler) and return unsubscribe function. If
this.queues[queue] is not defined, we check the queue existence(line 32), and define
this.queues[queue] with an array, that holds the handler (line 33).
Finally, we subscribe for the queue messages using
this.channel.consume method. First argument is the queue’s name. Second is a message handler. To understand what is going on the lines 36–39, we should get to know about Message acknowledgment.
If you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.
An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.
There aren’t any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It’s fine even if processing a message takes a very, very long time.
So, when new message is arrived, we create the function
ack, that mark message as acknowledged, using
_.once to make the function execute only once. Then, we iterate over the handlers array
this.queues[queue].forEach(h => h(msg, ack)), triggering every handler with the message and acknowledge function.
Sending and receiving
Here, we are getting the MessageBroker instance and calling send method with queue name equal to test and message equal to Buffer with stringified body.
Here, we are getting the MessageBroker instance and calling subscribe method with queue name and handler function. The handler will be printing incoming messages’ contents and acknowledge them right after.
Let’s send a message.
Printed in console of receiver app: