Go: Concurrency Patterns — Pipeline

Eric G. Huang
6 min readAug 9, 2019

--

這次我們要聊聊的是在 Go concurrency 程式設計裡一個很有用的 pattern — Pipeline

什麼是 Pipeline

那麼什麼是 Pipeline 呢? 顧名思義就是由一些管子塔建起的線路,工業上會用這樣子技術來運輸水、油或者是瓦斯,而在軟體工程裡我們也就借用了這樣的一個名詞來運輸我們的 “資料”,在用這樣子的概念在開發時很重要的一個流程就是: 接收資料 -> 處理資料 -> 傳回資料,而這樣跟一般 function 有什麼不一樣呢?一般的 function 接收的資料跟傳回的資料不一定是一樣的,而因為我們要運送、傳輸 “資料”,所以接收的資料跟傳回的資料會是一樣的型態,function 就好像是前面提到工業上管線跟管線之間的中繼站一樣,下面我們看點簡單的例子

上面這個簡單的例子我們可以看到兩個 function

multiply 傳入一個 []int 的 slice 與要相乘的數字,建立一個與傳入 slice 一樣大小新的 slice,把傳入 slice 的每個數字都乘上要相乘的數字賦值給新建立的 slice,並回傳新建立的 slice

add 傳入一個 []int 的 slice 與要相加的數字,建立一個與傳入 slice 一樣大小新的 slice,把傳入 slice 的每個數字都乘上要相加的數字賦值給新建立的 slice,並回傳新建立的 slice

接著我們可以看到第28行 add(multiply(num, 2), 1)

裡面的 function 會先執行而後將結果丟回給外面的 function 做傳入值,我們的資料流會像是這個樣子

data([]int) -> stage(multiply function) -> data(處理後) -> stage(add function)

有沒有就像是一開始說的資料是以一種流動的方式在進行

所以第28行的程式口語來說就會是,將一個 slice []int 傳給 multiply function 將每個 slice 裡的數字乘上要相乘的數字,接著將處理後的結果回傳出去給 add function 當做輸入參數,再將每個 slice 裡的數字加上要相加的數字,回傳出去

結果會是這樣:

3
5
7

使用 Channel

上面的做法其實會有幾個問題存在

  • 每進去一次 function 都會複製一個新的 slice 出來,對 memory 佔用是一個問題
  • 每次都要將整個 slice 做完相加或相乘才能再到下一個 function 做處理,無法平行執行

圖1:

我們再來看一下上面這張圖(圖1),每個 gopher 把整堆書都做完處理了,才再把整堆書給下一個 gopher 做處理,但我們希望更有效率的做這件事,即每個 gopher 對一本書做完處理就直接把這本書丟給下一個 gopher 做處理,接著繼續處理手上的下一本書,依序做完

我們希望資料能夠像下圖(圖2)這樣子流動,會讓整個 process 有效率一些

圖2:

圖中會看到除了 multiply 跟 add 還多了一個 generator,多了一個 generator 的原因是我們需要一個地方用來建立 channel 並且把 slice 內的數字依序的透過這個 channel 來傳遞資料,我們來看一下怎麼用 channel 來幫助我們達到這樣子的效果

我們來對上面程式碼做幾點重點整理

generator(line:7 to 23)

首先來看一下它的 function definition

generator := func(done <-chan int, nums []int) <-chan int

這個 function 主要是要將傳入的 slice 依序的傳入 channel,並把這個 channel 給傳出去讓其他的 function 使用

multiply(line:25 to 39)

一樣來看一下它的 function definition

multiply := func(done <-chan int, intStream <-chan int, multiplier int) <-chan int

第二個參數接收一個 channel,在 function 實作中,先建立一個 channel,啟動一個 goroutine 在裡面接收第二個參數的 channel 傳入的值,並且把接收到的值計算後傳入先前建立的 channel,goroutine 外將這個建立的 channel 給回傳出去

add(line: 44 to 61)

這個 function 的實作跟 multiply 大同小異,只差在計算從 channel 傳入的值一個是相乘,另一個是相加而已

最後我們可以發現除了 generator 這個 function 沒有傳入 channel 當參數以外,其他兩個 function 其實都是接收了一個 channel 與回傳另一個在 function 內建立的 channel,因此我們可以將 function 跟 function 之間用 channel 串連起來達到上面圖2那樣子的效果

pipeline := add(done, multiply(done, intStream, 2), 1)for p := range pipeline {
fmt.Println(p)
}

總結

這次學習了 pipeline 的觀念,讓資料可以以 “stream” 的形式來流動以達到我們想要的結果,比如我們希望傳入一堆圖片做一系列的處理

傳入圖片 -> 切裁 -> 加上浮水印

我們可以用 pipeline 的方式來將每一個不同目的的程式做組合應用,並且達到職權的分離,還可以用 goroutine 加上 channel 的方式來達到 concurrency 的操作增加效率,真是個不錯的概念呀

--

--