[ Elixir ] Enum, Stream, Flow 有什麼不一樣?

最近看到一篇 Blog ,內容是如何調教 Flow.from_enumerable, 與 Flow.partition 的 :stages 與 :max_demand 參數。 第一篇是版主探討他怎麼用 flow 打造他期望的工作模型。

TUNING ELIXIR GENSTAGE/FLOW PIPELINE PROCESSING

第二篇是版主應回應的要求,教大家怎麼建立第一篇使用的工具。

MEASURING AND VISUALIZING GENSTAGE/FLOW WITH GNUPLOT

該版主用 
1. File 作為紀錄時間搓的工具,並以 stage_work_name.log 為檔名。 
2. gnuplot 讀取 log 並繪圖。 
這種作法很好上手,大概一個小時內你就可以套在自己想要檢測的 flow 上了,其他需要尋找資料流卡點的,也可以套用這手法。 
如何 log 與 製作圖表的 blog 包含 elixir code ,可以直接拿來用。所以本文不重複這塊。 推薦大家花一個小時左右的時間,把 log 與圖表那篇實作起來。

圖表工具與檢視併發執行情況的小套路講完了。 
回到正題。其實是要嘗試呈現下圖描述的東西。

Elixir 不同的 Collection 的資料流,到底是怎麼一回事呢?上述的行為,跑起來真實樣貌是長怎樣的? 目前 Elixir 內建有 Enum, Stream 兩個處理 Collection 的庫。

另外還有一個最近從 GenStage 拆出來的庫 Flow 。 ( 2017–01–18 ) 分家的版本是 0.11 版。可檢視 0.11 的 change log

首先,先解釋圖表,圖表呈現每筆資料何時進入到不同的 Job 上,並在完工前打卡。 座標分別是 x: time ( ms ) 與 y: counter 。 解讀的方式是, 
1. 你可以畫一個橫線。橫線會經過四個,或是更多的符號,該符號代表 該次的 counter 經過特定 job 時打卡的時間。 job1 與 job 2 的間隔,就是打卡的時間差。 也可以只看第一個與最後一個打卡的時間差。可以看出一個 element 通過 pipeline 的效率。 
2. 畫一個直線,直線碰到的符號個數,代表該時段有多少個 process 打卡。在本篇可以拿來解讀同時有多少個 process 在工作。

另外我的示範代碼。每個 job 都簡化成一個 花費 1 ms ( 0.001 second ) 的任務。所以對於同一個 counter 而言,圖表上應該沒有機會重疊。

所有代碼的結構如下, job1 到 job4 在 stream, flow 版本不會有變化。然後 pipeline 部分, Enum 會換成 Stream 與 Flow 的版本。後續只會貼上 pipeline 的部分。

@max 25

def plot_enum do
 Progress.start_link([:enum_job1, :enum_job2, :enum_job3, :enum_job4])
 
 job1 = fn x -> Process.sleep(@sleep_time); Progress.incr(:enum_job1) ; x end
 job2 = fn x -> Process.sleep(@sleep_time); Progress.incr(:enum_job2) ; x end
 job3 = fn x -> Process.sleep(@sleep_time); Progress.incr(:enum_job3) ; x end
 job4 = fn x -> Process.sleep(@sleep_time); Progress.incr(:enum_job4) ; x end
 
 1..@max
 |> Enum.map(&(job1.(&1)))
 |> Enum.map(&(job2.(&1)))
 |> Enum.map(&(job3.(&1)))
 |> Enum.map(&(job4.(&1)))
 
 Progress.stop()
 end
 然後是 Steam 版本的代碼。

1..@max

|> Stream.map(&(job1.(&1)))
 |> Stream.map(&(job2.(&1)))
 |> Stream.map(&(job3.(&1)))
 |> Stream.map(&(job4.(&1)))
 |> Enum.to_list()
 我們來看 Enum 與 Stream 的圖表吧。

兩個圖表,花費的時間都差不多,不過這不是本文的重點。原則上不該這樣,因為我的示範資料是很小的,而且不是從外部來的,每個 job 又都要花 1 ms 所以, stream 的其他特性沒辦法發揮。

但我們可以比較 Enum 與 Strem 資料流的特徵。
首先,可以看到, Enum 是把 25 count 都跑完 job1 之後, job2 再從頭 count 1 依序處理到 25 ,然後 job3 與 job4 。 這是 Enum 的特性, list 進 list 出。 對比的 Stream 可以看到, count 1 先獨自跑完 job1,2,3,4 ,然後才換 count 2 跑 這四個 job ,直到 count 25 跑完。

這兩種方式有什麼差別呢?假設job4 處理完後,是大家集合。那 Enum, Stream 就沒差別。 但假設 job 是從叫客戶叫披薩,到快遞的送達的時間,那其實 job1 到 job4 中間越少等待時間越好。這種情境下, Steam 就有優勢。

