วิธีการ ใช้ SQS ด้วย NestJS

Tan T
Tri Petch Digital
Published in
6 min readSep 20, 2024

NestJS และ AWS SQS คืออะไร

Nest.js เป็น framework สำหรับทำ server-side services ซึ่ง Nest.js จะมีแนวทางในการแบ่งโครงสร้างของ code ค่อนข้างชัดเจนและต่างจาก framework อื่นๆ (opinioned และมี architecture เป็นของตัวเอง) ทั้งหมดนี้ก็เพื่อให้ code base อ่านเข้าใจง่าย มีมาตรฐานที่ทุกคนเข้าใจตรงกัน แยกหน้าที่เป็นสัดส่วน และต่อเติมได้ง่ายเมื่อ code base เริ่มใหญ่และซับซ้อนขึ้นเรื่อยๆ

อ่านเพิ่มเติม: https://docs.nestjs.com/

AWS SQS เป็นบริการของ AWS ในการรับและส่ง message ระหว่างแต่ละ service โดยการรับส่งจะมีลักษณะเป็น queue เพื่อการันตีลำดับของ message (First in, first out) และ message จะคงอยู่ใน queue จนกว่าจะถูกสั่งลบโดย consumer เพื่อป้องกันการตกหล่นระหว่างที่ message นั้นๆ กำลังถูกใช้งาน

อ่านเพิ่มเติม: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html#sqs-difference-from-amazon-mq-sns

เรามีโจทย์อะไร ทำไมถึงเลือกใช้ Queue ในการสื่อสารระหว่าง Service

มาดูข้อดีของการสื่อการด้วย queue กันก่อน

  • ลดการผูกติดกัน (Decoupling): Service ต่างๆ ไม่จำเป็นต้องรู้รายละเอียดการทำงานของกันและกัน เพียงแค่ส่งงานเข้าคิวแล้วทำหน้าที่ของตัวเองต่อไป ทำให้ระบบยืดหยุ่นและง่ายต่อการปรับเปลี่ยน
  • เพิ่มประสิทธิภาพ: เมื่อมีงานเข้ามาจำนวนมาก Queue จะช่วยจัดการลำดับการทำงานได้อย่างมีประสิทธิภาพ ป้องกันไม่ให้ Service หนึ่งต้องรอ Service อื่นเสมอไป
  • รองรับการทำงานแบบ Asynchronous: Service สามารถส่งงานเข้าคิวแล้วดำเนินการต่อได้ทันที ไม่จำเป็นต้องรอผลลัพธ์ ทำให้ระบบทำงานได้เร็วขึ้น
  • จัดการกับ Load ได้ดี: ในช่วงที่มีงานเข้ามาจำนวนมาก Queue จะช่วยกระจายภาระงานไปยัง Service ต่างๆ ได้อย่างสม่ำเสมอ ป้องกันไม่ให้ Service ใด Service หนึ่งทำงานหนักเกินไป
  • เพิ่มความน่าเชื่อถือ: หาก Service ใด Service หนึ่งเกิดขัดข้อง งานที่อยู่ในคิวจะไม่สูญหายไป แต่จะรอจนกว่า Service นั้นจะกลับมาทำงานได้อีกครั้ง

โดยในตัวอย่างด้านล่างนี้ จะเป็นการให้เห็นภาพแบบง่ายๆ โดยเราอาจมี service อยู่ตัวหนึ่ง คอยจัดการการรับ order จากลูกค้า และหลังจากรับ order จากลูกค้า เรามีงานที่ต้องการทำต่อคือการ ส่งอีเมลล์, ส่ง sms และ การสร้างใบเสร็จ โดยที่งานเหล่านี้ ไม่จำเป็นต้องรอกัน (Asynchronous) ใครจะทำเสร็จเมื่อไหร่ก็ได้

หลักการทำงานโดยสื่อสารผ่าน queue

งานที่ไม่เหมาะจะเป็น queue

