Distributed task queue dengan Golang Asynq

Sammi Aldhi Yanto
9 min readJul 13, 2023

--

Photo by Michał Parzuchowski on Unsplash

Apa itu Tasks Queue?
Task Queue adalah sebuah mekanisme yang digunakan untuk mengatur dan menjalankan tugas atau pekerjaan yang memerlukan eksekusi terpisah dalam sebuah sistem (asynchronous).

Dengan menggunakan Task Queue, kita dapat memisahkan tugas-tugas tersebut dari proses utama dan membagikannya ke beberapa proses atau thread untuk nge handle task tersebut. Hal ini memungkinkan sistem untuk secara efisien mengelola workload yang kompleks dan meningkatkan kinerja serta skalabilitas aplikasi.

Task Queue buat apa?
Pemrosesan Gambar
Ketika kita memiliki aplikasi atau layanan yang memerlukan pemrosesan gambar, seperti thumbnail generation atau filter aplikasi foto, Task Queue dapat digunakan untuk membagi tugas pemrosesan gambar menjadi bagian-bagian kecil yang dapat dijalankan secara paralel.

Pengiriman Email Massal
Ketika kita perlu mengirim email massal ke ribuan atau bahkan jutaan pelanggan, Task Queue dapat digunakan untuk mengatur dan menjalankan tugas pengiriman email secara terpisah. Setiap email dapat diwakilkan sebagai tugas dalam antrian, dan proses atau worker dapat mengambil tugas tersebut dan mengirim email dengan efisien. Dengan menggunakan Task Queue, kita dapat menghindari pemblokiran atau penundaan dalam pengiriman email, mengelola antrian pengiriman dengan baik, sehingga meningkatkan responsivitas dan pengalaman pengguna.

Notifikasi
PDF Report
Backup
Daan masih banyak lagi contoh kasus penggunaan Task Queue lainnya

Apa itu Asynq?

Asynq adalah sebuah library Go yang digunakan untuk mengantri tugas-tugas (tasks queues) dan memprosesnya secara asynchronous dengan menggunakan worker. Di balik layar Library ini menggunakan Redis dan dirancang agar mudah digunakan dan scalable.

Secara umum, Cara kerja Asynq sebagai berikut berikut:

  1. Klien enqueue tugas-tugas pada sebuah antrian (queue).
  2. Server menarik tugas-tugas dari antrian dan memulai goroutine worker untuk setiap tugas.
  3. Tugas-tugas diproses secara concurrent oleh beberapa worker.

Antrian tugas digunakan sebagai mekanisme untuk mendistribusikan pekerjaan ke berbagai mesin ataupun concurrent process pada mesin yang sama. Sistem dapat terdiri dari beberapa server worker dan broker, yang memungkinkan ketersediaan/ yang tinggi (high availability) dan horizontal scaling.

Berikut ilustrasi cara kerja Asynq

https://github.com/hibiken/asynq

Fitur fitur yang ditawarkan asynq sebagai berikut:
Guaranteed at least one execution of a task
Jika terjadi failure yang menyebabkan hilangnya pesan atau membutuhkan waktu terlalu lama untuk recovery, pesan akan dikirim ulang untuk memastikan pesan terkirim setidaknya satu kali.

Scheduling of tasks
Dapat menjadwalkan tugas untuk dieksekusi di masa depan dengan menggunakan asynq.ProcessIn(time)

Retries of failed tasks
Dapat mengatur jumlah maksimal retry untuk tugas yang gagal, dengan menggunakan asynq.MaxRetry(10)

Automatic recovery of tasks in the event of a worker crash
Jika worker crash, tugas yang sedang diproses akan dikembalikan ke antrian untuk diproses ulang.

Weighted priority queues
Dapat mengatur prioritas tugas, misalnya menggunakan asynq.Queue(“critical”) untuk tugas yang kritis atau penting.

Strict priority queues
asynq mendukung antrian prioritas ketat, yang berarti tugas dengan prioritas lebih tinggi akan selalu dieksekusi terlebih dahulu dibandingkan dengan tugas dengan prioritas lebih rendah.

Low latency to add a task since writes are fast in Redis
Penulisan data pada Redis sangat cepat, sehingga asynq memiliki latensi rendah dalam menambahkan tugas ke dalam antrian.

De-duplication of tasks using unique option
Anda dapat menghindari duplikasi tugas dengan menggunakan opsi unik pada tugas-tugas yang sama. Jika sebuah tugas dengan opsi unik sudah ada dalam antrian, tugas baru dengan opsi yang sama akan diabaikan.