這種方式就是畫橫線看。一個東西從原料開始,完成組裝,出貨,送達。我們可以看到 Enum 與 Stream 在這邊表現的不同。 Enum 不在乎單個 element 的情況,Enum 總是看整體結果的。而 Stream 是看個體。

圖表也顯示了, Enum 版本的 counter 在每個 job 打卡都隔了很長的時間。我們不太喜歡這樣。Stream 解決了這樣的問題。

這是第一次的進化。 透過 Stream 。

不過圖表還可以檢視出一個狀況, Enum, Stream 都是單個 process 。我們可以畫直線,很明顯的,一個時間點,就只會出現一個打卡符號。

所以我們可以更細部的看 Stream 的處理過程。 我們可以觀察 count10 ,你可以發現, 處理 job1 時,其他的 job 都是沒在運作的。 count10 都完工時, count11 才可以啟動。 Stream 似乎有進步的空間。

所以我們引進了 Flow , flow 可以分割 pipe 成為不同的 process 個體。下圖是在 pipe 最上頭分配 4 個 process 的結果。語法是在 Flow 的起點用 from_enumerable( stages: 4 )

# @max_demand 是 另一個參數,本文目前不探討,但設定太多或太少,會嚴重影響執行的表現。

1..@max
 |> Flow.from_enumerable(stages: 4, max_demand: @max_demand)
 |> Flow.map(&(job1.(&1)))
 |> Flow.map(&(job2.(&1)))
 |> Flow.map(&(job3.(&1)))
 |> Flow.map(&(job4.(&1)))
 |> Enum.to_list()

我們可以看到,圖表很像 Stream 的版本,不過這次有 4 個 process 同時進行。比起 Stream 版本, 會有 3 個 counter 可以提早完成任務。 效果是根據你有多少 cpu 核心,或啟用多少個 process (語法上用 stages: n ,設定)。

畫橫線看, Flow 與 Stream 取得的結果是一樣的。 但畫直線,可以看到同一個時間點有 4 個 process 一起 做 job1 。 然後一起做 job2 。最後一起處理完 job4 ,之後再從下一輪的 job1 開始。

我們可以觀察到,總時間大概只要 1/3 或 1/4 上下,這就是加 processs 的好處。(補充一點, 這裡有 4 個 process , 每個 process 裡面,都負責處理整套的 job1~job4 。)

這次是第二次的進化, 用 Flow ,獲得多個 process 的效益。

接下來我們再刁鑽一點,先來觀察 flow 的圖表, 每批 process 在處理 job4 的時間差,基本上就是隔了一套 job1 ~ job4 的時間,這個時間差跟 sream 版本依樣。我們可不可以縮短這個時間差呢? 可以,我們用 Flow.partition( stages: n ) 來加 process 。

1..@max

|> Flow.from_enumerable(stages: 1, max_demand: @max_demand)
 |> Flow.map(&(job1.(&1)))
 |> Flow.partition(stages: 1, max_demand: @max_demand)
 |> Flow.map(&(job2.(&1)))
 |> Flow.partition(stages: 1, max_demand: @max_demand)
 |> Flow.map(&(job3.(&1)))
 |> Flow.partition(stages: 1, max_demand: @max_demand)
 |> Flow.map(&(job4.(&1)))
 |> Enum.to_list()
 首先先說明代碼的變化。上一版本, flow 一開始分配了 stages: 4 。 這一版本,一開始是 stages: 1 ,接著 job1 ,然後在其他 job 之前,用 Flow.partition(stage: 1 ) 切割。 兩個版本都是使用了 4 個 process 。只是分工的方式不同。

舉個簡單的類比:

上一個版本是開了四個工廠,每個工廠只有一個人。 4 process 
這一個版本是一個工廠,每道 Job 有一個人負責做事。 4 process 
再來看看這個版本的圖。 job4 與 job4 之間的時間差縮短了。

這次我們的 pipeline 終於有個真正的「流水線作業」流程了。

我們再來比較一下兩種拆 process 的不同處。 上一個確實能更提早發貨。核心數越多,就越能享有更早的完工時間。不過另一個觀點是,就拿紡織廠來舉例,以4工廠版本來說,job1在跑的時候, job2,3,4 的機器設備就晾在那邊沒做事了,設備買多餘了。反而是有四個人的那個工廠,才不會浪費機器的生產力,人與設備都滿載。

但我們軟體的設備是函式,似乎沒關係。4 工廠的方式目前來看是更好的拆法。把 兩個 flow 版本重疊, job4 圍出來的每個三角形,就是被提早的時間量。目前來說, partition 沒有帶來新的進化。

但如果這些任務是動畫,那 partition 的版本,會比上一個版本更流暢,這時候用 partition 分配 process 會更好。因為上一個版本的結果,4 個同時完成,畫面只會有一個,反而會造成跳格的視覺感受, 60 fps 變成 15 fps 的體驗。所以要根據我們的需求來分配切割的方式。