สิ่งที่มาพร้อมกับ queue คือการทำงานแบบ asynchronous ฉะนั้น รูปแบบการทำงานที่ต้องมีการรอกัน เช่น Service A ต้องรอคำตอบจาก Service B เพื่อนำไปคำนวนต่อ งานในรูปแบบนี้จะไม่เหมาะที่จะนำ queue เข้ามาใส่เพราะเราไม่สามารถรู้ได้เลย ว่า งานใน queue ของเราจะเสร็จเมื่อไหร่ อาจจะหลายนาทีจนถึงเป็นชั่วโมงก็ได้

งานแบบ synchronousใช้ queue ไม่ได้

นอกจากนี้ ตัวของ AWS SQS เองยังไม่ได้การันตีลำดับของการทำงานเป็นค่าเริ่มต้น หากผู้ใช้ต้องการงานที่มีลำดับแน่นอน จะต้องระมัดระวังในจุดนี้ด้วย

ต่อไป จะเป็นขั้นตอนการติดตั้ง และตัวอย่าง code สำหรับการใช้ NestJS และ AWS SQS

วิธีสร้าง queue บน AWS SQS

ก่อนที่จะเริ่มโค้ด เราจะสร้าง Queue กันก่อนสำหรับ โดยไปที่ AWS SQS และกด Create Queue

จากนั้นให้เก็บค่านี้เอาไว้ เราจะนำไปใช้ต่อในตอนตั้งค่า

ติดตั้ง library สำหรับ AWS SQS บน NestJS

หากยังไม่ได้ติดตั้ง nestjs แนะนำให้ทุกคนทำตามขั้นตอนนี้ก่อน https://docs.nestjs.com/first-steps

หลังจากที่ลง nestjs เรียบร้อยแล้ว เราจะใช้ library nestjs-sqs และ @aws-sdk/client-sqs สำหรับการ เขียน (Produce) และอ่าน (Consume) ค่า จาก queue โดยในส่วนนี้ เราจะใช้ library nestjs-sqs เพราะตัว library เองมีการเขียนให้นำมาใช้งานกับ NestJS โดยง่ายอยู่แล้ว https://github.com/ssut/nestjs-sqs

npm i --save @ssut/nestjs-sqs @aws-sdk/client-sqs

และนอกจากนี้ เราจะมีการบันทึกค่าลง database ในครั้งนี้ เราจะใช้ postgresql เป็นตัวอย่าง จึงต้องลง library ที่เกี่ยวข้องด้วย

npm i --save @nestjs/typeorm typeorm pg

เริ่มต้นโค้ดบน NestJS

สิ่งที่เราจะทำในบทความ นี้คือ เราจะให้มี API สำหรับการ แสดง posts ทั้งหมด รวมถึงการสร้าง post ด้วย แต่แทนที่เราจะสร้าง post ทันทีแบบ synchronous ในบทความนี้ เราจะสร้าง queue ไปที่ AWS SQS แล้วอ่าน queue message แล้วนำมาใช้สร้าง post ต่อไป

ซึ่งต้องขอบอกไว้ก่อนว่า การสร้าง post แบบ asynchronous จะถูกใช้ได้ดีกับระบบที่เป็น microservice โดยจะแยก service ที่ทำการจัดการ post ออกไปจาก service หลัก แต่ในที่นี้เราขอใช้ตัวอย่างนี้ไว้บน service เดียวกัน เพื่อให้ง่ายต่อความเข้าใจและความกระชับของตัวอย่าง

ก่อนอื่น เราจะมี database หน้าตาแบบนี้

CREATE TABLE public.posts (
id BIGSERIAL PRIMARY KEY,
title varchar NOT NULL,
body text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);

และเราจะสร้าง module, controller และ entity สำหรับการ post

nest generate module posts
nest generate controller posts/posts
nest generate service posts/posts
nest generate class /posts/entities/post.entity --flat
nest generate class /posts/consumers/posts.consumer --flat

ทำารตั้งค่า TypeORM บน app.module.ts

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { PostsModule } from './posts/posts.module';
import { TypeOrmModule } from '@nestjs/typeorm';
import { join } from 'path';

