NestJS — Queue Management

priya s
Published in YavarTechWorks
4 min read, Feb 27, 2023

Hi friends, in this blog we are going to learn how to implement queues in NestJS using the Bull package.

Queues

Queues are a powerful design pattern that helps you deal with common application scaling and performance challenges. Queues can help you:

  1. Smooth out processing peaks.
  2. Break up larger tasks that would otherwise block the Node.js event loop.
  3. Provide a reliable communication channel across various services.

Nest provides the @nestjs/bull package to implement queues. This package integrates Bull queues into the Nest environment.

Bull uses Redis to store its job data. Because of this, the queue architecture can be completely distributed and platform-independent. For example, some queue producers, consumers and listeners can run on one node while other producers, consumers and listeners run on other nodes.

@nestjs/bull:

It is a two-part mechanism that consists of producers and consumers.

Producers: they push work (jobs) into the queue.

Consumers: they receive jobs from the queue and process them.

Consider a CPU-bound operation within an application. If it is handled synchronously, every request that triggers it takes longer, because the request stays blocked until the entire operation has finished. Sometimes the request may even end in a timeout.

To avoid this, NestJS provides a queueing mechanism. Long-running operations are processed later, outside the user's request, so the request completes quickly and the response comes back fast.
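The idea can be sketched without any queue library. In this minimal, dependency-free illustration the names handleRequest, pending and drain are hypothetical, and the in-memory array only stands in for a real Redis-backed Bull queue. The request handler records the job and returns immediately, and the expensive work runs later:

```typescript
// Hypothetical in-memory stand-in for a real queue, for illustration only.
type ReportJob = { userId: number };

const pending: ReportJob[] = [];
const processed: number[] = [];

// Request handler: enqueue the slow work and respond right away.
function handleRequest(userId: number): string {
  pending.push({ userId });
  return `accepted report request for user ${userId}`;
}

// Worker: runs later, outside the request/response cycle.
function drain(): void {
  let job = pending.shift();
  while (job !== undefined) {
    processed.push(job.userId); // the expensive work would happen here
    job = pending.shift();
  }
}

const reply = handleRequest(7); // returns before any processing happens
drain(); // the deferred work runs afterwards
```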

Installation:

The packages can be installed with npm install --save @nestjs/bull bull. Once the installation is done, the BullModule can be imported into the root AppModule.

The forRoot() method is used to register a bull package configuration object that will be used by all queues registered in the application.

To register a queue, BullModule.registerQueue() is used.
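Put together, the registration might look like the following sketch. The Redis host and port are assumptions (a local Redis on its default port), and the two queue names are the ones used in this article:

```typescript
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    // Shared settings used by every queue registered in the application.
    // Assumption: Redis is running locally on its default port.
    BullModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
    }),
    // registerQueue() accepts one options object per queue.
    BullModule.registerQueue(
      { name: 'message-queue' },
      { name: 'file-operation' },
    ),
  ],
})
export class AppModule {}
```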

This article uses two different queues, named message-queue and file-operation.

Producers:

Producers add jobs to queues. They are ordinary injectable services.

The queue has to be injected into the producer by name using the @InjectQueue() decorator. To add a job to the queue we use this.queue.add().

In this article's example, 'message-job' is the job name in the queue, and { name: msg } is the payload.

The job is added with a delay option of 10 seconds (given in milliseconds), so it will be processed only after a 10-second delay.
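A producer matching this description might look like the sketch below. To keep it runnable without Redis, the queue is typed with a small local interface (JobQueue, hypothetical) that mirrors the add(name, data, opts) call on Bull's Queue. In a Nest application the class would be marked @Injectable() and the constructor parameter would be a Bull Queue injected with @InjectQueue('message-queue'). The class and method names are assumptions.

```typescript
// Minimal shape of the queue used here; Bull's Queue offers a compatible add().
interface JobQueue {
  add(name: string, data: { name: string }, opts?: { delay?: number }): Promise<unknown>;
}

// In Nest: decorate the class with @Injectable() and declare the constructor
// parameter as @InjectQueue('message-queue') queue: Queue (Queue from 'bull').
class MessageProducerService {
  private readonly queue: JobQueue;

  constructor(queue: JobQueue) {
    this.queue = queue;
  }

  async sendMessage(msg: string): Promise<void> {
    // Job name, payload, and a 10-second delay expressed in milliseconds.
    await this.queue.add('message-job', { name: msg }, { delay: 10_000 });
  }
}
```

Because the producer only depends on the add() call, it can be exercised in a unit test with a fake queue object instead of a live Redis connection.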

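A job can take many options, such as:

  1. priority: number. The job's priority, where 1 is the highest.
  2. delay: number. Milliseconds to wait before the job can be processed.
  3. attempts: number. The total number of attempts to try the job until it completes.
  4. repeat: RepeatOpts. Repeats the job according to a cron specification.
  5. lifo: boolean. If true, the job is added to the right end of the queue instead of the left (default is false).
  6. timeout: number. Milliseconds after which the job fails with a timeout error.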

Consumers:

A consumer is an ordinary class that processes the jobs added to the queue, listens for events on the queue, or does both.

A consumer class is declared with the @Processor() decorator, which takes the name of the queue. Inside the class, job handler methods are marked with the @Process() decorator, which can optionally take a job name so that the method handles only jobs with that name.
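A consumer for the message queue might look like the sketch below. The decorators appear as comments so that the snippet runs without the Nest and Bull packages. MessageJob is a hypothetical local type that mirrors the data property of Bull's Job, and the class and method names are assumptions.

```typescript
// Minimal shape of a job; Bull's Job exposes the payload as job.data.
interface MessageJob {
  data: { name: string };
}

// In Nest: decorate the class with @Processor('message-queue').
class MessageConsumer {
  readonly seen: string[] = [];

  // In Nest: decorate this method with @Process('message-job').
  async readMessage(job: MessageJob): Promise<void> {
    // The real work on the payload goes here; this sketch only records it.
    this.seen.push(job.data.name);
  }
}
```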

Event listeners:

Bull generates a set of useful events when queue and/or job state changes occur.

A local event is one that is produced when an action or state change is triggered on a queue in the local process. When a queue is shared across multiple processes, we encounter the possibility of global events.
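In @nestjs/bull, listeners are methods declared inside a consumer class and marked with decorators such as @OnQueueActive() for a local event or @OnGlobalQueueCompleted() for a global one. A local listener receives the job object, while a global listener receives only the job id. The sketch below shows the two handler shapes, again with the decorators as comments so it runs without the packages. JobInfo and the class name are hypothetical.

```typescript
// Minimal shape of a job as seen by a local listener (hypothetical type).
interface JobInfo {
  id: number | string;
  name: string;
}

// In Nest: this class would also carry @Processor('message-queue').
class MessageQueueEvents {
  readonly log: string[] = [];

  // In Nest: @OnQueueActive(), a local event that receives the job object.
  onActive(job: JobInfo): void {
    this.log.push(`processing job ${job.id} (${job.name})`);
  }

  // In Nest: @OnGlobalQueueCompleted(), a global event that receives the job id.
  onGlobalCompleted(jobId: number | string, result: unknown): void {
    this.log.push(`job ${jobId} completed with ${JSON.stringify(result)}`);
  }
}
```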

Queue Management:

await this.queue.add(): this method is used to add a job to the queue.

await this.queue.pause(): this method pauses the queue. A paused queue will not process new jobs until it is resumed, but jobs currently being processed will continue until they are finalized.

await this.queue.resume(): this method resumes a paused queue.
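One way to combine pause() and resume() is to hold the queue while some maintenance task runs. This is only a sketch: PausableQueue is a hypothetical local interface covering the two calls (Bull's Queue has compatible pause() and resume() methods), and QueueMaintenance is an invented helper name.

```typescript
// Minimal shape of the queue used here; Bull's Queue has compatible methods.
interface PausableQueue {
  pause(): Promise<void>;
  resume(): Promise<void>;
}

class QueueMaintenance {
  private readonly queue: PausableQueue;

  constructor(queue: PausableQueue) {
    this.queue = queue;
  }

  // Pause the queue, run the task, and always resume, even if the task throws.
  async runWhilePaused(task: () => Promise<void>): Promise<void> {
    await this.queue.pause();
    try {
      await task();
    } finally {
      await this.queue.resume();
    }
  }
}
```

The try/finally block is a design choice: it keeps a failing task from leaving the queue paused.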

Async configuration:

Instead of being passed statically, the Bull options can be supplied asynchronously by using the forRootAsync() method with a factory function.

The useFactory syntax allows the options to be created dynamically. The factory function can accept arguments, and the inject property accepts an array of providers that Nest will resolve and pass as arguments to the factory function during instantiation.
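An asynchronous configuration might look like the following sketch. It assumes the @nestjs/config package is installed and that the Redis settings live in environment variables named QUEUE_HOST and QUEUE_PORT; those names are illustrative, not something the package requires.

```typescript
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    ConfigModule.forRoot(),
    BullModule.forRootAsync({
      imports: [ConfigModule],
      // Nest resolves ConfigService (listed in inject) and passes it in here.
      useFactory: async (configService: ConfigService) => ({
        redis: {
          // Assumed variable names; adjust them to your environment.
          host: configService.get<string>('QUEUE_HOST'),
          port: Number(configService.get<string>('QUEUE_PORT')),
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}
```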

Note: The producers and the consumers should be registered in the providers array of AppModule.

Note: Routing should be handled by controllers, which call the producers to add jobs to the queue.

Hope you find it useful. Thank you!!
