Exploring golang events-based job queues for asynchronous task processing — 探索 Golang 的事件驅動任務隊列

zach.yang
iKala 技術部落格
16 min readApr 25, 2023
大家好,我是 Zach,目前在於 iKala KOL Radar 擔任軟體工程師半年了,在軟體開發領域奮鬥了四年的時間。對於軟體開發有很大的熱情和興趣,因此決定嘗試寫一篇技術文章來分享我的經驗和觀點。這是我第一次嘗試寫技術文章,歡迎大家提出寶貴的建議和意見,希望這篇文章能為大家帶來一些啟發和幫助。

前言

iKala KOL Radar 擁有亞太區最大的網紅數據庫,擁有並持續運算超過 100 萬名網紅和 5 億筆以上的貼文資料。作為軟體工程師,我負責開發和維護這個平台的技術架構和功能。因應平台的不斷擴充,像是洞察報表,商案媒合報告,用戶信件發送等等的功能,越來越多的功能需要因事件觸發而進行非同步處理。

The Problem

然而現有的處理方式幾乎都是依賴 goroutine 進行同步處理,goroutine 雖然可以進行 concurrency,但很難系統性追蹤錯誤,程式碼也很難管理,這讓系統的穩定性和可維護性帶來了挑戰。因此,我們引入了 events-based job queues for asynchronous task 分散式任務處理解決方案 。本篇技術文章將介紹我們使用的解決方案,並分享其如何有效提高穩定性和可維護性的經驗。

我們借鑒了 grafana eventbus 的部份概念,開發了 EventBus System 套件來處理發布/訂閱事件,它可以簡化事件數據傳輸的流程,使發送事件及新增事件接收器變得非常容易

EventBus 流程圖

EventBus System

EventBus System 有幾個主要的元件,分別如下:

  • 註冊表 (Registry) : 事件註冊表,紀錄事件 (event) 和相對應的監聽器(listeners)
  • 事件分發器 (Dispatch) : 用來發送事件到 Job Queue 當中
  • 工作隊列 (Job Queue) : 等待執行的監聽器佇列
  • 處理程序 (Processor) : 監聽執行器,負責執行監聽器工作
  • 監聽器任務 (Job Listener): 把要進行的任務方法抽象成 interface ,可以透過 Processor 呼叫

當一個事件被觸發時,Dispatch 會透過 Registry 找出與之對應的 listener,將事件傳遞到 Job Queue 等待執行。而 Processor 則是一個持續運行的 daemon,負責從 Job Queue 中取出 listener 並執行對應的 Job Listener。

在接下來的內容當中,我將逐一介紹上述提到的元件。同時,也會分享相關的程式碼範例,以幫助更好地理解我們所使用的分散式任務處理解決方案。

註冊表 (Registry):

註冊表 (Registry) 就是一個對應表,將事件類型與動作進行了映射。在這個映射表中,每一個事件類型都會對應到一個或多個動作,當該事件發生時,就會執行相應的動作,這讓我們能夠知道發生的事件為何以及觸發的監聽器做的事情是什麼。

package eventbus

import "{kolradar}/pkg/eventbus"

// EventType 代表發生的事件
const (
FinishedFirstBlogPostEvent eventbus.EventType = "finished_first_blog_post"
)

// Listener 用於對應 EventType,代表事件發生後要執行的 listener
const (
SubmitToMediumListener eventbus.Listener = "submit_to_medium"
ShareToFriendListener eventbus.Listener = "share_to_friend"
)

// Event2Listener 為 EventType 對應到的 listener
// 此處列出所有的對應表,外部只需要使用需要的對應即可
var Event2Listener = map[eventbus.EventType][]eventbus.Listener{
FinishedFirstBlogPostEvent: {
SubmitToMediumListener,
ShareToFriendListener,
},
}

type Dispatcher struct {
cfg Config
publisher pubsub.Publisher
listeners map[EventType][]Listener
}

// 用於註冊定義的註冊表把定義好的事件加入 dispatcher.listeners map 當中
func (dispatcher *Dispatcher) Register(registry map[EventType][]Listener) {
if registry == nil {
return
}

dispatcher.listeners = registry
}

