วิธีการ ใช้ SQS ด้วย NestJS
NestJS และ AWS SQS คืออะไร
Nest.js เป็น framework สำหรับทำ server-side services ซึ่ง Nest.js จะมีแนวทางในการแบ่งโครงสร้างของ code ค่อนข้างชัดเจนและต่างจาก framework อื่นๆ (opinioned และมี architecture เป็นของตัวเอง) ทั้งหมดนี้ก็เพื่อให้ code base อ่านเข้าใจง่าย มีมาตรฐานที่ทุกคนเข้าใจตรงกัน แยกหน้าที่เป็นสัดส่วน และต่อเติมได้ง่ายเมื่อ code base เริ่มใหญ่และซับซ้อนขึ้นเรื่อยๆ
อ่านเพิ่มเติม: https://docs.nestjs.com/
AWS SQS เป็นบริการของ AWS ในการรับและส่ง message ระหว่างแต่ละ service โดยการรับส่งจะมีลักษณะเป็น queue เพื่อการันตีลำดับของ message (First in, first out) และ message จะคงอยู่ใน queue จนกว่าจะถูกสั่งลบโดย consumer เพื่อป้องกันการตกหล่นระหว่างที่ message นั้นๆ กำลังถูกใช้งาน
อ่านเพิ่มเติม: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html#sqs-difference-from-amazon-mq-sns
เรามีโจทย์อะไร ทำไมถึงเลือกใช้ Queue ในการสื่อสารระหว่าง Service
มาดูข้อดีของการสื่อการด้วย queue กันก่อน
- ลดการผูกติดกัน (Decoupling): Service ต่างๆ ไม่จำเป็นต้องรู้รายละเอียดการทำงานของกันและกัน เพียงแค่ส่งงานเข้าคิวแล้วทำหน้าที่ของตัวเองต่อไป ทำให้ระบบยืดหยุ่นและง่ายต่อการปรับเปลี่ยน
- เพิ่มประสิทธิภาพ: เมื่อมีงานเข้ามาจำนวนมาก Queue จะช่วยจัดการลำดับการทำงานได้อย่างมีประสิทธิภาพ ป้องกันไม่ให้ Service หนึ่งต้องรอ Service อื่นเสมอไป
- รองรับการทำงานแบบ Asynchronous: Service สามารถส่งงานเข้าคิวแล้วดำเนินการต่อได้ทันที ไม่จำเป็นต้องรอผลลัพธ์ ทำให้ระบบทำงานได้เร็วขึ้น
- จัดการกับ Load ได้ดี: ในช่วงที่มีงานเข้ามาจำนวนมาก Queue จะช่วยกระจายภาระงานไปยัง Service ต่างๆ ได้อย่างสม่ำเสมอ ป้องกันไม่ให้ Service ใด Service หนึ่งทำงานหนักเกินไป
- เพิ่มความน่าเชื่อถือ: หาก Service ใด Service หนึ่งเกิดขัดข้อง งานที่อยู่ในคิวจะไม่สูญหายไป แต่จะรอจนกว่า Service นั้นจะกลับมาทำงานได้อีกครั้ง
โดยในตัวอย่างด้านล่างนี้ จะเป็นการให้เห็นภาพแบบง่ายๆ โดยเราอาจมี service อยู่ตัวหนึ่ง คอยจัดการการรับ order จากลูกค้า และหลังจากรับ order จากลูกค้า เรามีงานที่ต้องการทำต่อคือการ ส่งอีเมลล์, ส่ง sms และ การสร้างใบเสร็จ โดยที่งานเหล่านี้ ไม่จำเป็นต้องรอกัน (Asynchronous) ใครจะทำเสร็จเมื่อไหร่ก็ได้
งานที่ไม่เหมาะจะเป็น queue
สิ่งที่มาพร้อมกับ queue คือการทำงานแบบ asynchronous ฉะนั้น รูปแบบการทำงานที่ต้องมีการรอกัน เช่น Service A ต้องรอคำตอบจาก Service B เพื่อนำไปคำนวนต่อ งานในรูปแบบนี้จะไม่เหมาะที่จะนำ queue เข้ามาใส่เพราะเราไม่สามารถรู้ได้เลย ว่า งานใน queue ของเราจะเสร็จเมื่อไหร่ อาจจะหลายนาทีจนถึงเป็นชั่วโมงก็ได้
นอกจากนี้ ตัวของ AWS SQS เองยังไม่ได้การันตีลำดับของการทำงานเป็นค่าเริ่มต้น หากผู้ใช้ต้องการงานที่มีลำดับแน่นอน จะต้องระมัดระวังในจุดนี้ด้วย
ต่อไป จะเป็นขั้นตอนการติดตั้ง และตัวอย่าง code สำหรับการใช้ NestJS และ AWS SQS
วิธีสร้าง queue บน AWS SQS
ก่อนที่จะเริ่มโค้ด เราจะสร้าง Queue กันก่อนสำหรับ โดยไปที่ AWS SQS และกด Create Queue
จากนั้นให้เก็บค่านี้เอาไว้ เราจะนำไปใช้ต่อในตอนตั้งค่า
ติดตั้ง library สำหรับ AWS SQS บน NestJS
หากยังไม่ได้ติดตั้ง nestjs แนะนำให้ทุกคนทำตามขั้นตอนนี้ก่อน https://docs.nestjs.com/first-steps
หลังจากที่ลง nestjs เรียบร้อยแล้ว เราจะใช้ library nestjs-sqs
และ @aws-sdk/client-sqs
สำหรับการ เขียน (Produce) และอ่าน (Consume) ค่า จาก queue โดยในส่วนนี้ เราจะใช้ library nestjs-sqs เพราะตัว library เองมีการเขียนให้นำมาใช้งานกับ NestJS โดยง่ายอยู่แล้ว https://github.com/ssut/nestjs-sqs
npm i --save @ssut/nestjs-sqs @aws-sdk/client-sqs
และนอกจากนี้ เราจะมีการบันทึกค่าลง database ในครั้งนี้ เราจะใช้ postgresql เป็นตัวอย่าง จึงต้องลง library ที่เกี่ยวข้องด้วย
npm i --save @nestjs/typeorm typeorm pg
เริ่มต้นโค้ดบน NestJS
สิ่งที่เราจะทำในบทความ นี้คือ เราจะให้มี API สำหรับการ แสดง posts ทั้งหมด รวมถึงการสร้าง post ด้วย แต่แทนที่เราจะสร้าง post ทันทีแบบ synchronous ในบทความนี้ เราจะสร้าง queue ไปที่ AWS SQS แล้วอ่าน queue message แล้วนำมาใช้สร้าง post ต่อไป
ซึ่งต้องขอบอกไว้ก่อนว่า การสร้าง post แบบ asynchronous จะถูกใช้ได้ดีกับระบบที่เป็น microservice โดยจะแยก service ที่ทำการจัดการ post ออกไปจาก service หลัก แต่ในที่นี้เราขอใช้ตัวอย่างนี้ไว้บน service เดียวกัน เพื่อให้ง่ายต่อความเข้าใจและความกระชับของตัวอย่าง
ก่อนอื่น เราจะมี database หน้าตาแบบนี้
CREATE TABLE public.posts (
id BIGSERIAL PRIMARY KEY,
title varchar NOT NULL,
body text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
และเราจะสร้าง module, controller และ entity สำหรับการ post
nest generate module posts
nest generate controller posts/posts
nest generate service posts/posts
nest generate class /posts/entities/post.entity --flat
nest generate class /posts/consumers/posts.consumer --flat
ทำารตั้งค่า TypeORM บน app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { PostsModule } from './posts/posts.module';
import { TypeOrmModule } from '@nestjs/typeorm';
import { join } from 'path';
@Module({
imports: [
TypeOrmModule.forRoot({
type: 'postgres',
host: 'localhost',
port: 5432,
username: 'username',
password: 'password',
database: 'database',
entities: [join(__dirname, 'src/**/*.entity.{js,ts}')],
synchronize: false,
autoLoadEntities: true,
}),
PostsModule,
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
ในไฟล์ posts.module.ts
ให้ import SqsModule
และ TypeOrmModule
เพิ่มเติม รวมทั้งใส่ข้อมูลของ AWS SQS จากหัวข้อก่อนหน้านี้
โดย consumers
จะเป็นตัวกำหนดให้โค้ดของเราพยายาม consume AWS SQS และ producers
เป็นตัวกำหนดว่าเราจาก produces queue message ไปที่ไหนบ้าง
import { Module } from '@nestjs/common';
import { PostsController } from './posts/posts.controller';
import { PostsService } from './posts/posts.service';
import { PostEntity } from './entities/post.entity';
import { TypeOrmModule } from '@nestjs/typeorm';
import { SqsModule } from '@ssut/nestjs-sqs';
@Module({
imports: [
TypeOrmModule.forFeature([PostEntity]),
SqsModule.register({
consumers: [
{
name: 'create-post-queue',
queueUrl: 'http://queue-url-from-aws', // SQS queue URL
region: 'ap-southeast-1', // AWS region where your queue is located
},
],
producers: [
{
name: 'create-post-queue',
queueUrl: 'http://queue-url-from-aws', // SQS queue URL
region: 'ap-southeast-1', // AWS region where your queue is located
}
],
}),
],
controllers: [PostsController],
providers: [PostsService, SqsModule],
})
export class PostsModule {}
PostEntity
เพิ่มโค้ดเข้าไปดังนี้ โดยในส่วนนี้ จะเป็นการอธิบายให้ TypeORM ทราบว่า column ต่างๆ ของเราเป็นอย่างไรบ้าง
import {
Column,
CreateDateColumn,
Entity,
PrimaryGeneratedColumn,
} from 'typeorm';
@Entity({ name: 'posts' })
export class PostEntity {
@PrimaryGeneratedColumn('increment')
id: number;
@Column()
title: string;
@Column()
body: string;
@CreateDateColumn({ name: 'created_at' })
createdAt: Date;
}
PostsService
Service สำหรับเก็บ business logic. ในที่นี้เรามี 3 ฟังชั่น getAllPosts()
สำหรับ แสดง post ทั้งหมด และ create()
สำหรับสร้าง post และ createPostBySqs()
สำหรับส่ง post ไปที่ AWS SQS
import { Injectable } from '@nestjs/common';
import { PostEntity } from '../entities/post.entity';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
@Injectable()
export class PostsService {
constructor(
@InjectRepository(PostEntity)
private readonly postRepository: Repository<PostEntity>,
) {}
async getAllPosts(): Promise<PostEntity[]> {
return await this.postRepository.find({ order: { createdAt: 'DESC' } });
}
async create(post: PostEntity): Promise<PostEntity> {
return await this.postRepository.save(post);
}
// ถ้าเขียนแบบ microservice ฟังชั่นนี้จะอยู่บน service อื่นๆ ที่ไม่ได้จัดการ post
async createPostBySqs(post: PostEntity): Promise<void> {
// ส่งค่าไปที่ AWS SQS แทนการ save ลง database
await this.sqsService.send('create-post-queue', {
id: '1',
body: { title: post.title, body: post.body },
});
return;
}
}
PostsController
เราจะเตรียม method สำหรับการ create และ แสดงผล post ทั้งหมด โดยการสร้าง post นี้ เราจะส่งไปที่ AWS SQS
import { Controller, Get } from '@nestjs/common';
import { PostsService } from './posts.service';
@Controller('posts')
export class PostsController {
constructor(
private readonly postsService: PostsService,
) {}
@Get()
async getAllPosts(): Promise<string> {
const posts = await this.postsService.getAllPosts();
return posts
.map((post) => { // หรือ map บน view แทนตามหลัก MVC
return `<p><h3>Title: ${post.title}</h3>Body: ${post.body}</p>`;
})
.join('<hr>');
}
@Post()
async create(): Promise<string> {
this.postsService.createPostBySqs(/** post data from body */);
return 'Sent message to queue';
}
}
อ่านค่าจาก AWS SQS มาเพื่อสร้าง post
หลังจากที่เราสามารถส่ง queue message ไปบน AWS SQS ได้แล้ว ขั้นตอนต่อไป เราจะอ่านค่าจาก SQS แล้วนำมาสร้างเป็น post ใหม่
posts.consumer.ts
การทำงานคือ เราจะเรียก decorator SqsMessageHandler
และ SqsConsumerEventHandler
เพื่อให้ตัว NestJS รู้ว่า เมื่อมี queue item เข้ามาจาก queue ไป ให้ส่งต่อไปที่ฟังชั่นอะไร
ในที่นี้คือ ถ้า queue ที่มาจาก 'create-post-queue'
เข้ามา เราจะเข้ามา process ที่ฟังชั่นนี้ และเราจะ return true เมื่อทำงานสำเร็จ เพื่อบอกให้ sqs รับทราบว่าเราทำงานเสร็จแล้ว หากไม่มีส่วนนี้ queue message เดิมจะสามารถวนกลับเข้ามาซ้ำได้
import { Message } from '@aws-sdk/client-sqs';
import { Injectable } from '@nestjs/common';
import { SqsConsumerEventHandler, SqsMessageHandler } from '@ssut/nestjs-sqs';
import { PostsService } from '../posts/posts.service';
@Injectable()
export class PostsConsumer {
constructor(private readonly postsService: PostsService) {}
@SqsMessageHandler('create-post-queue', false)
public async handleMessage(message: Message) {
const body = JSON.parse(message.Body);
this.postsService.createPost(body);
return true; // บอก AWS SQS ว่าเราทำงานสำเร็จ
}
@SqsConsumerEventHandler('create-post-queue', 'processing_error')
public onProcessingError(error: Error, message: Message) {
// handling processing error
}
}
ทดลองใช้งาน
เราจะเริ่มโดยการ POST ไปที่ url ของ NestJS จากนั้นตัวของ message จะถูกส่งไปที่ AWS SQS แล้วถูกอ่านกลับมาผ่าน consumer
ทำการ POST ผ่าน Postman
ซึ่งในจังหวะนี้ consumer
ของเราอ่านค่าจาก SQS แล้วนำมาสร้างโพสลง database
// posts.consumer.ts
@SqsMessageHandler('create-post-queue', false)
public async handleMessage(message: Message) {
const body = JSON.parse(message.Body);
this.postsService.createPost(body);
return true; // บอก AWS SQS ว่าเราทำงานสำเร็จ
}
จากนั้น เมื่อเราเข้า URL ด้านล่าง เพื่อเรียก method getAllPosts
ตัว NestJS ก็จะ render post ทั้งหมดออกมา
บทส่งท้าย
บทความนี้ ได้กล่าวถึงวิธีการเบื้องต้น ในการรับส่งข้อมูลผ่าน SQS โดยการใช้ NestJS
แต่ทว่าบนระบบจริงนั้น เพื่อที่จะทำการสร้าง microservice architecture ขึ้นมาให้ใช้งานได้จริง เราอาจจะต้องคำนึงสิ่งอื่นๆ เพิ่มเติมด้วย เช่นการส่งข้อมูลผ่าน Simple Notification Service (SNS) สำหรับการ ส่งข้อมูลไปทีละหลายๆ queue รวมทั้งการ handle เวลาที่ queue ทำงานผิดพลาด หรือทำงานช้าเกินไป
แต่เพื่อไม่ให้บทความนี้ยาวเกินไป จึงต้องขอไม่กล่าวถึงส่วนเหล่านั้นโดยละเอียดในที่นี้ ถึงเช่นนั้น ผู้เขียนก็อยากให้ผู้อ่านทำการศึกษาเรื่องราวเหล่านั้น ควบคู่ไปกับบทความนี้ด้วย