Redis Pub/Sub 是什麼、會造成什麼問題呢?

簡單來說,接收者(Sub)先去訂閱(Command: SUBSCRIBE)特定 “通道(Channel)”,而內容提供者 (Pub),再將訊息推送(Command: PUBLISH)到這一個 “通道(Channel)”,此時所有有訂閱這個 “通道(Channel)” 的接收者(Sub),都會收到這個訊息。

Jerry’s Notes
What’s next?
9 min readOct 10, 2022

--

Redis Pub/Sub 運行方式,請參考下圖:

Redis Pub/Sub 是什麼?

Redis version 2 開始提供 Redis Pub/Sub 的功能,它是一種消息傳送的模式,由發佈者(Pub)發送消息,訂閱者(Sub)接收消息,發佈者(Pub)發送消息到一個特定的一個chanel(通道)中,而不是發送給指定的訂閱(Sub)者,發佈者(Pub)也不知道可能有哪些訂閱者(Sub),並在 Redis version 7 之後,提供 Sharded Pub-Sub 的功能,讓在叢集模式(cluster mode enable)下,無需跨分片傳播通道信息,從而提高了可擴展性。

  • Pub: Publish 發佈者
  • Sub: Subscribe 接收者

訂閱者(Sub)可以訂閱一個、或多個channel,只接收來自訂閱的channel的消息,並且不知道有哪些發佈者(Pub),這種模式,訊息提供者(Pub)以及訊息接收者(Sub)之間,是沒有直接關係的。但是 Pub/Sub 的內容,是不會使用鍵值的方式去保存的,所以這邊訊息自然也無法持久化。此外,Pub/Sub 也跟所在的與數據庫無法,例如發佈者(Pub)在 db2 上發送消息,而訂閱者(Sub)在 db db1 也是能夠收到訊息的。

What is Redis Pub/Sub?

SUBSCRIBE, UNSUBSCRIBE and PUBLISH implement the Publish/Subscribe messaging paradigm where (citing Wikipedia) senders (publishers) are not programmed to send their messages to specific receivers (subscribers). Rather, published messages are characterized into channels, without knowledge of what (if any) subscribers there may be. Subscribers express interest in one or more channels, and only receive messages that are of interest, without knowledge of what (if any) publishers there are. This decoupling of publishers and subscribers can allow for greater scalability and a more dynamic network topology.

Pub/Sub 使用上的限制

  • Pub/Sub 的內容,是不會使用鍵值的方式去保存的,所以這邊訊息自然也無法持久化,所以數據丟失是最大的問題。
  • 訂閱者(Sub)連線中斷時,當時該 Redis Client 已取得的數據就會丟失。
  • Redis Server 端當機,因為不是以鍵值的方式保存,所以數據會丟失。
  • COB 訊息堆積時,緩衝區(buffer)溢出,訂閱者(Sub)會被強制踢下線,數據也會丟失。
  • client-output-buffer-limit pubsub 32mb 8mb 60。
  • 32mb:緩衝區(buffer)一旦超過 32MB,Redis 直接將訂閱者(Sub)會被強制踢下線。
  • 8mb + 60:緩衝區超過 8MB,並且持續 60 秒,Redis 也會把訂閱者(Sub)會被踢下線。

引用 `作者:劉Java` 的文章中的說明。

Redis pubsub_channels是壹個dict字典結構,key(數組元素)為channel,value就是某個client。當客戶端訂閱某壹個頻道之後,Redis 就會往 pubsub_channels 這個字典中新添加壹條channel和client數據,不同的client可以訂閱相同的channel,client以鏈表的方式串聯起來,這樣就能保存多個client對同壹個channel的關系,非常的巧妙。

作者:劉Java
鏈接:https://juejin.cn/post/7112434646851584013

