Communicating using RabbitMQ in Node.js

By Sergei. Published in The Startup, Dec 14, 2019. 5 min read.

What will we be building?

In this article, we will build two simple Node.js apps that communicate via RabbitMQ. The first app, which we will call the sender, provides an API for sending something to the second app. The second app, the receiver, receives messages from the sender and prints them to the console.

Communication

We will use RabbitMQ as the communication layer between these apps. RabbitMQ is a message broker written in Erlang that supports multiple messaging protocols (we will use AMQP).

The communication strategy is dead simple. The sender checks whether a queue exists, creates it if it doesn’t, and sends a message to it. The receiver subscribes to new messages from this queue and prints each one it receives.

Action

I will skip the routing setup; the full code is available here. All we actually need to do is write an interface that lets us work with RabbitMQ easily. This interface will be a singleton class with a subscribe method to consume messages from a given queue, a send method to send messages to a given queue, and a static getInstance method to get the singleton instance.

Let’s start with the connection method. It should establish a RabbitMQ connection, create a channel, and return the class instance (I called this class MessageBroker). To communicate with RabbitMQ over AMQP we will use the amqplib package. The method first creates a connection and then creates a channel on it, which is what we use to talk to RabbitMQ.
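
The two steps can be sketched as follows. This is a minimal sketch, not necessarily the author's exact listing: it assumes amqplib's promise API, a method named init, and an environment variable called RABBITMQ_URL. The amqpLib constructor argument is a hypothetical addition that lets you try the class with a stub instead of a running broker.

```javascript
class MessageBroker {
  // `amqpLib` is a hypothetical injection point so the class can be tried
  // with a stub; by default the real amqplib package is loaded.
  constructor(amqpLib) {
    this.amqp = amqpLib || require('amqplib');
  }

  async init() {
    // RABBITMQ_URL is an assumed environment variable name.
    const url = process.env.RABBITMQ_URL || 'amqp://localhost';
    // Open a connection to the broker, then a channel on that connection.
    this.connection = await this.amqp.connect(url);
    this.channel = await this.connection.createChannel();
    return this;
  }
}
```

With a broker running locally, `await new MessageBroker().init()` should resolve to an instance whose channel property is ready to use.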

The next method we will add is getInstance, which returns the singleton.

We declare an instance variable that will hold the singleton. getInstance first checks whether instance is already set. If so, it returns the variable, which may hold a pending Promise that eventually resolves to the MessageBroker instance. If not, it creates a MessageBroker, initialises it, and assigns the returned pending Promise to instance. Because the Promise is stored before the connection is established, concurrent callers share one connection attempt and we avoid connecting twice.
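
The pending-Promise idea is easier to see in code. The sketch below is an illustration under assumptions: the connect argument and the stubConnect helper are hypothetical, added so the example runs without a broker. In the real app, init would call amqplib directly.

```javascript
// Holds the pending (or resolved) Promise of the single MessageBroker instance.
let instance;

class MessageBroker {
  // `connect` is a hypothetical injected function returning a connection;
  // in the real app it would wrap amqplib's connect().
  constructor(connect) {
    this.connect = connect;
  }

  async init() {
    this.connection = await this.connect();
    this.channel = await this.connection.createChannel();
    return this;
  }

  static getInstance(connect) {
    if (!instance) {
      // Store the Promise itself, not the resolved value, so that callers
      // arriving while the connection is still opening share the same attempt.
      instance = new MessageBroker(connect).init();
    }
    return instance;
  }
}

// Demo with a stub connection: two concurrent callers, one connection attempt.
let connectCalls = 0;
const stubConnect = async () => {
  connectCalls += 1;
  return { createChannel: async () => ({}) };
};

const demo = Promise.all([
  MessageBroker.getInstance(stubConnect),
  MessageBroker.getInstance(stubConnect),
]).then(([a, b]) => {
  console.log(a === b, connectCalls); // true 1
  return [a, b];
});
```

One caveat of this sketch: if the first connection attempt fails, the rejected Promise stays cached, so a production version would probably want to reset instance on failure.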

Sending messages

Let’s add the send method to the MessageBroker class.

The send method first calls assertQueue, which checks whether the queue exists and creates it if it doesn’t. This operation is idempotent given identical arguments. We set the durable option to true so that the queue survives broker restarts. It then calls sendToQueue, which sends a single message, with its content given as a Buffer, to the named queue.
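
A minimal sketch of such a send method, assuming the channel has already been created (here it is passed to the constructor purely to keep the example self-contained):

```javascript
class MessageBroker {
  // `channel` is assumed to be an already opened amqplib channel.
  constructor(channel) {
    this.channel = channel;
  }

  async send(queue, msg) {
    // Create the queue if it is missing; durable queues survive broker restarts.
    await this.channel.assertQueue(queue, { durable: true });
    // `msg` must be a Buffer. sendToQueue returns a boolean (false signals
    // that the channel's write buffer is full).
    return this.channel.sendToQueue(queue, msg);
  }
}
```

A call then looks like `await broker.send('test', Buffer.from('hello'))`.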

Subscription

To implement the subscription method correctly, we need to understand how RabbitMQ delivers messages to its subscribers.

When an app subscribes to a queue, RabbitMQ treats it as a worker. RabbitMQ distributes new messages among the workers on that queue using a round-robin algorithm.

If our app subscribed to the same queue once per handler, each handler would therefore receive only a share of the messages. So our app should subscribe to a queue only once, but be able to register as many handlers as needed.
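
To see why a single subscription per app matters, the round-robin distribution can be simulated in a few lines. This is an illustration only: no broker is involved, the function name is hypothetical, and real delivery is also affected by settings such as prefetch. It shows what would happen if two handlers in the same app each opened their own subscription.

```javascript
// Simulation only: hands each message to the next consumer in turn.
function distributeRoundRobin(messages, consumers) {
  const received = {};
  consumers.forEach((name) => { received[name] = []; });
  messages.forEach((msg, i) => {
    received[consumers[i % consumers.length]].push(msg);
  });
  return received;
}

const result = distributeRoundRobin(['m1', 'm2', 'm3', 'm4'], ['handlerA', 'handlerB']);
console.log(result); // { handlerA: [ 'm1', 'm3' ], handlerB: [ 'm2', 'm4' ] }
```

Each handler sees only every other message, which is not what we want when both handlers are supposed to react to every message.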

First, we create a queues class member, initialised to an empty object. It stores the handlers for each queue: the key is a queue name and the value is an array of registered handlers. When subscribe is called, we check whether this.queues[queue] already exists. If it does, we check whether the given handler is already registered: if so, we just return the unsubscribe function; if not, we push the handler to the array with this.queues[queue].push(handler) and return the unsubscribe function. If this.queues[queue] is not defined, we assert that the queue exists and set this.queues[queue] to an array holding the handler.

Finally, we subscribe to the queue’s messages using this.channel.consume. The first argument is the queue’s name; the second is a message handler. To understand what this handler does, we first need to look at message acknowledgment.
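
The bookkeeping described here can be sketched on its own, without any RabbitMQ calls. HandlerRegistry and register are hypothetical names used for illustration; in the article this logic lives inside the subscribe method of MessageBroker.

```javascript
class HandlerRegistry {
  constructor() {
    // queue name -> array of handlers
    this.queues = {};
  }

  // Returns { isNewQueue, unsubscribe }. `isNewQueue` tells the caller whether
  // it still has to assert the queue and start consuming from it.
  register(queue, handler) {
    const unsubscribe = () => {
      this.queues[queue] = this.queues[queue].filter((h) => h !== handler);
    };

    if (this.queues[queue]) {
      // Already consuming from this queue: add the handler unless it is a duplicate.
      if (!this.queues[queue].includes(handler)) {
        this.queues[queue].push(handler);
      }
      return { isNewQueue: false, unsubscribe };
    }

    this.queues[queue] = [handler];
    return { isNewQueue: true, unsubscribe };
  }
}
```

Only when isNewQueue is true does the caller need to touch the broker; every later registration for the same queue is a plain array update.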

Message acknowledgment

Without acknowledgements, RabbitMQ marks a message for deletion as soon as it is delivered. In that case, if you kill a worker, we lose the message it was just processing. We also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.

RabbitMQ redelivers a message when the consumer dies, that is, when its channel or connection closes without an ack having been sent. Processing a message may take a long time, although recent RabbitMQ versions enforce a delivery acknowledgement timeout (30 minutes by default, configurable).

Triggering handlers

So, when a new message arrives, we create an ack function that marks the message as acknowledged, wrapped in _.once so that it executes only once no matter how many handlers call it. Then we iterate over the handlers array with this.queues[queue].forEach(h => h(msg, ack)), invoking every handler with the message and the acknowledge function.
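
A sketch of that consume callback is shown below. To stay dependency-free it uses a small local once helper in place of lodash's _.once; createDispatcher and getHandlers are hypothetical names, and the null check reflects amqplib calling the consumer with null when the server cancels it.

```javascript
// Minimal stand-in for lodash's _.once, to keep the sketch dependency-free.
function once(fn) {
  let called = false;
  return (...args) => {
    if (called) return;
    called = true;
    fn(...args);
  };
}

// Builds the callback for channel.consume(queue, callback).
// `getHandlers` is a hypothetical accessor, e.g. () => this.queues[queue].
function createDispatcher(channel, getHandlers) {
  return (msg) => {
    // amqplib invokes the callback with null if the server cancels the consumer.
    if (msg === null) return;
    // However many handlers call ack(), the broker is told only once.
    const ack = once(() => channel.ack(msg));
    getHandlers().forEach((h) => h(msg, ack));
  };
}
```

Inside subscribe this would be wired up roughly as `await this.channel.consume(queue, createDispatcher(this.channel, () => this.queues[queue]))`.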

Sending and receiving

Sender controller:

Here, we get the MessageBroker instance and call its send method with the queue name test and a Buffer containing the stringified body.
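
Since the routing setup is skipped, here is a framework-agnostic sketch of that controller logic. sendToTestQueue and getBroker are hypothetical names; in the app, getBroker would be something like `() => MessageBroker.getInstance()` and body would be the parsed request body.

```javascript
// `getBroker` is a hypothetical accessor for the singleton broker;
// `body` is assumed to be the parsed request body.
async function sendToTestQueue(getBroker, body) {
  const broker = await getBroker();
  // The queue name is 'test'; the message is the JSON-stringified body as a Buffer.
  await broker.send('test', Buffer.from(JSON.stringify(body)));
}
```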

Receiving controller:

Here, we get the MessageBroker instance and call its subscribe method with the queue name and a handler function. The handler prints the content of each incoming message and acknowledges it right after.
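
A comparable sketch for the receiving side, again with hypothetical names (startReceiver, getBroker) and an injectable log function so it can be tried without a broker. It relies on amqplib messages carrying their payload as a Buffer in msg.content.

```javascript
// `getBroker` is a hypothetical accessor for the singleton broker;
// `log` defaults to console.log and is injectable for testing.
async function startReceiver(getBroker, log = console.log) {
  const broker = await getBroker();
  return broker.subscribe('test', (msg, ack) => {
    // msg.content is a Buffer in amqplib; print it, then acknowledge.
    log('Message:', msg.content.toString());
    ack();
  });
}
```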

Let’s send a message.

Printed in the console of the receiver app:

Message: {"msg":"Hello"}