@Module({
imports: [
TypeOrmModule.forRoot({
type: 'postgres',
host: 'localhost',
port: 5432,
username: 'username',
password: 'password',
database: 'database',
entities: [join(__dirname, 'src/**/*.entity.{js,ts}')],
synchronize: false,
autoLoadEntities: true,
}),
PostsModule,
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}

ในไฟล์ posts.module.ts ให้ import SqsModule และ TypeOrmModule เพิ่มเติม รวมทั้งใส่ข้อมูลของ AWS SQS จากหัวข้อก่อนหน้านี้

โดย consumers จะเป็นตัวกำหนดให้โค้ดของเราพยายาม consume AWS SQS และ producers เป็นตัวกำหนดว่าเราจาก produces queue message ไปที่ไหนบ้าง

import { Module } from '@nestjs/common';
import { PostsController } from './posts/posts.controller';
import { PostsService } from './posts/posts.service';
import { PostEntity } from './entities/post.entity';
import { TypeOrmModule } from '@nestjs/typeorm';
import { SqsModule } from '@ssut/nestjs-sqs';

@Module({
imports: [
TypeOrmModule.forFeature([PostEntity]),
SqsModule.register({
consumers: [
{
name: 'create-post-queue',
queueUrl: 'http://queue-url-from-aws', // SQS queue URL
region: 'ap-southeast-1', // AWS region where your queue is located
},
],
producers: [
{
name: 'create-post-queue',
queueUrl: 'http://queue-url-from-aws', // SQS queue URL
region: 'ap-southeast-1', // AWS region where your queue is located
}
],
}),
],
controllers: [PostsController],
providers: [PostsService, SqsModule],
})
export class PostsModule {}

PostEntity

เพิ่มโค้ดเข้าไปดังนี้ โดยในส่วนนี้ จะเป็นการอธิบายให้ TypeORM ทราบว่า column ต่างๆ ของเราเป็นอย่างไรบ้าง

import {
Column,
CreateDateColumn,
Entity,
PrimaryGeneratedColumn,
} from 'typeorm';

@Entity({ name: 'posts' })
export class PostEntity {
@PrimaryGeneratedColumn('increment')
id: number;

@Column()
title: string;

@Column()
body: string;

@CreateDateColumn({ name: 'created_at' })
createdAt: Date;
}

PostsService

Service สำหรับเก็บ business logic. ในที่นี้เรามี 3 ฟังชั่น getAllPosts() สำหรับ แสดง post ทั้งหมด และ create() สำหรับสร้าง post และ createPostBySqs()สำหรับส่ง post ไปที่ AWS SQS

import { Injectable } from '@nestjs/common';
import { PostEntity } from '../entities/post.entity';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';

@Injectable()
export class PostsService {
constructor(
@InjectRepository(PostEntity)
private readonly postRepository: Repository<PostEntity>,
) {}

async getAllPosts(): Promise<PostEntity[]> {
return await this.postRepository.find({ order: { createdAt: 'DESC' } });
}

async create(post: PostEntity): Promise<PostEntity> {
return await this.postRepository.save(post);
}

// ถ้าเขียนแบบ microservice ฟังชั่นนี้จะอยู่บน service อื่นๆ ที่ไม่ได้จัดการ post
async createPostBySqs(post: PostEntity): Promise<void> {
// ส่งค่าไปที่ AWS SQS แทนการ save ลง database
await this.sqsService.send('create-post-queue', {
id: '1',
body: { title: post.title, body: post.body },
});
return;
}
}

PostsController

เราจะเตรียม method สำหรับการ create และ แสดงผล post ทั้งหมด โดยการสร้าง post นี้ เราจะส่งไปที่ AWS SQS

import { Controller, Get } from '@nestjs/common';
import { PostsService } from './posts.service';

@Controller('posts')
export class PostsController {
constructor(
private readonly postsService: PostsService,
) {}

@Get()
async getAllPosts(): Promise<string> {
const posts = await this.postsService.getAllPosts();
return posts
.map((post) => { // หรือ map บน view แทนตามหลัก MVC
return `<p><h3>Title: ${post.title}</h3>Body: ${post.body}</p>`;
})
.join('<hr>');
}


@Post()
async create(): Promise<string> {
this.postsService.createPostBySqs(/** post data from body */);
return 'Sent message to queue';
}

}

อ่านค่าจาก AWS SQS มาเพื่อสร้าง post

หลังจากที่เราสามารถส่ง queue message ไปบน AWS SQS ได้แล้ว ขั้นตอนต่อไป เราจะอ่านค่าจาก SQS แล้วนำมาสร้างเป็น post ใหม่

posts.consumer.ts

การทำงานคือ เราจะเรียก decorator SqsMessageHandler และ SqsConsumerEventHandler เพื่อให้ตัว NestJS รู้ว่า เมื่อมี queue item เข้ามาจาก queue ไป ให้ส่งต่อไปที่ฟังชั่นอะไร

ในที่นี้คือ ถ้า queue ที่มาจาก 'create-post-queue' เข้ามา เราจะเข้ามา process ที่ฟังชั่นนี้ และเราจะ return true เมื่อทำงานสำเร็จ เพื่อบอกให้ sqs รับทราบว่าเราทำงานเสร็จแล้ว หากไม่มีส่วนนี้ queue message เดิมจะสามารถวนกลับเข้ามาซ้ำได้

import { Message } from '@aws-sdk/client-sqs';
import { Injectable } from '@nestjs/common';
import { SqsConsumerEventHandler, SqsMessageHandler } from '@ssut/nestjs-sqs';
import { PostsService } from '../posts/posts.service';
@Injectable()
export class PostsConsumer {
constructor(private readonly postsService: PostsService) {}

@SqsMessageHandler('create-post-queue', false)
public async handleMessage(message: Message) {
const body = JSON.parse(message.Body);
this.postsService.createPost(body);
return true; // บอก AWS SQS ว่าเราทำงานสำเร็จ
}

@SqsConsumerEventHandler('create-post-queue', 'processing_error')
public onProcessingError(error: Error, message: Message) {
// handling processing error
}
}

ทดลองใช้งาน

เราจะเริ่มโดยการ POST ไปที่ url ของ NestJS จากนั้นตัวของ message จะถูกส่งไปที่ AWS SQS แล้วถูกอ่านกลับมาผ่าน consumer

ทำการ POST ผ่าน Postman

ซึ่งในจังหวะนี้ consumer ของเราอ่านค่าจาก SQS แล้วนำมาสร้างโพสลง database

// posts.consumer.ts
@SqsMessageHandler('create-post-queue', false)
public async handleMessage(message: Message) {
const body = JSON.parse(message.Body);
this.postsService.createPost(body);
return true; // บอก AWS SQS ว่าเราทำงานสำเร็จ
}

จากนั้น เมื่อเราเข้า URL ด้านล่าง เพื่อเรียก method getAllPosts ตัว NestJS ก็จะ render post ทั้งหมดออกมา

โพสที่ถูกสร้างทั้งหมดผ่าน Queue

บทส่งท้าย

บทความนี้ ได้กล่าวถึงวิธีการเบื้องต้น ในการรับส่งข้อมูลผ่าน SQS โดยการใช้ NestJS

แต่ทว่าบนระบบจริงนั้น เพื่อที่จะทำการสร้าง microservice architecture ขึ้นมาให้ใช้งานได้จริง เราอาจจะต้องคำนึงสิ่งอื่นๆ เพิ่มเติมด้วย เช่นการส่งข้อมูลผ่าน Simple Notification Service (SNS) สำหรับการ ส่งข้อมูลไปทีละหลายๆ queue รวมทั้งการ handle เวลาที่ queue ทำงานผิดพลาด หรือทำงานช้าเกินไป

แต่เพื่อไม่ให้บทความนี้ยาวเกินไป จึงต้องขอไม่กล่าวถึงส่วนเหล่านั้นโดยละเอียดในที่นี้ ถึงเช่นนั้น ผู้เขียนก็อยากให้ผู้อ่านทำการศึกษาเรื่องราวเหล่านั้น ควบคู่ไปกับบทความนี้ด้วย

--

--