Pub/Sub 使用上的考量重點

  • 基於Pub/Sub 的內容,是不會使用鍵值的方式去保存的,所以重要的數據不適合用這個方式來傳遞。
  • 發佈者(Pub)發送消息後,並不的管訂閱者(Sub)是否有接收消息,也沒有 ACK 機制,所以無法確保訂閱者(Sub)一定有接收到訊息,故不建議用於消息可靠性要求高的場景中。
  • 訂閱者(Sub)會佔用一個 Redis 服務器端的連線(Connection),所以要注意在服務器端,是否有佔用過多的連線(Connection)。
  • 當發佈者(Pub)發送 “大量” 訊息,若訂閱者(Sub)來不及耗化的話,數據會阻塞在通道channel中,阻塞時間越久,數據丟失的風險越大,當訊息堆積時過量後,造成緩衝區(buffer)溢出,就會導致數據丟失、或是客戶端連線主動被 Redis 端中斷的行為。

3158:M 22 May 2024 04:15:26.063 # Closing client due to COB limit

3158:M 22 May 2024 04:15:26.064 # Client id=791120 addr=10.31.2.182:10344 laddr=10.31.15.188:6379 fd=35 name= age=28 idle=28 flags=P db=0 sub=0 psub=0 ssub=1 multi=-1 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=0 rbp=0 obl=0 oll=0 omem=33595110 tot-mem=33596710 events= cmd=ssubscribe user=default redir=-1 resp=3 scheduled to be closed ASAP for overcoming of output buffer limits.

從上面的 Redis.log 上,可以很明確的了解,客戶端 `Client id=791120 addr=10.31.2.182:10344` 這一條連線,因為 buffer `omem=33595110` 滿了,而被 Redis Server 主動去中止這條連線。

  • 當發佈者(Pub)發送 “大量” 訊息,在 “Redis 多節點” 的架構下,也會造成節點負載增加及 pub/sub 廣播風暴 的問題。 ← !!! 要小心。
  • 訂閱者(Sub)需要實做重新 subscribe 的機制,來重新繼續接收消息。

Sharded Pub-Sub

Sharded Pub/Sub 重要改善

  • 可以跨分片組通道(shard channels),來傳遞訊息,分片組通道會同分配鍵值的方式一樣,將特定”分片組通道”,交由特定的分片組(shard/slot)來處理,所以客戶端(Redis Client)就只會到該分片組上的節點,來取得訂閱的內容。

Shard channels are assigned to slots by the same algorithm used to assign keys to slots.
A shard message must be sent to a node that own the slot the shard channel is hashed to.

  • 優點是,當發佈者(Pub)去 SPUBLISH 傳送內容到 “分片組通道(shard channels)” 時,就只會在特定的 “分片組(shard/slot)” 內來處理,而訂閱者(Sub)也必需到該分片組(shard/slot)節點,來訂閱的內容,這樣可以避免發佈者(Pub),傳遞內容到所有的分片組(shard/slot)上。
  • Redis7 開始提供三個新的命令 SSUBSCRIBE, SUNSUBSCRIBE, SPUBLISH 來達到 Sharded Pub/Sub 目的。

SSUBSCRIBE, SUNSUBSCRIBE and SPUBLISH are used to implement sharded Pub/Sub.

引用 `作者:平凯星辰` 的文章中的說明。

[+] 从Redis7.0发布看Redis的过去与未来-PingCAP | 平凯星辰:
https://cn.pingcap.com/article/post/6017.html

Redis的pubsub是按channel频道进行发布订阅,然而在集群模式下channel不被当做数据处理,也即不会参与到hash值计算无法按slot分发,所以在集群模式下Redis对用户发布的消息采用的是在集群中广播的方式。

那么问题显而易见,假如一个集群有100个节点,用户在节点1对某个channel进行publish发布消息,该节点就需要把消息广播给集群中其他99个节点,如果其他节点中只有少数节点订阅了该频道,那么绝大部分消息都是无效的,这对网络、CPU等资源造成了极大的浪费。

Sharded-pubsub便是用来解决这个问题,意如其名,sharded-pubsub会把channel按分片来进行分发,一个分片节点只负责处理属于自己的channel而不会进行广播,以很简单的方法避免了资源的浪费。

以上文檔中,也很明確地說明,在 Redis 7 以上,一般的 Redis Pub/Sub 會有 當大量寫入 時,會造成節點之間的 廣播風暴 問題,而造成 Redis 整體效能下降的問題。

