GO Messaging System ด้วย RabbitMQ และ GO AMQP

Event Driven Microservices with RabbitMQ + GO AMQP

Teerapong Singthong 👨🏻‍💻
iamgoangle
6 min readJun 29, 2019

--

Original Photo by Joel Muniz on Unsplash

การที่เราต้องการแลกเปลี่ยนข้อมูลระหว่าง service-to-service เรามักจะเจออยู่ 2 ประเภทหลักๆ ได้แก่ Synchronous หรือ Asynchronous ที่ทำงานอยู่บนพื้นฐานหลากหลาย protocol เช่น HTTP, RPC, gRPC, AMQP, MQTT เป็นต้น นอกจากนั้นถ้าหากเราต้องทำงานที่เกี่ยวกับระบบ Distributed System การสื่อสารแบบ Synchronous Pattern อาจไม่ตอบโจทย์ทั้งหมด จนเราต้องมองหา Pattern ที่เหมาะสม การมาของ Event Sourcing / CQRS / Distribute Event Streaming หรือจะเรียก Event Driven ก็มีแนวความคิดที่คล้ายๆกัน ที่ตอบโจทย์เงื่อนไขนี้

เพื่อลงลึกกว่านั้นว่าแนวคิดการสื่อสารแต่ละประเภทแตกต่างกันอย่างไร แนวความคิดของ Orchestration แตกต่างกับ Choreography มากแค่ไหน เพื่อนๆ ผู้อ่านสามารถศึกษาเพิ่มเติมได้จากบทความเก่าของผมด้านล่างนะครับ

TL;DR

บทความนี้จะใช้การสื่อสารรูปแบบ AMQP โดยใช้เครื่องมือ RabbitMQ + ภาษา Go และนำพาผู้อ่านเข้าใจแนวคิดของแต่ละ feature ที่ RabbitMQ ทำให้เราใช้กัน ตั้งแต่

  • Default exchange
  • Direct exchange
  • Fanout Exchange และ Topic Routing Exchange
  • Dead-Letter-Exchange จนไปถึงการทำ Retry ด้วยการไม่ใช้ปลั๊กอิน
  • Delay Queue ขอไว้บทความหน้านะครับ

บทความนี้เหมาะกับคนที่เขียน Go มาบ้างแล้ว เพราะตัวอย่างโค๊ดผม ได้เขียน internal package ห่อหุ้ม https://godoc.org/github.com/streadway/amqp อีกทีต้องขอภัยผู้อ่านด้วยครับที่ต้อง clone project ผมมาไว้ในเครื่องก่อน _/|\_

รู้หรือไม่? RabbitMQ vs Kafka แม้ว่าจะใช้สถาปัตยกรรม Messaging System เหมือนกัน แต่แนวคิด และ Usecase การใช้งานคนละรูปแบบ อย่าลืมเลือกเครื่องมือให้เหมาะกับงานกันนะครับ

AMQP ทำงานอย่างไร?

https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/architect-microservice-container-applications/communication-in-microservice-architecture

Synchronous Communication เชื่อว่าหลายท่านคงคุ้นเคยดีอยู่แล้วว่า การสื่อสารแบบนี้จะเกิด Blocking I/O ผู้ส่ง ต้องรอ ผู้รับตอบกลับมา ถึงจะถือว่าการสื่อสารนั้นสมบูรณ์ แต่อย่างที่เคยบอกไปว่าถ้าต้องการประสิทธิภาพในงาน Distributed System การสื่อสารแบบนี้คงไม่เหมาะกับทุก Usecase เราจึงต้องหาตัวช่วย

Asynchronous Communication ผ่านทาง AMQP Protocol เป็นอีกรูปแบบที่หลายๆที่ใช้กัน เหมาะกับ Usecase Message Queue ที่ต้องการส่งข้อมูลประมาณมากๆ และ ตอบโจทย์ อย่างที่ RabbitMQ เคลมไว้ว่า General Purpose Messaging System

โดยมี Producer ทำหน้าที่ส่งข้อมูล เข้าไปที่ Broker และ Exchange ทำหน้าที่กระจายข้อมูลไปพักข้อมูล Queue ตามเงื่อนไข Routing Key ทำให้ Consumer ที่สนใจ Queue นั้น สามารถทยอยดึงข้อมูลไปประมวลผล และ ทำงานต่อ

https://specify.io/concepts/microservices#choreography

