Integrating NestJS with Amazon SQS: A Comprehensive Guide

Ahmad Alhourani
Oct 12, 2023

Introduction

In today’s fast-paced world of web applications, efficient job processing is paramount. Whether you’re sending emails, processing data, or performing background tasks, a robust job queue system can streamline your application’s performance. In this guide, we’ll explore the process of integrating NestJS, a powerful Node.js framework, with Amazon Simple Queue Service (SQS), a fully managed message queuing service provided by Amazon Web Services (AWS).

Why NestJS and Amazon SQS?

NestJS, a progressive Node.js framework, has gained immense popularity for building efficient and scalable server-side applications. On the other hand, Amazon SQS provides a reliable and scalable solution for decoupling the components of a cloud application. By combining these technologies, we can harness the benefits of both to create a seamless job processing system.

Steps to Integration

Step 1: Setting Up the Development Environment

Before we dive into the code, let’s ensure our development environment is ready. You’ll need Node.js installed, as well as NestJS, AWS SDK, and the NestJS-SQS library. Here’s how to install the necessary dependencies:

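# Install NestJS, both AWS SDKs, and the other packages the snippets below import
npm install --save @nestjs/core @nestjs/common @nestjs/config @nestjs/typeorm typeorm rxjs uuid aws-sdk @aws-sdk/client-sqs @ssut/nestjs-sqs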

Step 2: ProducerModule — Sending Jobs to SQS

The ProducerModule is the NestJS module responsible for sending jobs to SQS. It configures the AWS SQS client and provides the ProducerService that interacts with it. Let's take a look at how to set up the ProducerModule:

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import * as AWS from 'aws-sdk';
import { ProducerService } from './producer.service';
import { QueueJob } from './entities/queue-job.entity';
import { QueueJobService } from './queue-job.service';

@Module({
  imports: [ConfigModule, TypeOrmModule.forFeature([QueueJob])],
  providers: [
    {
      provide: AWS.SQS, // Provide the SQS client from the AWS SDK (v2)
      useFactory: (configService: ConfigService) => {
        // Read the connection settings from the `sqs.*` configuration keys
        return new AWS.SQS({
          region: configService.get<string>('sqs.region'),
          accessKeyId: configService.get<string>('sqs.accessKeyId'),
          secretAccessKey: configService.get<string>('sqs.secretAccessKey'),
        });
      },
      inject: [ConfigService],
    },
    ProducerService,
    QueueJobService,
  ],
  // Re-export TypeOrmModule so importing modules can inject the QueueJob repository
  exports: [AWS.SQS, ProducerService, QueueJobService, TypeOrmModule],
})
export class ProducerModule {}
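
The factory above reads its settings from configuration keys under the sqs namespace (sqs.region, sqs.url, and so on), but this guide does not show where those values come from. One possible way to populate them, sketched here under the assumption that they live in environment variables, is a small loader function. The variable names (SQS_REGION, SQS_QUEUE_URL, and the rest) and the loadSqsConfig helper are hypothetical, not part of the original project.

```typescript
// Shape of the `sqs.*` configuration keys used throughout this guide.
interface SqsConfig {
  region: string;
  accessKeyId: string;
  secretAccessKey: string;
  url: string;
  queue_name: string;
  isFifo: boolean;
}

type Env = Record<string, string | undefined>;

// Read one required variable, failing fast with a clear message when it is missing.
function required(env: Env, key: string): string {
  const value = env[key];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable ${key}`);
  }
  return value;
}

// Hypothetical loader: the environment variable names are assumptions.
function loadSqsConfig(env: Env): { sqs: SqsConfig } {
  const url = required(env, 'SQS_QUEUE_URL');
  return {
    sqs: {
      region: required(env, 'SQS_REGION'),
      accessKeyId: required(env, 'SQS_ACCESS_KEY_ID'),
      secretAccessKey: required(env, 'SQS_SECRET_ACCESS_KEY'),
      url,
      queue_name: required(env, 'SQS_QUEUE_NAME'),
      // FIFO queue names, and therefore their URLs, always end in ".fifo".
      isFifo: url.endsWith('.fifo'),
    },
  };
}
```

A function with this return shape could be registered through ConfigModule.forRoot({ load: [() => loadSqsConfig(process.env)] }), after which configService.get('sqs.region') resolves the nested value.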

Step 3: QueueJob Entity — Tracking Job Status

To ensure robust tracking of job status in a database, we create the QueueJob entity. This entity helps us monitor the progress of each job. Here's a simplified example of the QueueJob entity:

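import { Column, Entity, Index } from 'typeorm';
// BaseEntity is the project's own base class (import omitted here);
// it is assumed to supply the primary key `id` used in later steps.

@Entity()
export class QueueJob extends BaseEntity {
  // Identifier generated by the producer and embedded in the message body
  @Column({ unique: true })
  message_id: string;

  // The full set of parameters sent to SQS
  @Column('json')
  message: any;

  // The original payload passed to the producer
  @Column('json')
  entity: any;

  @Column()
  queue: string;

  @Column()
  job_type: string;

  @Column({ default: 0 })
  @Index()
  status: number;

  // Number of failed or timed-out processing attempts
  @Column({ default: 0 })
  counter: number;

  @Column('json', { nullable: true })
  error: any;

  @Column({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
  created_at: Date;

  @Column({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
  updated_at: Date;

  // Add other relevant fields as needed
}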
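
The status column stores a number, and later steps refer to QueueJobStatus.TIMEOUT and QueueJobStatus.FAILED without showing the definition. A minimal sketch of such an enum might look like the following. Apart from TIMEOUT and FAILED, the member names and all numeric values are assumptions; the only constraint taken from the entity is that new rows default to 0.

```typescript
// Hypothetical status codes for the QueueJob.status column.
// Only TIMEOUT and FAILED are named in this guide; the rest are illustrative.
enum QueueJobStatus {
  PENDING = 0, // matches the column default for newly saved jobs
  PROCESSING = 1,
  COMPLETED = 2,
  FAILED = 3,
  TIMEOUT = 4,
}

// A job counts as finished only once it has completed; failed and
// timed-out jobs may still be delivered again by SQS.
function isFinished(status: QueueJobStatus): boolean {
  return status === QueueJobStatus.COMPLETED;
}

// Numeric enums have a reverse mapping, which is handy for log output.
function statusLabel(status: QueueJobStatus): string {
  return QueueJobStatus[status] ?? `UNKNOWN(${status})`;
}
```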

Step 4: ProducerService — Sending Jobs to SQS

The ProducerService is responsible for sending jobs to AWS SQS. It prepares the job details and message attributes, records the job in the database, and then sends the message to the queue. Let's explore the send method within the ProducerService:

import {
  BadRequestException,
  Inject,
  Injectable,
  InternalServerErrorException,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as AWS from 'aws-sdk';
import { defer, from } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import { QueueJob } from './entities/queue-job.entity';
import { QueueJobService } from './queue-job.service';
// JOB_TYPES is the application's list of allowed job type names (import omitted here)

export interface SQSMessage {
  QueueUrl: string;
  MessageBody: string;
  MessageGroupId?: string;
  MessageDeduplicationId?: string;
  DelaySeconds?: number;
}

export interface Job {
  DataType: string;
  value: string;
}

export interface MessageAttributes {
  job: Job;
}

export interface MessageBody {
  messageId: string;
  message: any;
  date: string;
  MessageAttributes: MessageAttributes;
}

@Injectable()
export class ProducerService {
  constructor(
    private readonly configService: ConfigService,
    private readonly queueJobService: QueueJobService,
    // Reuse the client registered in ProducerModule instead of creating one per call
    @Inject(AWS.SQS) private readonly sqs: AWS.SQS,
  ) {}

  send(message: any, jobType: string, messageGroupId: string = 'general') {
    if (!JOB_TYPES.includes(jobType)) {
      throw new BadRequestException('Invalid job type');
    }
    // Accept either a boolean or the string 'true' from configuration
    const isFifo = String(this.configService.get('sqs.isFifo')) === 'true';

    const messageId = uuidv4();
    const body: MessageBody = {
      messageId,
      message,
      date: new Date().toISOString(),
      MessageAttributes: {
        job: {
          DataType: 'string',
          value: jobType,
        },
      },
    };
    let sqsMessage: SQSMessage = {
      QueueUrl: this.configService.get<string>('sqs.url'),
      MessageBody: JSON.stringify(body),
    };
    if (isFifo) {
      // FIFO queues need a group ID; the message ID doubles as the deduplication ID
      sqsMessage = {
        ...sqsMessage,
        MessageGroupId: messageGroupId,
        MessageDeduplicationId: messageId,
      };
    }
    console.log('sqsMessage:', sqsMessage);

    const input: Partial<QueueJob> = {
      message_id: messageId,
      message: sqsMessage,
      entity: message,
      job_type: jobType,
      queue: this.configService.get<string>('sqs.queue_name'),
    };
    // Persist the job first, then send it to SQS; emit true on success
    return defer(() => this.queueJobService.save(input)).pipe(
      switchMap(() => from(this.sqs.sendMessage(sqsMessage).promise())),
      map(() => true),
      catchError((error) => {
        throw new InternalServerErrorException(error);
      }),
    );
  }
}
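
The FIFO branch is the part of send that is easiest to get wrong, so it can help to isolate the message construction in a pure function that is testable without AWS. The following is a sketch of that idea, not the author's implementation; buildSqsMessage and its parameter object are hypothetical names, and the caller is assumed to supply the message ID (for example from uuidv4()).

```typescript
// Parameters for SQS sendMessage that this guide uses.
interface SqsSendParams {
  QueueUrl: string;
  MessageBody: string;
  MessageGroupId?: string;
  MessageDeduplicationId?: string;
}

interface BuildOptions {
  queueUrl: string;
  messageId: string; // supplied by the caller, e.g. a UUID
  payload: unknown;
  jobType: string;
  isFifo: boolean;
  messageGroupId?: string;
}

// Hypothetical helper: builds the sendMessage parameters without touching AWS.
function buildSqsMessage(options: BuildOptions): SqsSendParams {
  const { queueUrl, messageId, payload, jobType, isFifo } = options;
  const base: SqsSendParams = {
    QueueUrl: queueUrl,
    MessageBody: JSON.stringify({
      messageId,
      message: payload,
      date: new Date().toISOString(),
      MessageAttributes: { job: { DataType: 'string', value: jobType } },
    }),
  };
  if (!isFifo) {
    return base;
  }
  // MessageGroupId and MessageDeduplicationId are only set for FIFO queues.
  return {
    ...base,
    MessageGroupId: options.messageGroupId ?? 'general',
    MessageDeduplicationId: messageId,
  };
}
```

The returned object has the same fields as the SQSMessage interface above, so it could be passed straight to sendMessage.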

Step 5: ConsumerModule — Handling Jobs from SQS

The ConsumerModule handles incoming jobs from the SQS queue. It registers the SQS consumers through SqsModule and wires up the services that process messages. Here's how to configure the ConsumerModule:

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { SQSClient } from '@aws-sdk/client-sqs';
import { SqsModule } from '@ssut/nestjs-sqs';
// ProducerModule, ConsumerService, and JobHandlerFactory come from the
// application's own files (imports omitted here)

@Module({
  imports: [
    ProducerModule,
    SqsModule.registerAsync({
      imports: [ConfigModule], // Import the ConfigModule to use the ConfigService
      useFactory: async (configService: ConfigService) => {
        // Retrieve the required configuration values using ConfigService
        const region = configService.get<string>('sqs.region');
        const accessKeyId = configService.get<string>('sqs.accessKeyId');
        const secretAccessKey = configService.get<string>(
          'sqs.secretAccessKey',
        );

        return {
          consumers: [
            {
              name: configService.get<string>('sqs.queue_name'), // name of the queue
              queueUrl: configService.get<string>('sqs.url'), // url of the queue
              region, // same region as the producer
              batchSize: 10, // number of messages to receive at once
              // visibilityTimeout: 10,
              // waitTimeSeconds: 20, // long polling; SQS allows at most 20 seconds
              terminateGracefully: true, // gracefully shutdown when SIGINT/SIGTERM is received
              sqs: new SQSClient({
                region,
                credentials: { accessKeyId, secretAccessKey },
              }),
            },
          ],
          producers: [],
        };
      },
      inject: [ConfigService],
    }),
  ],
  controllers: [],
  // A module can only export providers that it declares or imports
  providers: [ConsumerService, JobHandlerFactory],
  exports: [ConsumerService, JobHandlerFactory],
})
export class ConsumerModule {}

Step 6: ConsumerService — Processing Job Messages

The ConsumerService is a critical part of the integration. It processes job messages, checks for different job types, and delegates tasks accordingly. The handleMessage method in the ConsumerService is where the magic happens:

import {
  Injectable,
  InternalServerErrorException,
  Logger,
} from '@nestjs/common';
import { Message } from '@aws-sdk/client-sqs';
import { SqsConsumerEventHandler, SqsMessageHandler } from '@ssut/nestjs-sqs';
// config, JOB_TYPES, MessageBody, QueueJobService, and JobHandlerFactory come
// from the application's own files (imports omitted here)

@Injectable()
export class ConsumerService {
  constructor(
    private readonly queueJobService: QueueJobService,
    private readonly jobHandlerFactory: JobHandlerFactory,
  ) {}

  @SqsMessageHandler(/** name: */ config.sqs.queue_name, /** batch: */ false)
  async handleMessage(message: Message) {
    const msgBody: MessageBody = JSON.parse(message.Body) as MessageBody;
    const jobType = msgBody.MessageAttributes.job.value;
    console.log('Consumer Start ....:', msgBody.messageId);

    if (!JOB_TYPES.includes(jobType)) {
      Logger.error('Invalid job type ' + jobType);
      throw new InternalServerErrorException('Invalid job type ' + jobType);
    }

    try {
      // TODO: handle the message here, e.g. by delegating to a handler
      // obtained from jobHandlerFactory
    } catch (error) {
      // Rethrowing keeps the message in SQS so that it is delivered again
      Logger.error(error.message);
      throw new InternalServerErrorException(error);
    }
  }
}
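
The guide injects a JobHandlerFactory but does not show it. One common way to delegate by job type is a registry that maps each type name to a handler function. The sketch below illustrates that pattern in plain TypeScript; the class body, the JobHandler type, and the 'send_email' job type are all assumptions made for illustration, and a real version would be decorated with @Injectable().

```typescript
// A handler receives the `message` payload of one job.
type JobHandler = (payload: unknown) => Promise<void>;

// Hypothetical sketch of a JobHandlerFactory: a registry keyed by job type.
class JobHandlerFactory {
  private readonly handlers = new Map<string, JobHandler>();

  // Associate a handler with a job type; returns `this` to allow chaining.
  register(jobType: string, handler: JobHandler): this {
    this.handlers.set(jobType, handler);
    return this;
  }

  // Look up the handler for a job type, failing loudly for unknown types.
  get(jobType: string): JobHandler {
    const handler = this.handlers.get(jobType);
    if (!handler) {
      throw new Error(`No handler registered for job type "${jobType}"`);
    }
    return handler;
  }
}

// Example registration; 'send_email' is an illustrative job type.
const sentEmails: unknown[] = [];
const factory = new JobHandlerFactory().register(
  'send_email',
  async (payload) => {
    sentEmails.push(payload);
  },
);
```

With a factory of this shape, the TODO inside handleMessage could become something like await this.jobHandlerFactory.get(jobType)(msgBody.message), so adding a new job type only requires registering another handler.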

Step 7: Error Handling and Retry Mechanisms

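No job processing system is complete without robust error handling and retry mechanisms. When a message handler throws, the message is not deleted from the queue, so SQS delivers it again once its visibility timeout expires. The handlers below are additional methods on the ConsumerService. They listen for the consumer's timeout_error and processing_error events and record each failure on the matching QueueJob row: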

@SqsConsumerEventHandler(
  /** name: */ config.sqs.queue_name,
  /** eventName: */ 'timeout_error',
)
onTimeoutError(error: Error, message: Message) {
  const msgBody: MessageBody = JSON.parse(message.Body) as MessageBody;

  Logger.error(error.message);
  return this.queueJobService
    .findOneBy({ message_id: msgBody.messageId })
    .pipe(
      switchMap((queueJob) => {
        if (queueJob) {
          return this.queueJobService.update(queueJob.id, {
            counter: queueJob.counter + 1,
            status: QueueJobStatus.TIMEOUT,
            // Copy the fields explicitly: JSON.stringify(error) yields {}
            error: { name: error.name, message: error.message, stack: error.stack },
            updated_at: () => 'CURRENT_TIMESTAMP',
          });
        }
        return of(true);
      }),
    )
    .subscribe(() => {
      Logger.error('Timeout Error messageId : ' + msgBody.messageId);
    });
}

@SqsConsumerEventHandler(
  /** name: */ config.sqs.queue_name,
  /** eventName: */ 'processing_error',
)
onProcessingError(error: Error, message: Message) {
  const msgBody: MessageBody = JSON.parse(message.Body) as MessageBody;

  Logger.error(error.message);
  return this.queueJobService
    .findOneBy({ message_id: msgBody.messageId })
    .pipe(
      switchMap((queueJob) => {
        if (queueJob) {
          return this.queueJobService.update(queueJob.id, {
            counter: queueJob.counter + 1,
            status: QueueJobStatus.FAILED,
            // Copy the fields explicitly: JSON.stringify(error) yields {}
            error: { name: error.name, message: error.message, stack: error.stack },
            updated_at: () => 'CURRENT_TIMESTAMP',
          });
        }
        return of(true);
      }),
    )
    .subscribe();
}
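
One detail worth knowing when writing an error to a JSON column: an Error survives a JSON.stringify round trip as an empty object, because its message and stack properties are non-enumerable. A small helper that copies those fields explicitly keeps the stored value useful. The sketch below shows one way this could be factored out; serializeError and SerializedError are hypothetical names, not part of the original code.

```typescript
// Plain-object form of an error, safe to store in a JSON column.
interface SerializedError {
  name: string;
  message: string;
  stack?: string;
}

// Hypothetical helper: copy the fields that JSON.stringify would otherwise drop.
function serializeError(error: unknown): SerializedError {
  if (error instanceof Error) {
    return { name: error.name, message: error.message, stack: error.stack };
  }
  // Non-Error values (strings, plain objects) can be thrown too.
  return { name: 'NonError', message: String(error) };
}

// A JSON round trip of an Error loses its message and stack...
const lossy = JSON.parse(JSON.stringify(new Error('queue unreachable')));
// ...while the helper keeps them.
const kept = serializeError(new Error('queue unreachable'));
```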

Step 8: Tracking Job Progress and Cleanup

To ensure the integrity of your job processing system, you need to track job progress and perform periodic cleanup. The QueueJobService is responsible for these tasks. We'll share code snippets and best practices for tracking progress and scheduling cleanup.

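import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { QueueJob } from './entities/queue-job.entity';
// BaseService is the project's generic base service (import omitted here)

@Injectable()
export class QueueJobService extends BaseService<QueueJob> {
  private readonly logger = new Logger(QueueJobService.name);

  constructor(
    @InjectRepository(QueueJob)
    protected readonly repository: Repository<QueueJob>,
  ) {
    super(repository);
  }
}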

read more about BaseService
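
The snippet above shows only the service's constructor, so the cleanup logic itself is left open. As a hedged illustration of what periodic cleanup might involve, the sketch below computes a retention cutoff and picks the rows that are safe to delete. Everything here is an assumption: the JobRow shape, the function names, the 30-day retention in the example, and the policy of deleting only completed jobs.

```typescript
// Minimal view of a QueueJob row needed to decide on cleanup.
interface JobRow {
  id: number;
  status: number;
  updated_at: Date;
}

const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Jobs last updated before this instant are old enough to delete.
function cleanupCutoff(now: Date, retentionDays: number): Date {
  return new Date(now.getTime() - retentionDays * MS_PER_DAY);
}

// Hypothetical policy: only completed jobs older than the cutoff are removed,
// so failed or timed-out rows stay available for inspection.
function selectForCleanup(
  jobs: JobRow[],
  cutoff: Date,
  completedStatus: number,
): number[] {
  return jobs
    .filter(
      (job) =>
        job.status === completedStatus &&
        job.updated_at.getTime() < cutoff.getTime(),
    )
    .map((job) => job.id);
}
```

In the real service the same cutoff could feed a repository delete with a TypeORM LessThan(cutoff) condition on updated_at, triggered on a schedule, rather than filtering rows in memory.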

Step 9: Testing and Deployment

Before deploying your NestJS-SQS integration to production, thorough testing is crucial. We’ll provide instructions on running tests and building the application for production deployment.

# Run tests
npm test

# Build for production
npm run build

Conclusion

Integrating NestJS with Amazon SQS brings the power of a robust job processing system to your applications. With NestJS’s efficiency and AWS SQS’s scalability, you can streamline your job processing workflow. This comprehensive guide has taken you through the necessary steps, from setting up your development environment to handling job messages and tracking job progress.

Feel free to explore the complete code and take your job processing capabilities to the next level. By harnessing these technologies, you can build applications that are both efficient and scalable.

Happy coding!

Ahmad Alhourani

https://www.linkedin.com/in/ahmad-alhourani Experienced Software Engineer & Team Lead with 10+ years in RESTful APIs, Microservices, TypeScript, and Blockchain.