建議做法: 改使用 Redis 7+ Sharded Pub/Sub 的功能。

What is Redis Pub/Sub improvement?

Redis has supported the publish-subscribe mechanism since 2.0. Users using the pubsub command family can establish a message subscription system. However, Redis pubsub has some problems in the cluster mode; the most significant of which is the broadcast storm brought by large-scale clusters.

Redis pubsub is published and subscribed by channel. However, channels are not treated as data processing in cluster mode. They do not participate in hash value calculation and cannot be distributed by slot. Therefore, Redis broadcasts messages to users in cluster mode.
The problem is clear. If a cluster has 100 nodes and users publish messages to a channel at node 1, the node needs to broadcast the messages to the other 99. If only a few of the other nodes subscribe to the channel, most of the messages are invalid, which causes waste to the network, CPU, and other resources.
Sharded-pubsub is used to solve this problem. It distributes channels by shards. A shard node is only responsible for processing its channels rather than broadcasting them, which simply avoids the waste of resources.

What is Redis Sharded Pub/Sub?

From 7.0, sharded Pub/Sub is introduced in which shard channels are assigned to slots by the same algorithm used to assign keys to slots. A shard message must be sent to a node that own the slot the shard channel is hashed to. The cluster makes sure the published shard messages are forwarded to all nodes in the shard, so clients can subscribe to a shard channel by connecting to either the master responsible for the slot, or to any of its replicas. SSUBSCRIBE, SUNSUBSCRIBE and SPUBLISH are used to implement sharded Pub/Sub.

Sharded Pub/Sub helps to scale the usage of Pub/Sub in cluster mode. It restricts the propagation of message to be within the shard of a cluster. Hence, the amount of data passing through the cluster bus is limited in comparison to global Pub/Sub where each message propagates to each node in the cluster. This allows users to horizontally scale the Pub/Sub usage by adding more shards.

Redis 2+ Pub/Sub 命令的測試記錄

SUBSCRIBE | PUBLISH | UNSUBSCRIBE | PSUBSCRIBE | PUNSUBSCRIBE

在 Redis 7.0.4 及 Redis 6.2.6 出來的結果是一樣的,

$  $(pwd)/src/redis-cli -h 127.0.0.1 -p 6381 info server | grep redis_version
redis_version:7.0.4
$ $(pwd)/src/redis-cli -h 127.0.0.1 -p 6381 info server | grep redis_version
redis_version:6.2.6
### SUBSCRIBE SUBSCRIBE channel [channel ...]
subscribe: means that we successfully subscribed to the channel given as the second element in the reply. The third argument represents the number of channels we are currently subscribed to.
客戶端使用 SUBSCRIBE channel [channel ...] 命令訂閱通道,可以多次執行該命令,也可以一次訂閱多個通道,多個客戶端可以訂閱相同的通道。## 訂閱2個通道(channel): shard01-primary-node (相同分片組上的主節點)
127.0.0.1:6381> SUBSCRIBE chat-room-a chat-room-b
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "chat-room-a"
3) (integer) 1
1) "subscribe"
2) "chat-room-b"
3) (integer) 2
1) "message" <----
2) "chat-room-a" <----
3) "send-to-aaaa" <----

## 訂閱2個通道(channel): shard01-replica-node (相同分片組上的從節點)
127.0.0.1:6382> SUBSCRIBE chat-room-a chat-room-b
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "chat-room-a"
3) (integer) 1
1) "subscribe"
2) "chat-room-b"
3) (integer) 2
1) "message" <----
2) "chat-room-a" <----
3) "send-to-aaaa" <----
## 訂閱2個通道(channel): shard02-primary-node (另一個主節點)
127.0.0.1:6383> SUBSCRIBE chat-room-a chat-room-b
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "chat-room-a"
3) (integer) 1
1) "subscribe"
2) "chat-room-b"
3) (integer) 2
1) "message" <----
2) "chat-room-a" <----
3) "send-to-aaaa" <----
------
### PUBLISH channel message
Posts a message to the given channel. In a Redis Cluster clients can publish to every node. The cluster makes sure that published messages are forwarded as needed, so clients can subscribe to any channel by connecting to any one of the nodes.
發送消息 xxxxx 到通道 "chat-room-a" 中。發佈一條消息。 127.0.0.1:6381> PUBLISH chat-room-a send-to-aaaa
(integer) 1
------
### UNSUBSCRIBE    UNSUBSCRIBE [channel [channel ...]] : 取消訂閱的 channel。
unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply. The third argument represents the number of channels we are currently subscribed to. When the last argument is zero, we are no longer subscribed to any channel, and the client can issue any kind of Redis command as we are outside the Pub/Sub state.

