Belajar Redis Pub/Sub: Komunikasi Asynchronous dalam Sistem Terdistribusi

Sammi Aldhi Yanto
11 min readJul 24, 2023

--

Photo by Growtika on Unsplash

Pub/Sub (Publish/Subscribe) adalah architectural design pattern yang digunakan dalam sistem terdistribusi untuk komunikasi asynchronous antara berbagai komponen atau layanan yang berbeda. Pola ini memungkinkan movement of message (pergerakan pesan) antara komponen sistem tanpa saling mengetahui (terpisah), seperti Cinta dalam diam 😊

Asynchronous Communication adalah mekanisme dalam sistem komunikasi di mana pengiriman pesan atau pertukaran data antara komponen atau layanan terjadi tanpa harus menunggu tanggapan atau respon langsung

Dengan Pub/Sub, pengirim (publishers) mengirimkan pesan ke sebuah topik, dan penerima (subscribers) menerima pesan dari topik tersebut. Pengirim dapat mengirim pesan tanpa mengungkapkan identitasnya, meskipun dapat diidentifikasi oleh penerima jika diperlukan.

Kapan kita menggunakan Pub/Sub?

Asynchronous integration yang ditawarkan oleh Pub/Sub meningkatkan fleksibilitas dan robustness sistem secara keseluruhan. Berikut beberapa contoh use cases Pub/Sub

  • Real-time messaging dan chat
    Pub/Sub dapat digunakan untuk membuat aplikasi messaging dan chat secara real-time, seperti pada platform media sosial, aplikasi pesan instan, dan lingkungan kerja kolaboratif.
  • Perangkat IoT (Internet of Things)
    Pub/Sub dapat digunakan untuk menghubungkan perangkat IoT ke cloud, di mana mereka dapat berkomunikasi dengan broker terpusat dan mengirim serta menerima data. Dengan metode ini, jumlah besar data yang dihasilkan oleh perangkat IoT dapat dikumpulkan dan diproses, kemudian digunakan untuk analisis data.
  • Pembaruan berita dan alerting
    Para pelanggan (subscribers) dapat menerima pembaruan berita dan peringatan secara real-time. Kasus penggunaan ini umumnya digunakan pada platform perdagangan saham, aplikasi berita, dan sistem tanggap darurat.
  • Distributed computing and microservices
    Pub/Sub dapat digunakan untuk membangun sistem terdistribusi dan arsitektur microservice di mana berbagai komponen aplikasi berkomunikasi secara terpisah, memungkinkan skalabilitas dan fleksibilitas.
  • Event-driven architectures
    Pub/Sub mendukung arsitektur berbasis event, di mana berbagai komponen aplikasi merespons tindakan yang diambil oleh komponen lain. Ini memberikan fleksibilitas dalam desain aplikasi dan menyederhanakan alur kerja yang kompleks.
  • Decoupling components and reducing dependencies
    Pub/Sub dapat mendekupling dan mengurangi ketergantungan antara komponen aplikasi, memudahkan pemeliharaan aplikasi dari waktu ke waktu.
  • Fan-out and Fan-in processing
    Proses mengirimkan satu pesan secara bersamaan kepada banyak pelanggan disebut fan-out processing. Hal ini digunakan dalam mendistribusikan data atau peristiwa kepada banyak consumer.
  • Refreshing distributed caches
  • Load balancing for reliability
  • Event notifications
  • Distributed
  • Real-Time Sensor Updates
  • Real-Time Health Analytics
  • etc.

Redis

Redis adalah database yang menyimpan data langsung di dalam memori tanpa menyimpannya di disk. Hal ini penting untuk diingat ketika menggunakan Redis. Jika sistem tempat Redis berjalan kehilangan daya, Anda akan kehilangan data yang disimpan di memori. Meskipun begitu, Redis menyimpan data melalui snapshot, sehingga hanya data baru sejak snapshot terakhir yang hilang. Inilah sebabnya mengapa banyak orang sering mengatakan bahwa Anda sebaiknya menggunakan Redis sebagai Cache untuk menyimpan data. Meskipun begitu, Redis memiliki mekanisme persistensi, jadi tidak sepenuhnya benar bahwa semua data hilang, baca lebih lanjut tentang hal ini di https://redis.io/

Redis Pub/Sub

Redis Pub/Sub adalah mekanisme Publish dan Subscribe dalam Redis Server. Di sini, para pengirim (publishers) tidak diprogram untuk mengirim pesan langsung ke para penerima (subscribers). Sebagai gantinya, mereka mengirim pesan-pesan ini ke dalam channel, di mana setiap channel berisi informasi tentang topik tertentu.