Allow timeout and deadline per task
Allow aggregating group of tasks to batch multiple successive operations
Flexible handler interface with support for middlewares
Ability to pause queue to stop processing tasks from the queue
Periodic Tasks
Support Redis Cluster for automatic sharding and high availability
Support Redis Sentinels for high availability
Integration with Prometheus to collect and visualize queue metrics
Web UI to inspect and remote-control queues and tasks
CLI to inspect and remote-control queues and tasks

Supeer duper komplit kan? Selengkapnya, temen2 bisa baca di sini https://github.com/hibiken/asynq/wiki/ ya hehe

Nyobain implementasi Asynq dengan studi kasus sederhana
Mari kita jelajahi penerapan Asynq pada sebuah aplikasi blog sederhana yang melibatkan admin, subscriber, dan tugas-tugas terkait email notification dan analitik.

Dalam aplikasi ini, admin dapat membuat post, dan para pembaca dapat berlangganan newsletter. Setiap kali admin membuat post, subscriber akan menerima email notifikasi. Selain itu, kita ingin melacak berapa banyak subscriber yang membaca post melalui tautan di email notifikasi, serta berapa banyak yang membaca langsung di website.

  1. Admin membuat post: Ketika admin membuat post baru, kita akan menerapkan konsep tasks queue. Setelah post dibuat, kita akan mengirimkan tugas “kirim notifikasi email” ke dalam antrian “email”. Tugas ini akan dikonsumsi oleh worker email yang akan mengirimkan email notifikasi kepada semua subscriber.
  2. Pembaca berlangganan newsletter: ketika pembaca berlangganan newsletter untuk mendapatkan update terbaru ketika artikel baru rilis, kita akan mengirimkan tugas “kirim welcome email”
  3. Pembaca membaca post melalui tautan di email: Ketika pembaca membuka tautan di email notifikasi, kita akan menerapkan konsep tasks queue sekali lagi. Setiap kali ada klik tautan, kita akan menambahkan tugas “lacak pembacaan melalui tautan” ke dalam antrian “analytic”. Tugas ini akan dikonsumsi oleh worker analitik yang akan mencatat bahwa pembaca tersebut membaca post melalui tautan.
  4. Pembaca membaca post langsung di website: Ketika pembaca membaca post langsung di website, kita juga akan menerapkan konsep tasks queue. Setiap kali ada aksi membaca post, kita akan menambahkan tugas “lacak pembacaan langsung” ke dalam antrian “analytic”. Tugas ini akan dikonsumsi oleh worker analitik yang akan mencatat bahwa pembaca tersebut membaca post langsung di website.

Untuk mendeteksi pembaca membuka link dari email atau website nya langsung kita tambahkan parameter UTM (Urchin Tracking Module) pada link yang dikirimkan ke email. Parameter UTM ini akan kita gunakan untuk melacak pembacaan post.

Apa aja yang kita butuhkan?
1. Go, kita akan menggunakan Go sebagai bahasa pemrograman
2. Redis, kita akan menggunakan Redis sebagai broker untuk Asynq
3. Docker & Docker Compose, kita akan menjalankan Redis menggunakan Docker
4. Dan package2 go seperti chi router, dll

Constrains
1. Datastore (penyimpanan post, dll) kita hanya akan menggunakan memory (map), jadi ketika server mati, data akan hilang hehe 🙃
2. Kita ga bikin tests (unittest,apitest,dll) krna lama bet bikinnya, jadi kita akan langsung coba run aja (blackbox test) 🙃

Show me the code 👨‍💻
Eitss, sabar dulu.. berikut struktur project kita

├── cmd
└── main.go
├── article_analytic.go
├── article_analytic_handler.go
├── article.go
├── article_handler.go
├── article_subscriber.go
├── article_subscriber_handler.go
├── http_server.go
├── mail_sender.go
├── task_send_new_article_notification_email.go
├── task_send_new_subscriber_welcome_email.go
├── task_track_read_article.go
├── utm_generator.go
├── utm_generator_test.go
├── worker_distributor.go
├── worker_logger.go
├── worker_processor.go
├── go.mod
├── go.sum
├── docker-compose.yaml
└── playground.http

Kode nya bisa temen temen lihat di github ya, Disini kita akan fokus pada beberapa file kodingan saja yang relate sama asynq

worker_distributor.go

package rmq

import "context"

import (
"github.com/hibiken/asynq"
)