e.g. 以這個範例來說,觸發的事件為完成第一篇部落格 (finished_first_blog_post), 然而這個事件會對應我們接續要做的兩件事情分別為提交到 medium (submit_to_medium) 以及分享給朋友(share_to_friend)事件分發器 (Dispatch):

事件分發器 (Dispatch):

事件分發器 (Dispatch) 的主要作用是從 Registry 當中找到對應的Listener,並且發送事件到 Job Queue 當中。在這裡,我們使用的為 watermill 的開源專案來傳遞接收 Job Queue 的資料,此外它也支援多種 Pub/Sub 系統。

func (dispatcher *Dispatcher) Dispatch(ctx context.Context, eventType EventType, payload json.RawMessage, opts ...Option) error {
dispatcherCfg := newDefaultConfig(dispatcher.cfg)
for _, opt := range opts {
opt(dispatcherCfg)
}

topic := dispatcherCfg.TopicName()
attributes := map[string]string{}
// check if there is any listener for this event type
if _, ok := dispatcher.listeners[eventType]; !ok {
return errors.Errorf("eventType: %s, no listener for this event type, check if you have registered the listener.\n", eventType)
}

for _, listener := range dispatcher.listeners[eventType] {
workerPayload := WorkerPayload{
Payload: payload,
Listener: listener,
}

b, err := json.Marshal(workerPayload)
if err != nil {
return err
}

publishMessage := message.NewMessage(watermill.NewUUID(), b)
publishMessage.Metadata = attributes
publishMessage.SetContext(ctx)

if err := dispatcher.publisher.Publish(topic, publishMessage); err != nil {
return err
}
}

return nil
}

...
func main() {
if err := dispatcher.Dispatch(ctx, eventType, payload, eventbus.Priority(eventbus.PriorityDefault));
err != nil {
return err
}
}

透過封裝過的 Dispatch 函式省去了實作 Queue 需要實作的程式碼,並且可以透過 Option 來設定這個任務的優先度,

工作隊列 (Job Queue)

工作隊列 (Job Queue) 用於處理事件的系統,它包括了 publishers 和 subscribers 兩部分。publishers 將事件發送到隊列中,subscribers 從隊列中取出事件並進行相應的處理。使用工作隊列可以實現事件的異步處理,提高系統的吞吐量和可擴展性。

目前,我們團隊所使用的工作隊列 Job Queue 是 Google Cloud 提供的 Pub/Sub 系統,有提供 dashboard 可以監控任務發送的數量。當事件處理時有錯誤發生的時候可以根據設定次數來 retry ,並且錯誤次數高達系統設定的上限時,事件將會進入 Dead-Letter Queue (DLQ) 中,讓我們可以後續手動進行調試和修復。

處理程序(Processor)

處理程序 (Processor) 是指從 Job Queue 中提取對應的工作及參數,並進行相應的處理的過程。在我們的系統中,我們使用 Handler 從工作隊列中提取相應的工作和參數,並將它們傳遞給 Processor 函式進行處理。Processor 會根據傳入的 ListenerPayload,找到相應的工作,我們使用 interface 的多型抽象這層工作,並呼叫其 Handle 方法進行具體的處理。

// daemon 將會執行 Handler 不斷接收傳遞過來的工作
func (e *EventBus) Handler(ctx context.Context) error {
e.router.Router.AddMiddleware(eventbus.Trace())
e.router.Router.AddNoPublisherHandler(
e.cfg.HandlerName(),
e.cfg.WorkerTopicName(),
e.subscriber,
func(msg *message.Message) error {
var workerPayload eventbus.WorkerPayload
if err := json.Unmarshal(msg.Payload, &workerPayload); err != nil {
return errors.New("destructure failed")
}
msgCtx := msg.Context()
if err := e.eventbus.Process(msgCtx, workerPayload); err != nil {
logger.WithContext(msgCtx).Errorf("pulling message fail err:%v", err)
return err
}
return nil
},
)
return e.router.Router.Run(ctx)
}

// 接收會執行相對應的 listeners
func (e *EventBus) Process(ctx context.Context, workerPayload WorkerPayload) error {
return e.Processor.process(ctx, workerPayload)
}

type processor struct {
listeners map[eventName]ListenerContract
}

