RabbitMQ — 訊息的如何插隊?(Priority)

Han
阿Han的圖文解字筆記
6 min readDec 23, 2020

雖然我們的一般的狀況下我們都希望訊息能夠按照順序被處理, 但有時候我們仍希望某些重要的訊息能夠優先被處理,也就是插隊的概念,正好RabbitMQ也有提供這樣需求的解決方案,以下是需要知道的幾個重點。

  • 宣告Queue的時候必須要設定 x-max-priority, 通常10就夠用了。
  • 數字越大優先序越高。
  • 一般不要設定太大,因為這算是特殊狀況,正常狀況下應該照順序來處理。
  • 沒有指定優先序的訊息會預設為0。
  • 生產端發送的訊息優先序若超過配置的最大值,則會以最大值做為優先序的標示。

配置範例

這邊會以管理界面作為範例, 其他配置方式則大同小異, 可以參考官方文件。

  • 首先進到管理界面的Queues,並加入新的Queue,並配置優先序最大號碼為20。
  • 參數的欄位填上: x-max-priority:20
  • 型態記得要選number。
  • 配置完我們的Queue會有Pri的標記。
  • 接著我們將Queue綁定到交換機上。

開始進行實驗

  • 實驗方向:依照訊息重要程度分為1~20個號碼, 雖然我們先送出一個19號的訊息,再送出20號的訊息,但正確堆積在佇列的訊息應該要確保20號的訊息優先於19號。
  • Producer端會送出1、5、3、20的訊息。
(async () => {
const conn = require("amqplib").connect(
"amqp://test:test@127.0.0.1:5672/test"
);
const channel = await (await conn).createChannel();
const priorityNums = [1, 5, 3, 20];
const routingKey = "priority";
const exchange = "qa_exchange_direct";
for (let i = 0; i < priorityNums.length; i++) {
const priority = priorityNums[i];
const msg = `Priority_${priority}`;
channel.publish(exchange, routingKey, Buffer.from(msg), {
priority,
});
console.log(`Send Message: ${msg}`)
}
})();
Send Message: Priority_1
Send Message: Priority_5
Send Message: Priority_3
Send Message: Priority_20
  • Comsumer端則會依照優先序接收20、5、3、1的訊息
(async() => {
const conn = require('amqplib').connect('amqp://inuqa:inuqa@127.0.0.1:5672/inuqa');
const channel = await (await conn).createChannel();
await channel.prefetch(1); // 最多能消費unacked的數量
channel.consume("優先序隊列", (msg) => {
setTimeout(() => {
console.log(msg.content.toString());
channel.ack(msg);
}, 1000)
}, {
noAck: false
})
})()
Priority_20
Priority_5
Priority_3
Priority_1

注意事項

  • RabbitMQ預設是不支援優先序的, 因為每個隊列的每個優先級都會有一些CPU跟磁碟的開銷成本。
  • 官方建議不要使用Policy的方式來配置, 因為Policy是動態的, 可以在Queue宣告後進行參數的修改, 但優先序隊列在宣告後就無法更改其數量。
  • 假設一開始Queue沒有指定優先序,後續要加上優先序會需要將原本的Queue移除再重新宣告,因此最好一開始規劃功能時就將這一塊給考慮進去!!!!!!!!!!

如何取得訊息中某個優先序號碼剩下的數量?

雖然有了優先序可以幫我們依照訊息的重要程度來分配工作,但就應用面上來看, 我們可能會需要去掌握某個優先序號碼在Queue裡面還剩下幾個未處理, 此時我們可以藉由Management Plugin的API來進行查詢, 查詢方式如下:

<http://$>{ip}/api/queues/${vhost}/${隊列名稱}
// <http://127.0.0.1:8083/api/queues/test/優先序隊列>

GET之後會得到以下的Response, 我們只需要取backing_queue_status.priority_lengths.${優先序號碼}, 就能知道剩餘數量有多少:

{
...,
"backing_queue_status": {
...,
"priority_lengths": {
"0": 0,
"1": 2,
"2": 0,
"3": 2,
"4": 0,
"5": 2,
"6": 0,
"7": 0,
"8": 0,
"9": 0,
"10": 0,
"11": 0,
"12": 0,
"13": 0,
"14": 0,
"15": 0,
"16": 0,
"17": 0,
"18": 0,
"19": 0,
"20": 2
},
...
},
...
}

--

--