[Python] RabbitMQ 訊息佇列-實作

PC Chen
程式乾貨
Published in
7 min readFeb 22, 2021

近期工作上常常聽到後端同學在使用 kafka, rabbitMQ,... 等服務,發現後端用的招還是挺多的,怎樣都學不完XD,就決定打個筆記來記錄一下實作的流程。

網路上關鍵字搜尋就有很多介紹,我們這篇不打算講得太複雜,先來講一下原理吧~
所謂的 Message Queue(MQ) 顧名思義就是把 訊息message 放到 佇列Queue,這裡的 Queue 就跟資料結構中一樣有先進先出FIFO的特性,那又是為什麼要把訊息放入 Queue 中呢?我們可以把它想成是「郵局」,作為「寄件者」與「收件者」之間訊息傳遞的橋樑,郵局的功用除了確認這些訊息準確地傳遞外,最大的好處就是傳遞失敗的時候訊息(郵件)還會保留在郵局中、不會消失,只要事後發現錯誤原因(EX. 地址填錯、門市關轉等),就可以重新傳遞郵件:)

使用情境

現在有一個訂單系統,開發邏輯是當某個使用者user下了訂單,就會在資料庫裡面寫一筆紀錄,可以把 user 提出的訂單當作是一種訊息,而後台的程式接收到了訊息就會把資訊寫到資料庫裡,架構像以下:

所有使用者一次把訂單灌入 database

這樣真的好嗎?

試想一下,假設某個商品是限時搶購的,代表一瞬間會有大量的使用者下訂單,這時候如果所有的訂單訊息一次灌入資料庫,這想必是承受不住的!這時候要怎麼解決的?

沒錯~就是 RabbitMQ,我們可以利用它先把大量灌入的訂單資訊先排進 Message Queue,接著再根據後台程式自己的效能控制,慢慢消化這些訂單、一筆一筆地寫入資料庫,減輕壓力!
另外也可以保證即使中間因為斷線或是訂單系統當機,也不會漏失掉任何訂單訊息,因為一開始就已經存在 Queue 裡面(郵局)了,即使中間有任何硬體問題(EX. 資料庫斷線、訂單系統當機等),訂單訊息也不會遺失,可以事後修正好後台程式,再把沒消化的訂單記錄好就好,是不是很讚呢!

RabbitMQ 當作中間傳遞媒介,讓後端程式可以慢慢消化訂單

安裝與啟動 RabbitMQ

以下說明環境針對 macOS(windows用戶抱歉啦XD),使用 homebrew 做快速安裝:

$ brew update
$ brew install rabbitmq

讓子彈飛一會兒~等待安裝完成後將 /usr/local/sbin 添加到 ~/.bash_profile 中,這樣之後才能直接用終端指令啟動 rabbitmq:

$ vim ~/.bash_profile$ export PATH=$PATH:/usr/local/sbin## :wq儲存後退出,重新啟動 bash_profile
$ source ~/.bash_profile

接著接著在終端機輸入下面指令,就可以啟動 rabbitmq 囉:

$ rabbitmq-server

不放心的話,可以在瀏覽器輸入 localhost:15672 ,查看網頁版管理介面,帳號密碼全部都輸入「guest」就可以登入:

rabbirmq 網頁管理頁面

開始實作

準備工作

我們需要先在本地的 mysql 建一張表 tb_order 在 db test 之中,紀錄邏輯是只要有使用者下訂單,就記一筆 user_id 到表中(就是這麼簡單~)

首先

我們先來看看一個完全沒有使用 MQ 的範例,模擬有 100個 user 迴圈來訂單下統下訂單。迴圈 for i in range(1,100000) 數字 1~100 代表每個 user分別進來,我們一樣把數字輸入 sha-256函數 後、代表每個 user_id。
P.S 如果對 sha-256函數 不熟的讀者可以參考這篇,也有應用到~

給定一切順利?BUT…

假設資料庫因為網路連線不穩、或其他因素導致斷線,我們在 for loop 裡面加一個隨機斷線因子,作為模擬。
執行完就發現如果直接做塞入資料庫,中間如果斷線,所有使用者的 user_id 都會遺失,這樣子等於訂單資訊都丟光光了,實在母湯!

Message Queue 架構

整體架構面很單純,我們就是要在 user_id 訂單訊息 塞入資料庫 之間多一個 MQ 的中間層。過程中我們會需要用到 pika 這個套件,幫助我們連進安裝好的本地 RabbitMQ,具體實作程式如以下:

send.py : 將收到的 數字i 轉換成 user_id,並送入 MQ

request.py : 模擬 1~100 的 user 發出 send.py

receive.py : 消化 request.py 送過來的訊息,並插入資料庫

send.py

執行結果如以下:

這個時候可以用指令 $ rabbitmqctl list_queues 檢查,發現真的多了一個長度為 1 的佇列:hello

request.py

可以看到程式就是把 1~100 當作參數輸入到 send.py,執行結果如以下:

這時候一樣用$ rabbitmqctl list_queues 檢查,發現 hello 這個佇列長度真的變成 100 了:

receive.py

我們一樣在 insert_into_db 這個函式中塞入隨機斷線因子,執行結果為以下:

可以看到中間因為有模擬斷線,訊息只有部分被消費就結束了,這時候我們一樣檢查一下 $ rabbitmqctl list_queues ,發現有 hello queue有部分訊息被消費了,長度只剩下 92:

所以關鍵是?

讀者可能會疑惑:「咦那這樣跟一開始直接塞差在哪邊?」,其實仔細觀察可以發現雖然斷線了,但其他未被消費的 92 筆訂單資訊仍被存在 Message Queue 中,因此我們只要繼續執行 receive.py 、直到所有訊息都被消耗完就好了~

如此一來就達到我們使用 RabbitMQ 的初衷,在使用者把訂單灌入資料庫前,我們就先存放在 Message Queue 中,並由另外的程式控制如何慢慢消化這些訂單、如果斷線遺失也不會把所有訂單都丟掉,一舉兩得!

再執行一次 receive.py
又消費了 2 筆~

延伸閱讀

我們可以手動一直跑 $ python3 receive.py 直到 hello 這個 MQ 的長度變為 0為止,就代表訂單消費完了。也可以寫一另外一隻 .py 運行 receive.py 直到 MQ 長度為 0,這部分就交給讀者自行開發囉~相關程式都放在我的 github 上,有興趣的讀者可以抓下來實作。

--

--

PC Chen
程式乾貨

喜歡接觸與動手實作各種軟體技術的後端數據工程師 A data- backend engineer who is enthusiastic in learning and implementing any techniques in software engineering.