客戶端使用 UNSUBSCRIBE [channel [channel ...]] 命令取消訂閱指定的通道,可以指定一個或者多個取消的訂閱通道名稱,也可以不帶任何參數,此時將取消所有的訂閱的通道(不包括glob通道)。

127.0.0.1:6381> UNSUBSCRIBE chat-room-a chat-room-b
1) "unsubscribe"
2) "chat-room-a"
3) (integer) 0
------

### PSUBSCRIBE pattern [pattern ...]
Subscribes the client to the given patterns, and supported glob-style patterns:
• h?llo subscribes to hello, hallo and hxllo
• hllo subscribes to hllo and heeeello
• h[ae]llo subscribes to hello and hallo, but not hillo

Redis Pub/Sub 實現支持模式匹配。客戶端使用 PSUBSCRIBE pattern [pattern ...] 訂閱一個或多個glob 通道,這樣就能接收發送到通道名稱與給定模式匹配的通道的所有消息。

127.0.0.1:6379> PSUBSCRIBE a c
Reading messages... (press Ctrl-C to quit)

>> 訂閱符合規則的隊列
127.0.0.1:6381> PSUBSCRIBE chat-room-
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "chat-room-" <----
3) (integer) 1


127.0.0.1:6381> PUBLISH chat-room-b send-to-bbbb
(integer) 1
127.0.0.1:6381> PSUBSCRIBE chat-room-
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "chat-room-"
3) (integer) 1
1) "pmessage"
2) "chat-room-" <----
3) "chat-room-b" <----
4) "send-to-bbbb" <----
------

### PUNSUBSCRIBE PSUBSCRIBE pattern [pattern ...]
Unsubscribes the client from the given patterns, or from all of them if none is given.

取消符合 pattern 訂閱的 channel,客戶端使用 PUNSUBSCRIBE [pattern [pattern ...]] 退訂一個或多個glob 通道。也可以不帶任何參數,此時將取消所有的訂閱的通道(不包括非glob通道)。

127.0.0.1:6381> PUNSUBSCRIBE chat-room-*
1) "punsubscribe"
2) "chat-room-*" <----
3) (integer) 0
------

訂閱內容是有時間性的

  • 後訂閱的 "訂閱者(Sub)" 者,只會收到 "該時間點以後" 從 "發佈者(Pub)" 發佈 (PUBLISH | SPUBLISH) 的新內容,所以是無法取得先前的內容。
  • 所以若是 "線上對談(online chat)" 的服務,"發佈者(Pub)" 可以除了發佈訊息 (PUBLISH | SPUBLISH) 到特定"通道(channel)"外,也建議就訊息寫入一筆記錄,而新的客戶端就可以先讀取舊的訊息後,再透過訂閱 (SUBSCRIBE | SSUBSCRIBE) 特定"通道(channel)" 的方式,來取得即時的串流數據內容。

從下面測試中,右邊兩個”訂閱者(Sub)”,是後面加入的,所以只有 “發佈者(Pub)” 後面時段推送的內容。

Redis 7+ Sharded Pub/Sub 命令的測試記錄