เพื่อให้เข้าใจมากขึ้น Customer Service ทำหน้าที่ Produce ข้อมูลไปยัง Customer Create Event ในเลเยอร์นี้ ก็คือ Broker / Queue ส่วน Consumer ได้แก่ Loyalty points Service, Post Service และ Email Service จะได้รับ Event ไปทันที เมื่อมีการเปลี่ยนแปลงของที่อยู่ใน Queue ถ้าหากใครคุ้นเคยกับ Event Topic / Publish and Subscribe Pattern น่าจะร้องอ๋อออออออ….

Messaging System

ก่อนจะรู้ว่า RabbitMQ มี Feature อะไรบ้างนั้น มาทำความเข้าใจกันก่อนว่าระบบ messaging system สำหรับบทความนี้ ประกอบด้วย component อะไรบ้าง

  • Producer := ผู้ส่งข้อมูลเข้าไปยัง Message Broker
  • Broker := จัดการ Meta Data และ ทำหน้าที่คอยดูแลความสัมพันธ์ระหว่าง Exchange และ Queue
  • Exchange := กำหนด Routing Key ทิศทางการไหลของข้อมูลว่าจะให้ไป Queue ไหนในระบบ
  • Binding := กำหนด Routing Queue ให้กับ Queue
  • Queue := ถังเก็บข้อมูลแบบเรียงลำดับ
  • Consumer := ผู้รับข้อมูลจาก Queue

RabbitMQ

โปรดัคที่สร้างระบบ General purpose message broker for message queuing โดยอยู่บนพื้นฐาน AMQP Protocol รองรับสถานการณ์ routing key ที่หลากหลาย และซับซ้อน การออกแบบเพื่อรองรับ High-throughput ส่วนแนวทางการ Scaling นั้น มักจะเป็น Vertical scale นะครับ และถูกพัฒนาด้วยภาษา Erlang

Feature การส่งและรับข้อมูล มีหลายรูปแบบ ดังนี้

  1. Direct Exchange ทำการส่ง message ไปยัง Queue ที่ระบุใน message routing key แบบ exact match
  2. Topic Exchange ทำการกระจาย message ไปยัง Queue ที่ match wildcard ระหว่าง routing key และ routing pattern ใช้กับงานประเภท Pub/Sub
  3. Fanout Exchange ทำการส่ง message แบบ broadcast ไปทุกๆ Queue ที่ Available อยู่ ทุก consumer ได้รับ message เหมือนกัน
  4. Headers คือ การใช้ header attribute สำหรับการ routing

นอกจากนี้ถ้าเราไม่ใช้ Exchange แต่กำหนดเท่ากับค่าว่าง ก็จะเป็นการส่งเข้า Queue ตรงๆเลย

Setup Docker

docker-compose.yml

Clone Example Project

git clone https://github.com/iamgoangle/rabbit-go

(1) Simple Direct Queue

หัวข้อนี้จะพาผู้อ่านเริ่มต้นโปรเจคที่เป็นพื้นฐาน ทำการสร้าง Producer + Queue + Consumer และ กำหนด exchange ให้อยู่ในรูปแบบ default นั่น คือ กำหนดแบบ "" empty string สิ่งที่จะเกิดขึ้นคือ เราต้องการเอาของเข้า Queue ให้ได้ก่อน และ มี consumer มากวาดเอาข้อมูลใน queue ไปแสดงผล

producer.go

ในตัวอย่างนี้ต้องการสร้าง producer เพื่อส่งข้อความ Hello World 10 ครั้ง ไปยัง queue ที่ชื่อว่า hello-simple

10–16) เปิด connection ไปยัง Rabbitmq broker
18-25) กำหนด exchange = "" เพื่อใช้ default exchange และ กำหนด queue ที่ต้องการเชื่อมต่อ (ขั้นตอนนี้จะทำการ สร้าง exchange และ queue declare ให้)
27-30) สร้าง producer
32-38) produce message 10 ครั้งไปยัง queue
ผลลัพธ์การสร้าง Queue http://localhost:15672/#/queues

consumer.go

consumer ทำหน้าที่ต่อไปยัง queue hello-simple ทันทีที่มี message เข้ามายัง queue consumer จะได้รับ message ใหม่ทันที

10-15) สร้าง connection ไปยัง rabbitmq
18-22) กำหนด config queue ที่ต้องการเชื่อมต่อ
24-26) สร้าง consumer
29-32) subscribe ของใน queue โดย method จะส่งข้อมูลออกมาทาง go channel
36-42) อ่านข้อมูลจาก go channel ที่ได้รับ