Para pengirim tidak mengetahui siapa para penerima, dan begitu juga sebaliknya, para penerima tidak mengetahui siapa para pengirim. Redis Pub/Sub dilengkapi dengan serangkaian perintah yang diperlukan untuk mempublikasikan pesan dan membaca pesan-pesan tersebut dengan berlangganan (subscribe) ke channel-channel tersebut.

Redis Pub/Sub bahkan menyediakan perintah-perintah untuk berlangganan ke banyak channel melalui seleksi pola tertentu dan berhenti berlangganan (unsubscribe) dari lebih dari satu channel yang mengikuti pola tertentu. Redis juga mendukung mekanisme Pub/Sub bahkan di dalam shard-nya, yaitu dalam Redis Cluster.

Cara untuk subscribe ke sebuah channel adalah dengan menggunakan perintah SUBSCRIBE.

SUBSCRIBE news_kompas

Untuk publish pesan ke sebuah channel gunakan perintah PUBLISH.

PUBLISH news_kompas "Pemerintah diminta berhati-hati menyikapi kontroversi keberadaan Pondok Pesantren Al Zaytun.'"

Untuk penggunaan perintah lain seperti UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB NUMPAT, PING, dll, silahkan baca dokumentasi Redis https://redis.io/docs/interact/pubsub ya, harusnya gampang dipahami kok.

Delivery semantics

Redis Pub/Sub menggunakan at-most-once message delivery semantics. Ini berarti bahwa sebuah pesan akan dikirimkan paling banyak sekali, Setelah pesan dikirim oleh server Redis, tidak ada kesempatan untuk mengirimkannya lagi. Jika penerima pesan (subscriber) tidak dapat meng-handle pesan tersebut (misalnya, karena terjadi kesalahan atau putusnya koneksi jaringan), pesan tersebut akan forever lost. hahaha

Jika aplikasi kita membutuhkan delivery guarantees, alangkah baiknya mempertimbangkan penggunaan Redis Streams. Pesan dalam streams akan disimpan (persistance), dan mendukung kedua jenis pengiriman, yaitu at-most-once dan at-least-once delivery semantics.

Redis Pub/Sub vs Redis Streams

Redis Pub/Sub adalah mekanisme pengiriman pesan Publish dan Subscribe, sedangkan Redis Streams adalah Struktur Data yang menyimpan pesan atau peristiwa yang masuk. Ini dapat dilihat sebagai data time series yang merupakan daftar penambahan saja (append only). Dalam Redis Pub/Sub, terdapat Publisher yang mempublikasikan pesan ke sebuah channel dan ada Subscriber yang berlangganan ke channel tersebut untuk menerima pesan-pesan tersebut. Pada Redis Streams, terdapat Stream Producer yang menambahkan pesan/data ke stream, di mana setiap entry message dalam Redis Stream memiliki ID unik, dan ada Consumer Groups yang mengonsumsi pesan-pesan ini dengan menggunakan ID unik mereka.

Perbedaan utama lainnya antara Redis Streams dan Redis Pub/Sub adalah retensi pesan. Mari kita bayangkan situasi di Redis Pub/Sub, di mana seorang Publisher mempublikasikan pesan ke channel. Misalkan klien tidak terhubung pada saat itu, yaitu sedang offline, maka Subscriber tidak akan menerima pesan ini. Sementara di Redis Streams, jika klien tidak terhubung dan pesan dikirim, Subscriber akan menerima pesan tersebut ketika mereka terhubung kembali. Ini karena Redis Pub/Sub tidak menyimpan notifikasi, artinya, tidak menyimpan pesan-pesan. Data dikirimkan langsung ke semua Subscriber yang terhubung. Seperti mengikuti prinsip “fire-and-forget”. Di sisi lain, Streams menyimpan pesan-pesan mereka di dalam memori, karena mereka adalah Struktur Data Redis, oleh karena itu mereka menyimpan pesan-pesan di dalam memori mereka untuk dikonsumsi nanti oleh kelompok-kelompok konsumen/Subscriber.

Redis Pub/Sub dipilih ketika diperlukan semantik pengiriman at-most-once. Ini umumnya digunakan dalam aplikasi seperti pesan real-time, notifikasi, siaran, dll. Redis Streams dipertimbangkan dalam situasi di mana diperlukan semantik pengiriman at least once atau at most once. Aplikasinya termasuk penyimpanan data time-series, data streaming, pra-pemrosesan data, dalam aplikasi di mana data terlalu besar untuk muat di dalam memori, agregasi data, dll.

Studi kasus