type TaskDistributor interface {
DistributeTaskSendNewArticleNotificationEmail(
ctx context.Context,
payload *PayloadNewArticleNotificationEmail,
opts ...asynq.Option,
) error

DistributeTaskSendNewSubscriberWelcomeEmail(
ctx context.Context,
payload *PayloadNewSubscriberWelcomeEmail,
opts ...asynq.Option,
) error

DistributeTaskTrackReadArticle(
ctx context.Context,
payload *PayloadTrackReadArticle,
opts ...asynq.Option,
) error
}

type RedisTaskDistributor struct {
client *asynq.Client
}

func NewRedisTaskDistributor(redisOpt asynq.RedisClientOpt) TaskDistributor {
client := asynq.NewClient(redisOpt)
return &RedisTaskDistributor{
client: client,
}
}

Kode diatas berisi interface/kontrak dari jenis task yang akan di distribusikan.

worker_processor.go

package rmq

import (
"context"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog/log"
)

const (
QueueCritical = "critical"
QueueDefault = "default"
)

type TaskProcessor interface {
Start() error
ProcessTaskSendNewSubscriberWelcomeEmail(ctx context.Context, task *asynq.Task) error
ProcessTaskSendNewArticleNotificationEmail(ctx context.Context, task *asynq.Task) error
ProcessTaskTrackReadArticle(ctx context.Context, task *asynq.Task) error
}

type RedisTaskProcessor struct {
server *asynq.Server
articleRepo ArticleRepository
articleSubscriberRepo ArticleSubscriberRepository
articleAnalyticRepository ArticleAnalyticRepository
mailer EmailSender
}

func NewRedisTaskProcessor(
redisOpt asynq.RedisClientOpt,
articleRepo ArticleRepository,
articleSubscriberRepo ArticleSubscriberRepository,
articleAnalyticRepository ArticleAnalyticRepository,
mailer EmailSender,
) TaskProcessor {
logger := NewLogger()
redis.SetLogger(logger)

server := asynq.NewServer(
redisOpt,
asynq.Config{
Queues: map[string]int{
QueueCritical: 25,
QueueDefault: 5,
},
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
log.Error().Err(err).Str("type", task.Type()).
Bytes("payload", task.Payload()).Msg("process task failed")
}),
Logger: logger,
},
)

return &RedisTaskProcessor{
server: server,
articleRepo: articleRepo,
articleSubscriberRepo: articleSubscriberRepo,
articleAnalyticRepository: articleAnalyticRepository,
mailer: mailer,
}
}

func (processor *RedisTaskProcessor) Start() error {
mux := asynq.NewServeMux()

mux.HandleFunc(TaskSendNewSubscriberWelcomeEmail, processor.ProcessTaskSendNewSubscriberWelcomeEmail)
mux.HandleFunc(TaskSendNewArticleNotificationEmail, processor.ProcessTaskSendNewArticleNotificationEmail)
mux.HandleFunc(TaskTrackReadArticle, processor.ProcessTaskTrackReadArticle)

return processor.server.Start(mux)
}

Kode diatas berisi inisialisasi worker processor dengan redis, kemudian nge registrasiin routes sesuai dengan topik/pattern dan handler yg sesuai untuk memprosesnya.

task_send_new_article_notification_email.go

package rmq

import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
"github.com/rs/zerolog/log"
)

const TaskSendNewArticleNotificationEmail = "task:send_new_article_notification_email"

type PayloadNewArticleNotificationEmail struct {
ArticleTitle string `json:"article_title"`
ArticleSlug string `json:"article_slug"`
}

func (distributor *RedisTaskDistributor) DistributeTaskSendNewArticleNotificationEmail(
ctx context.Context,
payload *PayloadNewArticleNotificationEmail,
opts ...asynq.Option,
) error {
jsonPayload, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal task payload: %w", err)
}

task := asynq.NewTask(TaskSendNewArticleNotificationEmail, jsonPayload, opts...)
info, err := distributor.client.EnqueueContext(ctx, task)
if err != nil {
return fmt.Errorf("failed to enqueue task: %w", err)
}

log.Info().Str("type", task.Type()).Str("queue", info.Queue).Int("max_retry", info.MaxRetry).Msg("enqueued task")
return nil
}

func (processor *RedisTaskProcessor) ProcessTaskSendNewArticleNotificationEmail(_ context.Context, task *asynq.Task) error {
var payload PayloadNewArticleNotificationEmail
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", asynq.SkipRetry)
}