บรรทัดที่ 40 เป็นการบอกไปยัง Broker ว่า Acknowledgement ข้อความนี้แล้ว

(2) Direct Exchange

วิธีการส่งข้อมูลผ่านไปยัง exchange และ routingKey ที่กำหนด และ queue ใดๆก็ตามที่ binding ไว้ตาม routingKey แบบ exact match จะได้รับข้อมูลนั้นไป

producer.go

21-27) สร้าง connection ไปหา rabbitmq broker
29-34) กำหนด config producer แบบ ExchangeDirect กำหนด exchange ที่ชื่อว่า asia.exchange และ routingKey ที่ต้องการส่งข้อมูลไป
37-52) produce ข้อมูล โดยที่ข้อมูลจะไหลไปยัง exchange และ routingKey ตามกำหนด
asia.exchange ถูกสร้างขึ้นแล้ว

consumer.go

ทดสอบ consumer

$ ROUTING_KEY=asia.thailand QUEUE=asia.thailand.queue go run consumer.go$ ROUTING_KEY=asia.singapore QUEUE=asia.singapore.queue go run consumer.go

ทดสอบ producer

$ ROUTING_KEY=asia.thailand go run producer.go
$ ROUTING_KEY=asia.singapore go run producer.go

ความสัมพันธ์ระหว่าง exchange และ queue

exchange ทำหน้าที่เป็น queue router เพื่อบอกว่า message ที่ถูก produce มา จะให้ส่งต่อไปยัง routing ไหน จากรูป asia.exchange คือ exchange ที่มี queue 2 ตัว กำลัง binding กับ exchange นี้อยู่ และที่สำคัญ แต่ละตัวกำหนด routingKey แตกต่างกัน

hint: producer ไม่สนใจว่ามี queue อะไรบ้าง ทำหน้าที่ produce message ไปหา exchange และ กำหนด routingKey ก็พอ เดียว exchange จะทำหน้าที่ส่งต่อไปยัง queue ที่กำหนด routingKey ให้เอง

(3) Fanout

ทำการส่งข้อมูลของ producer ส่ง copy message ไปทุก queue หลายๆท่านอาจมีคำถามว่า ทำไมไม่ใช้ direct exchange สาเหตุ เพราะว่า fanout จะทำสำเนา message ไปให้ทุก queue เท่าๆกัน ถ้าใช้ direct exchange จำนวน consumer ทั้งหมดจะถูกมองว่าเป็น consumer worker หรือ consumer group ที่ broker จะ round-robin ของให้

producer.go

consumer.go

Run

$ go run fanout-consumer.go
$ go run fanout-consumer.go
$ go run fanout-producer.go

Demo

(4) Topic Exchange

การแลกเปลี่ยนข้อมูลประเภทนี้ สามารถกำหนด routingKey แบบ wildcard ได้ แตกต่างกับ direct exchange ที่ได้เรียบรู้ไป ที่ต้อง exact match เท่านั้น

* เงื่อนไขเท่ากับ 1 คำเท่านั้น

# เงื่อนไขเท่ากับ 0 หรือ มากกว่า 1 คำ

ถ้ากำหนด routingKey = asia.thailand.#

asia.thailand
asia.thailand.bangkok
asia.thailand.bangkok.sukhumvit
asia.thailand.udonthani

ถ้ากำหนด routingKey = asia.thailand.*

asia.thailand.bangkok
asia.thailand.udonthani
asia.thailand ไม่ match
asia.thailand.bangkok.sukhumvit ไม่ match

producer.go

21-27: สร้าง connection
29-36: สร้าง config สำหรับ producer ส่วนสำคัญอยู่ที่การกำหนด routingKey จากตัวอย่าง อ่านค่าจาก env ว่าจะให้มี wildcard แบบไหน

consumer.go

rbMqConfig := rabbitmq.ConfigConsumer{
Exchange: rabbitmq.ConfigExchange{
Type: rabbitmq.ExchangeTopic,
Name: "asia.exchange.topic",
Durable: true,
},
Queue: rabbitmq.ConfigQueue{
Name: os.Getenv("QUEUE"),
Exclusive: true,
Bind: rabbitmq.ConfigQueueBind{
ExchangeName: "asia.exchange.topic",
RoutingKey: os.Getenv("ROUTING_KEY"),
},
},
}

Exchange

  • Type กำหนด exchange topic
  • Name ชื่อ exchange ในที่นี้ใช้ asia.exchange.topic
  • Durable กำหนดให้ persist data เอาไว้ใน Queue