Kita ambil contoh sederhana pada sistem toko online katakanlah sammi-shop.com.
Ketika ada seorang pembeli melakukan pemesanan barang melalui aplikasi. Pesanan tersebut diproseslah oleh “seller service”, setelah di proses selanjutnya “seller service” mengirimkan event “order processing”. daaaan temen-temen bisa liat sendiri alurnya pada gambar di bawah ini.

Jadi flow nya kira-kira:

Order service create order
Order service sent event ‘AwaitingConfirmation
Seller service received event ‘AwaitingConfirmation’ and process it
Seller service sent event ‘OrderProcessing
Seller service sent event ‘OrderShipped
Shipping server received event ‘OrderShipped’ and process it
Shipping server sent event ‘OrderArrived
Order service listen all events order status

Struktur Project

docker-compose.yml tugasnya disini cuma bikin redis container

disini saya cuma akan menampilkan isi order service, supaya artikel ini tidak terlalu panjang

order-service (main.go)

package main

import (
"encoding/json"
"fmt"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"log"
"math/rand"
"net/http"
"sync"
)

func main() {
redisClient := ConnectRedis()

go redisClient.Subscribe(OrderProcessing.SubscriberName(), OrderProcessing.ChannelName())
go redisClient.Subscribe(OrderShipped.SubscriberName(), OrderShipped.ChannelName())
go redisClient.Subscribe(OrderArrived.SubscriberName(), OrderArrived.ChannelName())
go redisClient.Subscribe(OrderUnderComplaint.SubscriberName(), OrderUnderComplaint.ChannelName())
go redisClient.Subscribe(OrderCompleted.SubscriberName(), OrderCompleted.ChannelName())
go redisClient.Subscribe(OrderCancelled.SubscriberName(), OrderCancelled.ChannelName())

r := chi.NewRouter()

r.Get("/", func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "index.html")
})

r.Get("/orders", func(w http.ResponseWriter, r *http.Request) {
orders := make([]Order, 0)
orderMap.Range(func(key, value interface{}) bool {
order := value.(Order)
orders = append(orders, order)
return true
})

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(orders)
})

r.Get("/orders/{orderId}/status", func(w http.ResponseWriter, r *http.Request) {
orderIDStr := chi.URLParam(r, "orderId")
orderID, err := uuid.Parse(orderIDStr)
if err != nil {
http.Error(w, "Invalid order ID", http.StatusBadRequest)
return
}

order, ok := getOrder(orderID)
if !ok {
http.Error(w, "Order not found", http.StatusNotFound)
return
}

if order.Status == OrderCompleted || order.Status == OrderCancelled {
http.Error(w, "Order has been completed or cancelled", http.StatusBadRequest)
return
}

// Set headers to enable Server-Sent Events
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

// Create a new channel to notify when the order status changes
statusChange := make(chan OrderStatus)

// Register the new channel with the client's order ID
clients.Store(orderID, statusChange)
defer clients.Delete(orderID)

// Start an infinite loop to check for status changes and send them to the client
for {
select {
case newStatus := <-statusChange:
// Send the new status to the client
_, _ = fmt.Fprintf(w, "data: %s\n\n", newStatus.HumanReadable())
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
}

if newStatus == OrderCompleted || newStatus == OrderCancelled {
close(statusChange)
clients.Delete(orderID)
log.Printf("Channel for order %s closed.\n", orderID)
return
}

case <-r.Context().Done():
// If the client closes the connection, stop the loop
return
}
}
})

r.Post("/orders", func(w http.ResponseWriter, r *http.Request) {
orderId := generateRandomOrder()
redisClient.PublishOrderStatus(orderId, AwaitingConfirmation.ChannelName())
_, _ = fmt.Fprintf(w, "Order created with ID: %s", orderId)
})

log.Println("Order service is running on port 8080...")
_ = http.ListenAndServe(":8080", r)
}

type OrderStatus string

func (os OrderStatus) ChannelName() string {
return string(os)
}

func (os OrderStatus) SubscriberName() string {
return fmt.Sprintf("%sSubscriber", os.ChannelName())
}

func (os OrderStatus) HumanReadable() string {
switch os {
case AwaitingConfirmation:
return "Pembayaran telah terverifikasi, menunggu Penjual menerima pesanan."
case OrderProcessing:
return "Penjual telah menerima pesanan, orderan kamu dalam tahap pengemasan."
case OrderShipped:
return "Pesanan kamu dalam pengiriman oleh jasa kurir."
case OrderArrived:
return "Pesanan kamu telah sampai alamat tujuan, mohon periksa nama penerima pesanan kamu apabila kamu tidak menerima barang tersebut."
case OrderUnderComplaint:
return "Pesanan kamu mengalami masalah dan kamu telah mengajukan komplain."
case OrderCompleted:
return "Kamu telah melakukan konfirmasi barang diterima, dan dana akan diteruskan ke penjual."
case OrderCancelled:
return "Pesanan kamu telah dibatalkan penjual karena suatu alasan."
default:
return "Status Pesanan Tidak Diketahui"
}
}