另外一個目前代碼沒有表現出來的是, partition 可以多派幾個 process 來處理複雜的 job 。我們來修改吧。 我們把 job1 後面接上一個 job_spent ,讓他耗時 2 ms。 那麼我們可以在 job1 前的 from_enumerable 上,設定 3 倍的 process 量,來平衡 ( job1 + job_spent = 3 ms ) 所消耗的時間 這時候,這邊的 一個 process 要處理 job1, job_spent 兩個工作。 
 
 job_spent = fn x -> Process.sleep(2); x end
 1..@max
 |> Flow.from_enumerable(stages: 3, max_demand: @max_demand)
 |> Flow.map(&(job1.(&1))) # 1 ms
 |> Flow.map(&(job_spent.(&1))) # 2 ms
 |> Flow.partition(stages: 1, max_demand: @max_demand)
 |> Flow.map(&(job2.(&1)))
 |> Flow.partition(stages: 1, max_demand: @max_demand)
 |> Flow.map(&(job3.(&1)))
 |> Flow.partition(stages: 1, max_demand: @max_demand)
 |> Flow.map(&(job4.(&1)))
 |> Enum.to_list()

上面兩個圖表,可以發現 調整過 job1 的 stages: 3 ,成果是接近上面那個所有階段只需要 1ms 的情況。而沒有 調整 stages 的,因為 第一個 process 每次都必須用掉 3 ms 。 即使 job 2,3,4 都只要 1 ms 就完工了。 但情況是瓶頸卡在第一個 process 的地方。全部完成,耗時是最花時間的那個 job 乘上 list 大小。 spent_time = length( list ) * jobs_count * max( [ job1, job2, job3, job4 ] )

現在的問題是, job2 每三秒才會收到一個結果,所以有兩秒是閒閒的,連帶的, job3, 4 都是做一秒,休息兩秒的情況,圖表也能觀察出這樣的狀況。job2 做完就沒事了,畫直線看,每 3 秒碰到一次 job2 ,被前面最耗時的卡著了。所以即使知道其他 job 都很快,但反映出來的時效就是最大的,上面的 jobs_count * max( list ) 就是這麼來的。

假設前面那三秒的 job 是不能再切割的,那要怎麼避免 job2,3,4 的閒置呢?

所以要怎麼調節?做法就是把比較耗時的部分,增加更多的 stages 來貼近最不耗時的。最快的就不必加核心了因為快的job 要等慢的,也就是要調整耗時的。其他慢的就是乘上某個倍數。這樣的結果是,我們把 3 ms 產出一個結果的 job1 , 變回每 1 ms 就產出一個結果的 job 。這樣 job 2 就沒有等待 job1 的空閒產生。(這邊有個小狀況,狀況是: job1 一次丟 3 個成品給 job2 , job2 處理一個,有兩個要先等著! job2 屯貨了!不過 job1 有三秒的間隔,所以 job2 在時間內消化完了。)

我們透過 上述 partition 的分配,把比較耗時的 pipeline 變得跟 上一版本的 flow 有一樣的總時效。 這才是多個 partition 切割方式帶來的 power 。

在耗時的 partition 上,分配更多的 process,降低類似 job1,與 job2 這種 pipeline 之間等待的閒置時間。

這是第三次的演進,使用 partition 調節 stages 比例。

盡可能減少 pipeline 之間 process 的閒置。用多個 partition 調節整個 pipeline 的瓶頸處。使得 pipeline 運作更均衡順暢。 調節好的話,耗時就會從上一個式子往下面的式子貼近。 spent_time = length( list ) * jobs_count * min ( [ job1, job2, job3, job4 ] )

總結

我們從單 process 的 Enum 處理 list ,對照組,一切的開始。 
演化成單 process 的 Stream 操作資料,讓先處理的 element 盡快完成任務,可以先行離開。 
第三步,我們用 Flow 創造 n 個 process ,一次用 n 個 process 跑 Stream 的方式, 總體耗時是原來 的 1/n 或 1/(n -1 ) 左右。獲得相當幅度的進步。大部分的 element 也比純 Stream 又更早完成了。 
第四步,對付難纏的又不可分割的任務(特別吃時間),我們透過 partition 分配更多的 process ,用來填補後續 job 閒置的計算時間。而總體耗時表現,又能變回第三步那樣的水準。而且每個 生成的 process 都保持在運算的狀態。這一步最大的功勞是擠掉了 process 空閒的時間。 process 是拿來算的,不是拿來閒置的。

以上是本文對 Elixir 資料集處理的特徵整理。希望本文能協助大家學習 Elixir 併發功能的理解與使用。

有錯誤或疑問的地方,歡迎大家回饋。


Originally published at aikawadlog.blogspot.com on January 28, 2017.

One clap, two clap, three clap, forty?

By clapping more or less, you can signal to us which stories really stand out.