Queue

  • Name ชื่อของ Queue
  • Exclusive มีเพียง consumer คนเดียวที่ได้ของจาก queue นี้

Bind

  • ExchangeName กำหนดว่า queue binding กับ exchange ไหน
  • RoutingKey กำหนด routingKey แบบ wildcard

ทดสอบ

$ ROUTING_KEY=asia.thailand.* QUEUE=asia.thailand.all.province go run consumer.go$ ROUTING_KEY=asia.thailand.# QUEUE=asia.thailand.all.province.road go run consumer.go$ ROUTING_KEY=asia.thailand.bangkok go run producer.go$ ROUTING_KEY=asia.thailand.bangkok.sukhumvit go run producer.go

จากตัวอย่างทั้ง 4 รูปแบบแสดงให้เห็นถึงความสามารถของ RabbitMQ ที่ให้เราสามารถเลือกวิธีการสื่อสารระหว่าง service-to-service ด้วย pattern ต่างๆ ที่จะช่วยให้การสื่อสารข้อมูลปริมาณมากๆ ให้มีความน่าเชื่อถือ และ มีประสิทธิภาพ ซึ่งเหมาะกับ Usecase Asynchronous และ Event Driven

ตัวอย่างถัดไปผมจะพูดถึงอีกหนึ่ง feature ที่สำคัญของ RabbitMQ และ Messaging system ทั่วไปนั่น คือ Dead Letter Exchange ที่จะลองนำความสามารถนี้มาทำ Retry Mechanism ในระบบ Queue

Dead Letter Exchange และ TTL

ลองจินตนาการดูว่าถ้า service เรานั้น มีปริมาณ Inbound data จำนวนมากในหนึ่งวินาที หรือ ช่วงเวลานึง และ ถ้า consumer ของเรานั้นยังทำงานได้ดี แต่ service ปลายทางที่ consumer ต้องไปคุยด้วยนั้น เกิดตายขึ้นมาเราจะจัดการกับ fault tolerant อย่างไร? ระบบ DLX จึงเกิดขึ้นมาเพื่อให้เราสามารถออกแบบระบบ retry ใน queue ของเราได้

Dead Letter Exchange สถานะของ message ที่ถูก reject หรือ negative acknowledge จาก consumer และให้เราจัดการต่อว่าจะส่ง message ที่ถูก reject นี้ไปยัง exchange ไหน

TTL กำหนดอายุของ message ถ้าหมดอายุก็จะเข้าสถานะ DLX

ทำความเข้าใจวิธีคิดของการ Retry ด้วย DLX และ TTL

  1. สร้าง work.exchange และ กำหนด routingKey
  2. สร้าง retry.exchange และ กำหนด routingKey
  3. สร้าง work.queue และ Binding กับ work.exchange และ กำหนด routingKey
  4. สร้าง retry.queue และ Binding กับ retry.exchange และ กำหนด routingKey
  • x-dead-letter-exchange = "work.exchange"
  • x-dead-letter-routing-key = "work.routing"
  • x-message-ttl = 10000 10 วินาที หรือ แล้วแต่

setup-exchange.go

setup-queue.go

consumer.go

consumer ทำการสุ่ม เพื่อจำลองว่า remote resource ปลายทางไม่สามารถทำงานได้

producer.go

Demo

สิ่งที่น่าสนใจ คือ ต้องถามก่อนว่าระบบของเราจำเป็นที่ต้องใช้ Messagign System แบบนี้ หรือไม่ ซึ่งจะเห็นได้ว่ามันมีความซับซ้อนอยู่พอสมควร ในช่วงการตั้งค่าให้กับโปรเจค และที่สำคัญหากสเกลของระบบต้องการเครื่องมือแบบนี้ ก็ยังมีคำถามต่อว่าแล้วแบบไหนเหมาะกับโปรเจคของเรา ระหว่าง Apache Kafka ที่ตัวระบบออกแบบมาด้วยแนวคิด Horizal Scale และ Kafka Data Stream ซึ่งคนละแนวคิดกับ RabbitMQ ที่เป็นแนวคิดของ Vertical Scale

สุดท้ายนี้…ต้องเข้าใจความต้องการของระบบก่อนเลือกเครื่องมือนะครับผม

--

--

Teerapong Singthong 👨🏻‍💻
iamgoangle

Engineering Manager, ex-Solution Engineering Lead at LINE | Tech | Team Building | System Design | Architecture | SWE | Large Scaling System