const (
AwaitingConfirmation OrderStatus = "AwaitingConfirmation"
OrderProcessing OrderStatus = "OrderProcessing"
OrderShipped OrderStatus = "OrderShipped"
OrderArrived OrderStatus = "OrderArrived"
OrderUnderComplaint OrderStatus = "OrderUnderComplaint"
OrderCompleted OrderStatus = "OrderCompleted"
OrderCancelled OrderStatus = "OrderCancelled"
)

type Order struct {
ID uuid.UUID
Items []string
TotalPrice int64
Status OrderStatus

HumanReadableStatus string
}

func (o *Order) changeStatus(status OrderStatus) {
o.Status = status
o.HumanReadableStatus = status.HumanReadable()
}

func (o *Order) save() {
log.Println("Last state of order status: ", o.Status)
orderMap.Store(o.ID, *o)
}

var orderMap = sync.Map{}

func generateRandomOrder() uuid.UUID {
orderID := uuid.New()

randomProductName := func() string {
products := []string{"Baju", "Celana", "Sepatu", "Topi", "Jaket"}
return products[rand.Intn(len(products))]
}

randomTotalPrice := func() int64 {
return int64(rand.Intn(100000))
}

orderMap.Store(orderID, Order{
ID: orderID,
Items: []string{randomProductName(), randomProductName()},
TotalPrice: randomTotalPrice(),
Status: AwaitingConfirmation,
HumanReadableStatus: AwaitingConfirmation.HumanReadable(),
})

return orderID
}

var clients = sync.Map{}

func processOrderStatus(orderID uuid.UUID, status OrderStatus) {
order, exists := getOrder(orderID)
if !exists {
log.Printf("Order %s not found.\n", orderID)
return
}

switch status {
case AwaitingConfirmation:
order.changeStatus(AwaitingConfirmation)
case OrderProcessing:
order.changeStatus(OrderProcessing)
case OrderShipped:
order.changeStatus(OrderShipped)
case OrderArrived:
order.changeStatus(OrderArrived)
case OrderUnderComplaint:
order.changeStatus(OrderUnderComplaint)
case OrderCompleted:
order.changeStatus(OrderCompleted)
case OrderCancelled:
order.changeStatus(OrderCancelled)
default:
log.Printf("Invalid order status %s.\n", status)
return
}

order.save()

// Notify clients about the status change
statusChange, found := clients.Load(orderID)
if found {
log.Printf("Notifying clients about order %s status change.\n", orderID)
statusChange.(chan OrderStatus) <- order.Status
log.Printf("Clients notified about order %s status change.\n", orderID)
}
}

func getOrder(orderID uuid.UUID) (Order, bool) {
val, found := orderMap.Load(orderID)
if !found {
fmt.Printf("Order %s not found.\n", orderID)
return Order{}, false
}

order, ok := val.(Order)
if !ok {
fmt.Printf("Invalid data format for order %s.\n", orderID)
return Order{}, false
}

return order, true
}

Dapat dilihat dari kode diatas, order-service nge-subsribe ke masing-masing events dan ada juga ada beberapa endpoint untuk diakses dari frontend, dan SSE (Server sent event) untuk ambil secara realtime order status dari sebuah order.

order-service (pubsub.go)

package main

import (
"context"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"log"
)

func (rc *RedisClient) Subscribe(subscriber string, channels ...string) {
log.Printf("Subscribing to channel 👉 %s with subscriber name 👉 [%s]", channels, subscriber)

go func() {
pubsub := rc.rc.Subscribe(context.Background(), channels...)
defer func(pubsub *redis.PubSub) {
err := pubsub.Close()
if err != nil {
log.Printf("Error closing pubsub: %v", err)
}
}(pubsub)

for {
msg, err := pubsub.ReceiveMessage(context.Background())
if err != nil {
log.Printf("Error receiving message from channel %s: %v", channels, err)
continue
}

orderID, err := uuid.Parse(msg.Payload)
if err != nil {
log.Printf("Error parsing orderID: %v", err)
continue
}

processOrderStatus(orderID, OrderStatus(msg.Channel))
}
}()
}

func (rc *RedisClient) PublishOrderStatus(orderID uuid.UUID, channel string) {
rc.rc.Publish(context.Background(), channel, orderID.String())
}