subscribers := processor.articleSubscriberRepo.GetArticleSubscribers()
if len(subscribers) == 0 {
return nil
}

to := make([]string, 0, len(subscribers))

for _, v := range subscribers {
to = append(to, v.Email)
}

subject := "New Article Published"
baseURL := "http://localhost:8080/articles/" + payload.ArticleSlug
source := "newsletter"
medium := "email"
campaign := "newsletter"

link, err := GenerateUTMURL(baseURL, source, medium, campaign)
if err != nil {
return fmt.Errorf("failed to generate utm url: %w", err)
}

content := fmt.Sprintf(`Hello, <br/>
We have published a new article: <a href="%s">%s</a><br/>
`, link, payload.ArticleTitle)

err = processor.mailer.SendEmail(subject, content, to, nil, nil, nil)
if err != nil {
return fmt.Errorf("failed to send verify email: %w", err)
}

log.Info().Str("type", task.Type()).Msg("processed task")

return nil
}

Ini merupakan handler yang bertugas untuk ngehandle ketika ada task dengan tipe/route “TaskSendNewArticleNotificationEmail”.

Untuk handler yang lainnya temen-temen bisa lihat di github ya.

Sekarang kita akan lihat, client kode untuk enqueue task kedalam broker.

func (s *Server) CreateArticleHandler(w http.ResponseWriter, r *http.Request) {
var article Article
err := json.NewDecoder(r.Body).Decode(&article)
if err != nil {
http.Error(w, "Invalid request payload", http.StatusBadRequest)
return
}

article = s.ArticleRepo.CreateArticle(article)

taskPayload := &PayloadNewArticleNotificationEmail{
ArticleTitle: article.Title,
ArticleSlug: article.Slug,
}

opts := []asynq.Option{
asynq.MaxRetry(0),
asynq.Queue(QueueCritical),
}

if taskErr := s.TaskDistributor.DistributeTaskSendNewArticleNotificationEmail(context.Background(), taskPayload, opts...); taskErr != nil {
log.Err(taskErr).Msg("Failed to distribute task")
}

w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(article)
}

Kode diatas merupakan handler untuk menangani pembuatan article, dan nge distribute task “kirim email notifikasi” kepada pembaca yang telah berlangganan ke queue.

Untuk handler yang lainnya temen-temen bisa lihat di github ya hehe.

dan yang terakhir untuk main.go

package main

import (
"github.com/hibiken/asynq"
"github.com/rs/zerolog/log"
"redis-message-queue"
)

func main() {
articleRepo := &rmq.InMemoryArticleRepository{}
articleSubscriberRepo := &rmq.InMemoryArticleSubscriberRepository{}
articleAnalytics := &rmq.InMemoryArticleAnalyticsRepository{
ArticleRepository: articleRepo,
}

redisOpt := asynq.RedisClientOpt{Addr: "localhost:6379"}
taskDistributor := rmq.NewRedisTaskDistributor(redisOpt)

mailer := rmq.NewFakeGmailSender("sammi", "sammi@gmail.com", "xxx")
taskProcessor := rmq.NewRedisTaskProcessor(redisOpt, articleRepo, articleSubscriberRepo, articleAnalytics, mailer)

go func() {
log.Info().Msg("start task processor")
err := taskProcessor.Start()
if err != nil {
log.Fatal().Err(err).Msg("failed to start task processor")
}
}()

s := rmq.NewServer(articleRepo, articleSubscriberRepo, articleAnalytics, taskDistributor)
s.Start(8080)
}

Kode diatas akan menjalankan task processor nya di goroutine lain, temen2 bisa bikin file terpisah bahkan node terpisah untuk nge run si worker processor ini.

Testing

Sekarang kita akan lakukan test untuk usecase reader subscriber newsletter, admin rilis artikel baru, dan reader membaca artikel.

Pembaca subscribe/berlangganan newsletter
Client mengirim task email welcome dan akan di proses oleh worker
Admin membuat artikel baru
Subscriber akan mendapatkan email notifikasi
Pembaca membuka tautan yang dikirimkan lewat email
Client mengirim track read article dan akan di proses oleh worker
Load test dengan hey -n 1000 -c 100 http://localhost:8080/articles/title-1 1k requests dengan 100 concurrent process
Task akan di enqueue kedalam queue dan worker akan meng eksekusi task task tersebut
Log

Syudah selesai, terimakasih sudah membaca tmen2, semoga bermanfaat, have a nice day 🌞

--

--