## New commands for Sharded Pub/Sub:
SSUBSCRIBE shardchannel [shardchannel ...]
https://redis.io/commands/ssubscribe/
---
In a Redis cluster, shard channels are assigned to slots by the same algorithm used to assign keys to slots. Client(s) can subscribe to a node covering a slot (primary/replica) to receive the messages published. All the specified shard channels needs to belong to a single slot to subscribe in a given SSUBSCRIBE call, A client can subscribe to channels across different slots over separate SSUBSCRIBE call.
---
---->> 測試記錄 <<----
## 訂閱1個分片組通道(shard channels): shard01-primary-node (相同分片組上的主節點)
$ redis-cli -c -p 6381
127.0.0.1:6381> SSUBSCRIBE shard-chat-a
Reading messages... (press Ctrl-C to quit)
-> Redirected to slot [12115] located at 127.0.0.1:6383 <----從這邊可以得知該 "分片組通道(shard channels)" 是由 slot [12115] 來管理。
Reading messages... (press Ctrl-C to quit)
1) "ssubscribe"
2) "shard-chat-a"
3) (integer) 1
1) "smessage" <----
2) "shard-chat-a" <----
3) "aaaa" <----
## 訂閱1個分片組通道(shard channels): shard01-primary-node (相同分片組上的從節點)
$ redis-cli -c -p 6382
127.0.0.1:6382> SSUBSCRIBE shard-chat-a
Reading messages... (press Ctrl-C to quit)
-> Redirected to slot [12115] located at 127.0.0.1:6383 <----從這邊可以得知該 "分片組通道(shard channels)" 是由 slot [12115] 來管理。
Reading messages... (press Ctrl-C to quit)
1) "ssubscribe"
2) "shard-chat-a"
3) (integer) 1
1) "smessage" <----
2) "shard-chat-a" <----
3) "aaaa" <----
## 訂閱1個分片組通道(shard channels): shard02-primary-node (另一個分片組上的主節點)
$ redis-cli -c -p 6383
127.0.0.1:6383> SSUBSCRIBE shard-chat-a
Reading messages... (press Ctrl-C to quit)
1) "ssubscribe"
2) "shard-chat-a"
3) (integer) 1
1) "smessage" <----
2) "shard-chat-a" <----
3) "aaaa" <----
---->> 測試記錄 <<----
SPUBLISH shardchannel message
https://redis.io/commands/spublish/
---
In Redis Cluster, shard channels are assigned to slots by the same algorithm used to assign keys to slots. A shard message must be sent to a node that own the slot the shard channel is hashed to. The cluster makes sure that published shard messages are forwarded to all the node in the shard, so clients can subscribe to a shard channel by connecting to any one of the nodes in the shard.
---
---->> 測試記錄 <<----
$ $(pwd)/src/redis-cli -p 6381
127.0.0.1:6381> SPUBLISH shard-chat-a aaaa <--- 在非 cluster mode 模式下,用這個命令會出現 MOVED 的錯誤。
(error) MOVED 12115 127.0.0.1:6383
$ $(pwd)/src/redis-cli -c -p 6381 <--- 加 -c 改成用 cluster mode 的方式來連托。
127.0.0.1:6381> SPUBLISH shard-chat-a aaaa
-> Redirected to slot [12115] located at 127.0.0.1:6383
(integer) 3
127.0.0.1:6383> SPUBLISH shard-chat-b bbb
-> Redirected to slot [7984] located at 127.0.0.1:6382
(integer) 0
---->> 測試記錄 <<----
SUNSUBSCRIBE [shardchannel [shardchannel ...]]
https://redis.io/commands/sunsubscribe/
---
Unsubscribes the client from the given shard channels, or from all of them if none is given. When no shard channels are specified, the client is unsubscribed from all the previously subscribed shard channels. In this case a message for every unsubscribed shard channel will be sent to the client.
---
---->> 測試記錄 <<----
127.0.0.1:6383> SUNSUBSCRIBE shard-chat-a <--- 取消訂閱特定 “分片組通道(shard channels)” 的內容。
1) "sunsubscribe"
2) "shard-chat-a"
3) (integer) 0
---->> 測試記錄 <<----

最後,ElastiCache 也有提供一個 python 使用 Pub/Sub 的範本代碼,以下可以參考看看。

[+] Usage examples — Publish (write) and subscribe (read) from a Pub/Sub channel :

https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/ElastiCache-Getting-Started-Tutorials-Usage.html#ElastiCache-Getting-Started-Tutorials-pub-sub

--

--

Jerry’s Notes
What’s next?

An cloud support engineer focus on troubleshooting with customer reported issue ,and cloud solution architecture.