Kode diatas cuma sebagai wrapper dari method publish dan subscribe yang disediakan oleh library redis-go.

order-service (redis.go)

package main

import (
"context"
"github.com/redis/go-redis/v9"
"log"
)

type RedisClient struct {
rc *redis.Client
}

func ConnectRedis() *RedisClient {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})

_, err := rdb.Ping(ctx).Result()
if err != nil {
log.Fatalf("Error connecting to redis: %v", err)
}

return &RedisClient{rc: rdb}
}

Function ConnectRedis akan membua koneksi ke redis-server

seller-service (pubsub.go)

package main

import (
"context"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"log"
"time"
)

func (rc *RedisClient) Subscribe(subscriber string, channels ...string) {
log.Printf("Subscribing to channel 👉 %s with subscriber name 👉 [%s]", channels, subscriber)

go func() {
pubsub := rc.rc.Subscribe(context.Background(), channels...)
defer func(pubsub *redis.PubSub) {
err := pubsub.Close()
if err != nil {
log.Printf("Error closing pubsub: %v", err)
}
}(pubsub)

for {
msg, err := pubsub.ReceiveMessage(context.Background())
if err != nil {
log.Printf("Error receiving message from channel %s: %v", channels, err)
continue
}

orderID, err := uuid.Parse(msg.Payload)
if err != nil {
log.Printf("Error parsing orderID: %v", err)
continue
}

time.Sleep(time.Second * 3)
rc.PublishOrderStatus(orderID, OrderProcessing.ChannelName())
log.Printf("Order %s processing...", orderID)

time.Sleep(time.Second * 3)
rc.PublishOrderStatus(orderID, OrderShipped.ChannelName())
log.Printf("Order %s shipped...", orderID)
}
}()
}

func (rc *RedisClient) PublishOrderStatus(orderID uuid.UUID, channel string) {
rc.rc.Publish(context.Background(), channel, orderID.String())
}

Kode diatas dibuat delay 3 detik untuk mensimulasikan order processing dan shipped sebelum status order di kirim ke subscribers.

Kemudian yang terakhir untuk index.html untuk menampilkan list orders

<!DOCTYPE html>
<html>
<head>
<title>Order Management System</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-gray-100 min-h-screen">
<div class="mx-auto w-full max-w-7xl py-8">
<h1 class="text-4xl font-bold mb-4">Order Management System</h1>

<button class="bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded" onclick="createOrder()">
Create Order
</button>

<h2 class="text-2xl font-bold mt-8">List of Orders</h2>
<div id="orderList" class="flex flex-wrap items-center justify-center gap-2 mt-4">
</div>
</div>

<script>
function createOrder() {
fetch('/orders', { method: 'POST' })
.then(response => response.text())
.then(orderId => {
console.log('Order created with ID:', orderId);
// Automatically update the list of orders
getOrderList();
})
.catch(error => console.error('Error creating order:', error));
}

function renderOrderList(orders) {
const orderListElement = document.getElementById('orderList');
orderListElement.innerHTML = '';

orders.forEach(order => {
const card = document.createElement('div');
card.className = 'bg-white w-72 h-64 w-full rounded-lg shadow-md p-4';
card.innerHTML = `
<p class="font-bold text-gray-800">${order.ID}</p>
<p class="text-gray-600">Items: ${order.Items.join(', ')}</p>
<p class="text-gray-600">Total Price: ${order.TotalPrice}</p>
<p class="text-gray-600 break-all">Order Status: <span class="text-blue-700 text-sm" id="orderStatus_${order.ID}">${order.HumanReadableStatus}</span></p>
`;
orderListElement.appendChild(card);
getOrderStatus(order.ID);
});
}

function updateOrderStatus(orderId, newStatus) {
const orderStatusElement = document.getElementById(`orderStatus_${orderId}`);
orderStatusElement.innerText = newStatus;
}


function getOrderStatus(orderId) {
const eventSource = new EventSource(`/orders/${orderId}/status`);

eventSource.onmessage = function(event) {
const newStatus = event.data;
updateOrderStatus(orderId, newStatus);
};

eventSource.onerror = function(event) {
eventSource.close();
console.error('Error getting order status:', event);
};
}

function getOrderList() {
fetch('/orders')
.then(response => response.json())
.then(orders => renderOrderList(orders))
.catch(error => console.error('Error fetching order list:', error));
}

// Fetch and display the list of orders when the page loads
getOrderList();
</script>
</body>
</html>

Demo

Kita jalankan semua servicenya terlebih dahulu

Buka di localhost:8080

Kita bisa lihat log untuk masing-masing service nya

Terimakasih telah membaca, semoga bermanfaat ya

--

--