在 Linux 上一些工作日常上,一定會用到一個指令 — tee (講得好像國民日常一樣),那麼這個指令是拿來做什麼用的呢?而又跟我們今天要談得 Tee Channel 什麼關係呢?讓我們先來看張照片

Image for post
Image for post

嘿,對!你沒有看錯,就是這個你家廁所、洗手台都會看到的東西,這玩意叫做 T型管,所以它的作用是什麼?沒錯!分流!

那我們在 Linux 下會怎麼用這根水管呢?我們來看一下,假設我們想要 ping 某個網址在銀幕上馬上看到結果,但我又想要順便把結果存到檔案裡,方便等等查閱,我們可以這樣做

ping google.com | tee output.txt

這樣子就會立即看到結果,又順便把結果存到檔案 output.txt 裡面

好了,前情提要完了,現在我們來正式談談在 Go 裡要怎麼應用這樣子的概念。

假設我們從某個地方取出了一些資料,依序的把這些資料一筆筆的傳入一個 channel,另一方面我們希望有個地方去接收這個 chaanel 的資料做處理,並且同時的將這些資料也傳到另一個地方做處理,有點饒舌,不過意思就是像我們前面提到的 Linux 那個意思是一樣的。

那麼該怎麼實現呢?我們可以同時建立兩個 channel 搭配 select 語法來完成,不多說直接看一下例子。

我們建立了一個 produce function 來將傳入的數字依序的傳入 inStream channel 中。

接著建立一個 tee function 回傳兩個 channel,function 內跑了一個 i < 2 的 loop,這裡比較要注意的是我們用 select 語法來讓整個流程不會 block,並且在傳入資料到 out1 或 out2 channel 後,我們就將該 channel 給設為 nil,這樣 loop 就不會丟到兩個一樣的 out channel 了。

另外一點要注意的是,在 line 29 我們將 var out1, out2 = out1, out2 給做了一個 copy 的動作,如果不這樣做的話,會把設定 out channel 為 nil 的時候,把原本的 out channel 也設定成 nil,這樣外面在讀取的時候就會出錯。

程式輸出為

Process out1: 1 and Log out2: 1 
Process out1: 2 and Log out2: 2
Process out1: 3 and Log out2: 3

小結

今天算是一個小品,不是太複雜的東西,但是蠻有用的概念的,希望可以幫助到大家。


在上回 Go: Concurrency Patterns — Pipeline 談到我們可以利用 pipeline 的方式將各個操作視為一個個獨立的執行單元,讓我們可以較好的拆解並組合應用,但有時會碰到某個單元的執行效率較差而拖慢整個 pipeline 的情況發生,這時我們就可以利用 Fan-out — Fan-in 這個 pattern 來提升效率。

先來看下面這個例子

我們建立了幾個 pipeline 的 stage:

  • intGenerator: 負責將 nums 一個一個的丟到 intStream 這個 channel 裡
  • increase: 負責將 intStream 收到的 num 做加1並丟到 increaseStream 的 channel 裡,這裡為了要示範這是一個很慢的操作,所以我們 sleep 3秒來模擬這個情況
  • 最後我們將兩個 stage 串起,並查看一下執行了多長的時間

我們可以看到這段程式最後輸出執行了大約 9秒 的時間,因為在 increase 需要做完 sleep 3秒並加1 的動作後才會準備在接收下一個進來的 num,這段程式的 bottleneck 在 increase 這個動作

Fan-out — Fan-in 的使用條件

並不是所有情況都適合使用 fan-out fan-in 來解決所有像上述提到的類似問題,在考慮使用 Fan-out Fan-in 時,有兩個前題:

  • Stage 的操作不依賴自身的上一個計算結果,也就是該操作是沒有順序性的
  • 該 stage 的操作是一個慢的操作(不然好好的幹嘛優化它呢?)

Fan-out

那我們要如何把每個 stage 處理需要等待3秒這件事情給優化呢?我們知道 increase 這個 stage 會開一個 goroutine 來處理操作,那我們就可以開啟多個 increase stage 來同時處理操作。

我們將原本的

stream := increase(done, intGenerator(done, []int{1, 2, 3}))

改為

intStream := intGenerator(done, []int{1, 2, 3})

increaseNum := runtime.NumCPU()
increases := make([]<-chan int, increaseNum)
for i := 0; i < increaseNum; i++ {
increases[i] = increase(done, intStream)…


Image for post
Image for post

這次我們要聊聊的是在 Go concurrency 程式設計裡一個很有用的 pattern — Pipeline

什麼是 Pipeline

那麼什麼是 Pipeline 呢? 顧名思義就是由一些管子塔建起的線路,工業上會用這樣子技術來運輸水、油或者是瓦斯,而在軟體工程裡我們也就借用了這樣的一個名詞來運輸我們的 “資料”,在用這樣子的概念在開發時很重要的一個流程就是: 接收資料 -> 處理資料 -> 傳回資料,而這樣跟一般 function 有什麼不一樣呢?一般的 function 接收的資料跟傳回的資料不一定是一樣的,而因為我們要運送、傳輸 “資料”,所以接收的資料跟傳回的資料會是一樣的型態,function 就好像是前面提到工業上管線跟管線之間的中繼站一樣,下面我們看點簡單的例子

上面這個簡單的例子我們可以看到兩個 function

multiply 傳入一個 []int 的 slice 與要相乘的數字,建立一個與傳入 slice 一樣大小新的 slice,把傳入 slice 的每個數字都乘上要相乘的數字賦值給新建立的 slice,並回傳新建立的 slice

add 傳入一個 []int 的 slice 與要相加的數字,建立一個與傳入 slice 一樣大小新的 slice,把傳入 slice 的每個數字都乘上要相加的數字賦值給新建立的 slice,並回傳新建立的 slice

接著我們可以看到第28行 add(multiply(num, 2), 1)

裡面的 function 會先執行而後將結果丟回給外面的 function 做傳入值,我們的資料流會像是這個樣子

data([]int) -> stage(multiply function) -> data(處理後) -> stage(add function)

有沒有就像是一開始說的資料是以一種流動的方式在進行

所以第28行的程式口語來說就會是,將一個 slice []int 傳給 multiply function 將每個 slice 裡的數字乘上要相乘的數字,接著將處理後的結果回傳出去給 add function 當做輸入參數,再將每個 slice 裡的數字加上要相加的數字,回傳出去

結果會是這樣:

3
5
7

使用 Channel

上面的做法其實會有幾個問題存在

  • 每進去一次 function 都會複製一個新的 slice 出來,對 memory 佔用是一個問題
  • 每次都要將整個 slice 做完相加或相乘才能再到下一個 function…

About

Eric G. Huang

Eric G

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store