func (processor *processor) process(ctx context.Context, workerPayload WorkerPayload) error {
if val, ok := processor.listeners[workerPayload.Listener]; ok {
return val.Handle(ctx, workerPayload.Payload)
} else {
logger.WithContext(ctx).WithField("workerPayload.Listener", workerPayload.Listener).
Error("processor listeners is not found")
}

return nil
}

這是一個不斷執行的 worker,可以根據參數調整提取 Job Queue 的數量,也能在非同步任務很多的情況下 Auto Scaling 機器

// ListenerContract 是一個 interface,它包含了 Name() 和 Handle() 兩個方法。Name() 方法返回 Listener 的名稱,而 Handle() 方法則用於處理相應的事件。在 Handle() 方法中,我們可以根據傳入的 payload 參數進行相應的處理,並返回處理結果。
type ListenerContract interface {
Name() Listener
Handle(ctx context.Context, payload []byte) error
}

透過定義 ListenerContract interface 抽象了每一個 Listener Jobs 需要實作的方法讓程式碼的管理和可讀性增加

監聽器任務 (Job Listener)

監聽器任務 (Job Listener) 是指對應每個事件所要執行的具體任務。接續範例實作 submit_to_medium 和 share_to_friend 兩個 ListenerContract 任務。

/**** Job1 ****/

type SubmitToMedium struct {}

type SubmitToMediumPayload struct {
BlogID uint `json:"blog_id"`
MediumID uint `json:"medium_id"`
}

func (s *SubmitToMedium) Name() eventbus.Listener {
// submit_to_medium
return eventbus.SubmitToMediumListener
}

func (s *SubmitToMedium) Handle(ctx context.Context, payload []byte) error {
p := SubmitToMediumPayload{}
if err := json.Unmarshal(payload, &p); err != nil {
return err
}

medium.Sumbit(p.BlogID, p.MediumID)
return nil
}

type ShareToFriend struct {}

/**** Job2 ****/
type ShareToFriendPayload struct {
ShareUserID uint `json:"share_user_id"`
}

func (s *ShareToFriend) Name() eventbus.Listener {
// share_to_friend
return eventbus.ShareToFriendListener
}

func (s *ShareToFriend) Handle(ctx context.Context, payload []byte) error {
p := SubmitToMediumPayload{}
if err := json.Unmarshal(payload, &p); err != nil {
return err
}
share.send(p.ShareUserID)
return nil
}

分別 submit_to_medium 和 share_to_friend 實作了 interface 需要的 function。

// 初始化所有的 event
func NewRegistry(
submitToMedium *listeners.SubmitToMedium,
shareToFriend *listeners.ShareToFriend,
) *eventbus.Registry {
return &eventbus.Registry{
Process: []eventbus.ListenerContract{
submitToMedium,
shareToFriend,
},
Dispatch: internal_payment.PaymentEvent2Listener,
}
}

// 透過 register 來註冊 processor.listeners 的對應 interface 實作
func (processor *processor) register(listener ListenerContract) {
processor.listeners[listener.Name()] = listener
}

初始化 NewRegistry 之後,分別透過 register 函式定義成 Map,讓 Dispatch 時透過註冊表發送的 listener 能夠對應到 Handle()

以下是使用的實際例子:

透過 Pub/Sub dashboard 可以方便觀測到事件被派送的數量以及消耗的數量

1) GCP pub/sub dashboard

透過 Elastic APM 監測每個非同步任務的狀態和性能表現,有效監測任務中的每一條 DB Query 和最終是否有正確完成。

2) Elastic Application Performance Monitoring (APM)

Summary

以往我們上線一個功能的時候,當我們需要採用分散式任務的方式來實作功能時,會需要去建立及管理新的 Job Queue,也需要重新撰寫 shell script 去啟用的機器來執行 daemon,這也會成為額外機器成本,很難有效的利用以及統一的監測; 但透過 EventBus 套件能夠方便維護管理所有的非同步工作,同時更利於我們監測這些工作的狀態,讓錯誤排除更容易,機器成本可以更有效地被利用,並且當任務數量增加的時候,也能透過 Auto Scaling 機制來增加消化任務的速度。

Reference:

  1. https://github.com/grafana/grafana/blob/main/pkg/bus/bus_test.go
  2. https://github.com/ThreeDotsLabs/watermill

--

--