A small practical guide to message queues using BullMQ and Node.js

Akshay Jain
8 min read · Apr 12, 2023

Introducing the problem

Consider two microservices, Microservice A and Microservice B. You often come across use cases where service A needs to call service B. In a real-life example, your User Service calls the Email Service to send a welcome email when a user signs up.

Simple Architecture in the Ideal Case

So what happens when the Email Service fails or is unresponsive for a while? If your error handling is sound, the client will still be able to sign up without the API breaking on their end, but the user will never receive the email, because the Email Service was down at the time of the API call.

Trust me, unresponsive services are NOT as rare as you think.

Failure Scenario

What could cause the Email Service to fail? Many things: maybe the bill for the email provider, such as AWS SES, wasn’t paid; maybe a new commit broke the service; maybe there is a DevOps issue.

However, your business still requires a welcome email to be sent every time a user registers (maybe that email contains some important data), and that is non-negotiable.

My point is that in distributed systems, the failure of some services (or nodes, as some call them) is not an alien event but often the norm, for reasons that may or may not be in our control. In many cases you’d still want the system to stay consistent in some form; here, that means every registered user eventually gets their welcome email.

So how would you save your job? There are many approaches to this problem; let’s look at one called message queues.

What are Message Queues then?

A simplified definition:

Message queues are a type of asynchronous communication protocol used in computer systems: one application sends a message to another, and the message is stored in a queue until the recipient application retrieves and processes it.

If the definition is hard to digest, let’s work through an example.

What would the code look like before message queues?

(Don’t worry about copying the code; there is a GitHub repo at the end.)

Initial Code for Microservice A (User Service)
Initial bare-bones code for Microservice B (Email Service)

If the POST request to http://microserviceB/send-email fails and throws an error, the request is lost even if you handle the error, because nothing keeps it around to be retried later.
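
To make the failure concrete, here is a minimal sketch of the synchronous version, assuming the User Service posts to the Email Service with the global fetch of Node.js 18+. The names signUp and postJson and the payload fields are hypothetical, and the HTTP client is passed in so it can be replaced with a stub.

```javascript
// Hypothetical User Service logic that calls the Email Service synchronously.
// The HTTP client is injected so a stub can stand in for the real network call.
async function signUp(user, post = postJson) {
  // ... save the user to the database here (omitted) ...
  try {
    await post('http://microserviceB/send-email', { to: user.email, type: 'welcome' });
    return { signedUp: true, emailSent: true };
  } catch (err) {
    // The error is handled, so the client still gets a successful signup...
    console.error('Email Service unavailable:', err.message);
    // ...but nothing records that this welcome email still has to be sent.
    return { signedUp: true, emailSent: false };
  }
}

// Minimal JSON POST helper using the global fetch available in Node.js 18+.
async function postJson(url, body) {
  const res = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  });
  if (!res.ok) throw new Error(`Email Service responded with HTTP ${res.status}`);
  return res;
}
```

Once the catch branch runs, the email request exists nowhere anymore; retrying it later would need some durable place to keep it.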

So what can we do? The idea is to push the request into an external data structure, a queue in our case.

The service making the request enqueues the data, and the service receiving the request dequeues it whenever that service is available and performs the task based on that data.

Read that paragraph again: the idea is to keep the data in the queue and consume it only when the required service (the Email Service, in our case) is available.

The architecture looks as follows:

The architecture with a queue: the producer is the User Service and the consumer is the Email Service.

Hmm... so this makes sense, right? You push the message into the queue (the service producing messages is called a producer), and it stays in the queue until a service that needs the data (the consumer, or subscriber, in our case) is available to take it.

As you can see, message queues allow messages to be sent and received without requiring both services to be active and available at the same time, which is why they are called an asynchronous communication protocol. (Go back to the definition if you didn’t get it the first time; it’ll make much more sense now.)
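
The idea can be illustrated without any infrastructure. The sketch below uses a hypothetical in-memory InMemoryQueue class (not BullMQ, and not durable: it lives only as long as the process) to show a message waiting while the consumer is offline and being processed once the consumer comes back.

```javascript
// Hypothetical in-memory queue, only to illustrate the producer/consumer idea.
// Unlike BullMQ it is not durable: messages are lost if the process exits.
class InMemoryQueue {
  constructor() {
    this.messages = [];
    this.consumer = null; // function that processes one message
  }

  // Producer side: enqueue and return immediately, whether or not a consumer exists.
  enqueue(message) {
    this.messages.push(message);
    this.drain();
  }

  // Consumer side: register a handler; anything already waiting is processed now.
  subscribe(handler) {
    this.consumer = handler;
    this.drain();
  }

  unsubscribe() {
    this.consumer = null;
  }

  // Dequeue in FIFO order, but only while a consumer is available.
  drain() {
    while (this.consumer && this.messages.length > 0) {
      this.consumer(this.messages.shift());
    }
  }
}

// Usage: the Email Service is "down" (no consumer) while the signup happens.
const emailQueue = new InMemoryQueue();
emailQueue.enqueue({ to: 'new.user@example.com', type: 'welcome' });
console.log('waiting:', emailQueue.messages.length); // waiting: 1
// Later the Email Service comes back and consumes the backlog.
emailQueue.subscribe((msg) => console.log('sending welcome email to', msg.to));
console.log('waiting:', emailQueue.messages.length); // waiting: 0
```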

Implementing a Message Queue in Node.js Using BullMQ

There are many message queue implementations available, such as RabbitMQ, Amazon SQS, and BullMQ. For our example, we’ll use BullMQ.

According to its documentation, BullMQ is a Node.js library that implements a fast and robust queue system built on top of Redis.

Why are such systems built on an in-memory database such as Redis? The simple answer is fast access: every enqueue by a producer and every fetch by a consumer hits the store, so if each of those operations carried a large overhead, the whole process would become extremely inefficient.

This is how you would declare the queue with BullMQ

Declaring Queue

In BullMQ, messages are referred to as jobs, and you add a job to the queue with the .add() method. You can also pass additional options, which I’ll discuss in the good practices section.
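
A minimal sketch of declaring a queue and adding a job, assuming bullmq is installed (npm install bullmq) and Redis is reachable on localhost:6379. The queue name email, the job name send-welcome-email and the payload fields are hypothetical choices; producer and consumer only need to agree on them. bullmq is required inside the factory so the rest of the module can be loaded without a Redis server.

```javascript
// Names shared by producer (User Service) and consumer (Email Service).
// These strings are illustrative; both sides just need to use the same ones.
const EMAIL_QUEUE_NAME = 'email';
const WELCOME_EMAIL_JOB = 'send-welcome-email';

// Redis connection used by BullMQ (assumed local default; adjust per environment).
const connection = { host: 'localhost', port: 6379 };

// Declare the queue. bullmq is required lazily so this file loads without Redis.
function createEmailQueue() {
  const { Queue } = require('bullmq');
  return new Queue(EMAIL_QUEUE_NAME, { connection });
}

// In BullMQ a message is called a job: queue.add(jobName, data, options).
// Job data is stored in Redis as JSON, so keep it small and serializable.
async function addWelcomeEmailJob(queue, user) {
  return queue.add(WELCOME_EMAIL_JOB, { to: user.email, name: user.name });
}
```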

Using what we have learned so far, and following the architecture diagram of the queue above, the code for the User Service looks as follows:

The edited code for the User Service, which now enqueues the data in the queue
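
A sketch of the User Service handler once it becomes a producer, assuming a queue object with BullMQ’s add(name, data) method is passed in (for example one created with new Queue('email', { connection })). The function name and job name are hypothetical.

```javascript
// Hypothetical User Service signup handler, now acting as a queue producer.
// `emailQueue` is any object with BullMQ's add(name, data) method,
// e.g. new Queue('email', { connection }) from the bullmq package.
async function signUp(user, emailQueue) {
  // ... save the user to the database here (omitted) ...

  // Instead of calling the Email Service over HTTP, record the work to be done.
  // This only needs Redis to be up; the Email Service may be down right now.
  const job = await emailQueue.add('send-welcome-email', {
    to: user.email,
    name: user.name,
  });

  return { signedUp: true, emailJobId: job.id };
}
```

Note that add still rejects if Redis itself is unreachable, so the producer keeps its error handling; the queue moves the dependency from the Email Service to Redis.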

Once the data is enqueued, it stays in the queue until the job is processed by a consumer. In BullMQ, a Worker is responsible for processing the jobs in a queue. We’ll set up a worker in the Email Service as follows:

Modifying the Email Service with a worker. If there is no use case for the HTTP endpoint anymore, you can also remove it.
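
A sketch of the consumer side, assuming a queue named email (a hypothetical name that must match the producer’s) and a local Redis. sendEmail is a hypothetical function standing in for your email provider, for example an AWS SES client; the processor is built separately from the Worker so it can be tested without Redis.

```javascript
// Build the job processor. `sendEmail` is a hypothetical stand-in for the
// real email provider call (SES, SMTP, ...). Throwing marks the job as failed.
function makeEmailProcessor(sendEmail) {
  return async (job) => {
    if (job.name !== 'send-welcome-email') {
      throw new Error(`Unknown job name: ${job.name}`);
    }
    const { to, name } = job.data;
    await sendEmail({ to, subject: `Welcome, ${name}!` });
    // The returned value is stored as the job's return value on completion.
    return { deliveredTo: to };
  };
}

// Start a BullMQ Worker that pulls jobs from the "email" queue.
// bullmq is required lazily so the processor above can be tested without Redis.
function startEmailWorker(sendEmail, connection = { host: 'localhost', port: 6379 }) {
  const { Worker } = require('bullmq');
  return new Worker('email', makeEmailProcessor(sendEmail), { connection });
}
```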

Once a job is processed in BullMQ, it moves into a state such as completed or failed; you can read more about the states here.
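
To observe these transitions from code, a BullMQ Worker emits events such as completed and failed. A small sketch, assuming worker is the Worker created in the Email Service; the helper name and log format are illustrative. Since a Worker is a Node.js EventEmitter, any emitter works for trying it out.

```javascript
// Attach listeners that report when jobs reach a final state.
// In BullMQ, 'completed' receives (job, returnValue) and 'failed' receives
// (job, error); job can be undefined for some failures, hence the optional chaining.
function logJobStates(worker, log = console.log) {
  worker.on('completed', (job, returnValue) => {
    log(`job ${job.id} (${job.name}) completed`, returnValue);
  });
  worker.on('failed', (job, err) => {
    log(`job ${job?.id} (${job?.name}) failed: ${err.message}`);
  });
  return worker;
}
```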

Visualization and Monitoring

Well, that’s a biggie: we have successfully implemented a basic queue. However, what happens inside it is still a black box. What was the message content? How many items are currently in the queue? What state is each of them in? How much memory does my queue take up?

All these questions matter when building real apps that scale, and it’s critical to have some form of monitoring over the infrastructure.
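
Some of these questions can be answered from code before reaching for a UI: BullMQ’s Queue exposes getJobCounts, which returns the number of jobs in each requested state. A sketch, assuming queue is a BullMQ Queue such as the one declared earlier in the article; the chosen states, polling interval and helper names are illustrative.

```javascript
// States we care about; BullMQ has others too (for example 'paused').
const STATES = ['waiting', 'active', 'delayed', 'completed', 'failed'];

// Return an object mapping each requested state to its job count.
async function queueSnapshot(queue) {
  return queue.getJobCounts(...STATES);
}

// Log a snapshot periodically; returns a function that stops the polling.
function watchQueue(queue, intervalMs = 10_000, log = console.log) {
  const timer = setInterval(async () => {
    try {
      log(await queueSnapshot(queue));
    } catch (err) {
      log(`could not read queue counts: ${err.message}`);
    }
  }, intervalMs);
  return () => clearInterval(timer);
}
```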

There are many tools available that do this job for you. My choice for basic projects is Bull Arena. It’s lightweight, intuitive, and really easy to set up.

To set up Bull Arena on myMessageQueue

Bull Arena setup in a separate service

This code sets up Arena at the /admin/queues route. Opening that route in a browser shows the basic Arena UI.
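
A sketch of such a setup, assuming the express, bull-arena and bullmq packages and a queue named email (hypothetical). The option names (BullMQ, type: 'bullmq', hostId, basePath, disableListen) follow the bull-arena README; treat them as assumptions to verify against the version you install. The config builder is a plain function so it can be inspected without Redis.

```javascript
// Build the Bull Arena configuration for a list of BullMQ queue names.
// Option names follow the bull-arena README; verify them for your version.
function buildArenaConfig(Queue, queueNames, redis) {
  return {
    BullMQ: Queue, // the bullmq Queue class Arena should use
    queues: queueNames.map((name) => ({
      type: 'bullmq',
      name, // must match the name used by producers and workers
      hostId: 'local', // display label for the Redis host (illustrative)
      redis,
    })),
  };
}

// Mount Arena on an existing Express app under /admin/queues.
function mountArena(app, redis = { host: 'localhost', port: 6379 }) {
  const Arena = require('bull-arena');
  const { Queue } = require('bullmq');
  const arena = Arena(buildArenaConfig(Queue, ['email'], redis), {
    basePath: '/admin/queues',
    disableListen: true, // reuse the existing Express server instead of Arena's own
  });
  app.use('/', arena);
  return app;
}
```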

Bull Arena Screen. Source: https://github.com/bee-queue/arena/blob/HEAD/screenshots/screen1.png

On this screen, you can see the various jobs and the state each one is in.

In a typical lifecycle, a job starts in the Waiting state, i.e. it has not yet been picked up by a worker. While a worker processes it, it is Active, and afterwards it moves to Completed or, if processing threw an error, to Failed.

In our case, this is what happens when the email job executes successfully:

As you can see, the Send Email job is in the Completed state.

Feel free to play around with the queue and Arena in the provided repo.

A Limitation of the Message Queue to Note

As you may have guessed, many producers and consumers can act on a queue simultaneously. However, each message is consumed only once, by whichever worker picks it up. That doesn’t help when we want to broadcast a message to multiple services.

Because each message goes to exactly one consumer, this messaging pattern is often called point-to-point communication.
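
The point-to-point behaviour can be shown with a tiny simulation. In this hypothetical sketch (plain JavaScript, not BullMQ), two workers compete for jobs on one queue; round-robin stands in for "whichever worker is free first". Each job lands with exactly one worker, so neither sees the full stream.

```javascript
// Simulate competing consumers: each job is dequeued by exactly one worker.
function distribute(jobs, workerNames) {
  const pending = [...jobs];
  const received = Object.fromEntries(workerNames.map((w) => [w, []]));
  let turn = 0;
  // Workers take turns pulling the next job; whoever pulls it owns it.
  while (pending.length > 0) {
    const worker = workerNames[turn % workerNames.length];
    received[worker].push(pending.shift());
    turn += 1;
  }
  return received;
}

console.log(distribute(['job1', 'job2', 'job3'], ['emailWorker', 'analyticsWorker']));
// { emailWorker: [ 'job1', 'job3' ], analyticsWorker: [ 'job2' ] }
```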

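To deliver one message to multiple services, we need a pub/sub model where multiple consumers subscribe to the same topic and each receives its own copy. BullMQ’s worker concurrency and (in BullMQ Pro) groups help process many jobs in parallel, but they do not deliver one job to several services; for fan-out you would typically add a copy of the job to one queue per consuming service, or use a tool built around pub/sub, such as Apache Kafka.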
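
One way to get broadcast-like behaviour while staying on BullMQ is to fan out at the producer: add a copy of the job to one queue per interested service. A sketch, assuming each queue object has BullMQ’s add(name, data) method; the helper name, queue names and job name are hypothetical.

```javascript
// Fan out one event to several queues so that every consuming service gets
// its own copy (each queue is still point-to-point for that service's workers).
async function publishToAll(queues, jobName, data) {
  // Promise.all adds the copies in parallel. If one add fails, the call rejects
  // even though other queues may already hold the job, so callers should be
  // prepared to retry (and consumers to tolerate duplicates).
  return Promise.all(queues.map((queue) => queue.add(jobName, data)));
}

// Usage (hypothetical queues, each created with new Queue(name, { connection })):
// await publishToAll([emailQueue, analyticsQueue], 'user-signed-up', { userId: 42 });
```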

A few basic good practices

Alright, that’s pretty much everything you need to know to get started with queues and BullMQ. As you build more systems, you’ll encounter use cases where you may or may not want a queue. When you do use one, keep the following in mind.

Note that some of these practices only become necessary under high load; to save cost, not all of them need to be handled right away.

  1. Use a dedicated Redis cluster or DB for the queue in each environment. This limits the damage from accidental data loss, and if a burst of enqueues overwhelms the queue’s Redis, other services that depend on Redis, such as caching, stay unaffected.
  2. Never put unencrypted sensitive data (such as API auth tokens) in a queue. Queue data is stored in Redis, which makes it accessible to anyone who can view the queue (including through a dashboard like Bull Arena), hence a security risk.
  3. Once a job is completed, you can remove it from Redis so that completed jobs you no longer need don’t hog memory. This can be achieved by setting {removeOnComplete: true}, either per job in .add() or for every job through the queue’s defaultJobOptions; the removeOnFail option does the same for failed jobs.
  4. Retry failed jobs if the business logic requires it (in BullMQ, through the attempts and backoff job options). To some extent, this protects you from jobs failing for reasons beyond the developer’s control, such as a temporary outage of the email provider.
  5. Set a reasonable timeout for your jobs, based on the expected duration of the task. This helps prevent long-running jobs from clogging up your queue. Unlike the older Bull library, BullMQ has no per-job timeout option, so the timeout is typically enforced inside the worker’s processor.
  6. Load test. This doesn’t need to be handled right away, but typically you want your dequeue rate to comfortably keep up with your enqueue rate, so that the backlog doesn’t grow until your queue runs out of space (Redis memory).
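
Several of these practices map directly to BullMQ options. A sketch, assuming a queue named email (hypothetical): removeOnComplete, removeOnFail, attempts and backoff are BullMQ job options, set here for every job via defaultJobOptions, and db selects a logical Redis database through the underlying ioredis connection. The environment variable names and all numbers are placeholders, and withTimeout is a hypothetical wrapper, since BullMQ itself offers no per-job timeout option.

```javascript
// Redis connection for the queue, kept in its own logical DB (db index) so it is
// separate from e.g. caching. Env var names and defaults are placeholders.
const connection = {
  host: process.env.QUEUE_REDIS_HOST || 'localhost',
  port: Number(process.env.QUEUE_REDIS_PORT || 6379),
  db: Number(process.env.QUEUE_REDIS_DB || 1),
};

// Defaults applied to every job added to the queue (numbers are illustrative).
const defaultJobOptions = {
  attempts: 5, // up to 5 attempts in total before the job stays failed
  backoff: { type: 'exponential', delay: 1000 }, // wait ~1s, 2s, 4s, ... between attempts
  removeOnComplete: true, // delete completed jobs so they don't occupy Redis memory
  removeOnFail: 1000, // keep only the latest 1000 failed jobs for inspection
};

function createEmailQueue() {
  const { Queue } = require('bullmq');
  return new Queue('email', { connection, defaultJobOptions });
}

// Hypothetical timeout wrapper for a processor. Rejecting makes BullMQ treat the
// attempt as failed (and retry it per `attempts`), but Promise.race does not stop
// the underlying work; it keeps running in the background.
function withTimeout(processor, ms) {
  return async (job) => {
    let timer;
    const timeout = new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error(`job ${job.id} timed out after ${ms} ms`)), ms);
    });
    try {
      return await Promise.race([processor(job), timeout]);
    } finally {
      clearTimeout(timer);
    }
  };
}
```

In the Email Service, the wrapped processor would be passed to the Worker in place of the plain one.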

Final Thoughts

Aggghhh, this was a long blog! I hope you now understand why message queues are important and can start implementing them. While you’re at it, try building a reminder application with BullMQ (hint: use delayed jobs) and explore more of the functionality queues provide; you’ll certainly be amazed.
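
For the reminder idea, BullMQ’s delay job option (in milliseconds) keeps a job in the delayed state until it is due, after which workers pick it up like any other job. A minimal sketch, assuming a queue with BullMQ’s add(name, data, opts); the function name, job name and payload are hypothetical.

```javascript
// Schedule a reminder as a delayed job: it stays in the "delayed" state until
// `remindAt`, then becomes available to workers like any other job.
// The delay is relative to the moment add() is called, so compute it right before.
async function scheduleReminder(queue, text, remindAt, now = Date.now()) {
  const delay = Math.max(0, remindAt.getTime() - now); // never negative
  return queue.add('reminder', { text, remindAt: remindAt.toISOString() }, { delay });
}
```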

As promised, here is the GitHub repo. You can also connect with me on LinkedIn and Twitter if you have new ideas or questions.

Happy building! :)

Akshay Jain

Building, @gocobaltio. Goes by i-rebel-aj on most social platforms