Kotlin Coroutines Dispatchers 那一兩件事

Jast Lai
Jastzeonic
Published in
26 min readJul 31, 2020

前言

某次講 Coroutines 的時候,講到 Dispatcher 時,有提到 Kotlin Coroutines 有把 CoroutinesDispatcher 當作是 interface 提供出來給開發者使用。

val customDispatcher = object:CoroutineDispatcher(){
override fun dispatch(context: CoroutineContext, block: Runnable) {
//TODO
}

}
suspend fun test1(name:String,id:Int):String = withContext(customDispatcher){
test()
}

這時被問到了:「那我要怎麼利用這個 Dispatcher 做排程」。

其實被這麼一問我當下也愣了,因為要搞排程這事情可能就不是我這次講題的範疇了,排程這個字遠遠比想像中的抽象,畢竟不管是怎樣龐大的系統,原則上就是邏輯 + 排程去組成的,這甚麼意思?這意思就是,要達成這個目標的方法很多,以目的來說,就是找到符合自己情形的方法就可以了。

所以我後來只能草草帶過,因為要講大概抓個人來討論一下說也說不完了。

但是這也讓我產生了一個疑問,究竟 Coroutines 的 Dispatchers 是怎麼做排程的?

那就不囉嗦,咱們開始吧。

Dispatchers.Main

其實直接進去 Dispatchers 就可以看到很精美的註解了,不過這邊大致上是在講 Dispatchers.Main 的定義,那可以從 MainDispatcherLoader 往下追。

這邊可以發現 loadMainDispatcher 主要會去找 MainDispatcherFactory,若是找不到,則會去執行 createMissingDispatcher(e)。

createMissingDispatcher 如果是寫 Android 應該會很熟悉,若沒有加一個 dependency 的話,會一個錯,要你把某個 dependency 加上,就是他噴的。

那 MainDispatcherFactory 呢?

可以發現他是個 interface,所以很明顯 Main 是根據平台去決定要怎麼做的,畢竟各個平台的 main thread 有這麼一點不一樣。

那我這邊一樣,以我自己的老巢 Android 來說,Android 的部分,是會進到 FastServiceLoader 的那行。

可以發現裏頭直接用反射建了一個叫 AndroidDispatcherFactory 的玩意。

利用全域搜尋,可以找到它。

寫 Android 多年,看到 Looper.getMainLooper() 大概就知道是怎麼一回事了,如果還是不知道的話,沒關係,我們可以順著 HandlerContext 看下去。

HandlerDispatcher 其實就是繼承 CoroutineDispatcher 的一個 sealed class,我在猜設計成 sealed class 可能是為了未來擴充為目的,但目前看不大出來他這樣設計的目的,這就不深究了。

要放焦點的是可以注意到被 override 的 dispatch 直接用 handler.post(Runable)了,也就是說,如果在 Android 使用 WithContext(Dispatcher.Main){ blablabla },後面這塊 { blablabla }會被 Dispatcher 直接餵給 MainLooper 的 Handler ,通常 Runnable 進到 post 會馬上執行,這是 Handler 的機制,至於 Handler 怎麼運作的,那是 Android 另一個古老的傳說,這邊也就暫且不提了。

Dispatchers.Default

本來是想先寫 IO 的,但是 IO 其實是基於 Default 構建上去的,所以先寫 Default 好了。

Main 不難懂,不懂其實也沒差,知道他最終會執行在 Main thread 上就好了,Default 相比之下有故事的多。

Dispatchers.Default 容易跟 CoroutineStart.Default 搞混(所以我得更正我另一篇文章給的定義,因為我搞混惹),前者指的是 Dispatchers 的預設,它會去 Run Coroutines 預設的 Scheduler ,基本上不會跑在 main thread 上。後者則是馬上把 Coroutines 加入自身 Coroutines context 的排程中,它是在 launch 或者是 runBlocking 等方法中沒塞 Dispatchers 的預設值,原則上是在哪呼叫就跑在哪個 thread 上。

那話說回來,甚麼是預設的 Scheduler ,且聽我娓娓道來。

從 Dispatchers 開頭可以發現 Default 調用了一個叫 createDefaultDispatcher() 的方法,那它就會直接使用 DefaultScheduler。

那可以發現 DefaultScheduler 是繼承 ExperimentalCoroutineDispatcher 的

那就直接看過去 ExperimentalCoroutineDispatcher 的 Dispatch 吧

裡面做的事情也頗單純,就是呼叫使用 coroutineScheduler 的 dispatch

不過看到 CoroutineScheduler 的 dispatch ,頭痛的地方就來了

嗚哇,好多精美的註解阿,莫急莫荒,我們先把焦點放到這裡

這邊比較需要知道的是, Runnable 進到這裡,會被轉裝成 task。

然後再用 submitToLocalQueue 被放進去 currentWorker 裏頭。

那問題來了, currentWorker 是甚麼? submitToLocalQueue 又是甚麼?

可以看的出來 currentWorker 是抓目前的 Thread。

咦等等,我們甚麼時候決定 Thread 了?

嘛,是根據你執行的 invoke 該 Coroutine 時所決定的,也就是說如果你在 main thread 上 launch ,那就會在 main thread 上跑,畢竟,這裡還沒有跑那個 Runnable ,而且那個 Runnable 也不是塞給他。

比較主要的是這邊會把 currentThread cast 成 Worker。

Worker 是啥?還有 cast 成 Worker又是怎麼搞得?其實這動作很關鍵,這個稍後再提。然後把 執行的 CoroutineScheduler assign 給 Worker,最後把 Worker return 回去。

恩,那 submitToLocalQueue 是幹嘛?顧名思義,就是 submit 給 Queue ,欸不是,別吐槽我就英翻中,這裡確實是這個意思,那延伸的問題是 localQueue 是甚麼?

可以發現 localQueue 是一個 WorkQueue ,恩,其實這也就只是個 Queue ,放進去等著被 CoroutineScheduler poll 出來執行那樣。

這邊問題又來了,Dispatchers.IO 並不像是 Dispatchers.Main 那樣可以很明確看出來是由 MainLooper 去 run Runnable 的,這裡除了設定以外,根本壓根子沒有提到甚麼時候 Run 在甚麼 thread 上。

所以問題就是,那究竟是甚麼時候決定了這個 Runnable 要 run 在甚麼 thread 上?

其實也不難理解,回頭看向 CoroutineScheduler 。

可以發現他就是個 executor,此外還有 core pool size 、 max pool size 那些零零散散,跟 thread 上限有關的數值設定,那可以推斷出,thread 肯定是在 CoroutineScheduler 生成的。

那,是在哪呢?

我們回頭看過去 dispatch ,把焦點放到下半部分

可以注意到當 task 是 non blocking 時(使用 default 時預設的)的時候,會呼叫一個叫 signlCpuWork(),那咱們接著看下去:

這一整串其實就是建立一個 Worker (createNewWorker()),或者是取得一個現有的 Worker (parkedWorkersStackPop()),那…Worker 到底是甚麼?

直接追下去,可以發現他就是一個包裝過後的 Thread。那再來看看 createNewWorker()

比較關鍵的是 worker.start(),也就是說,在這一步,我們建立了我們要 run Runnable 的 task 的另一個 thread,也同時開始了他的運行。

咦…等等?剛才不是說 dispatch 的時候是 run 在 dispatch 它的 thread 上,那意思是假設我 dispatch 時在 main thread 上,那 runnable 也會跑在 main thread 上嗎?

甭急,還記得剛剛把 Thread.currentThread cast 成 Worker 的片段嗎?

可以發現的是用的是 as? ,也就是說 cast 失敗會回傳 null,回傳 null 會發生甚麼事?會因為 submitToLocalQueue 失敗而回傳該 task,接著會發生甚麼事情?

沒錯,就是 addToGlobalQueue。

從這個結果可以推論一個很有趣的結論是:若非是在 Coroutines 底下 launch 的 Coroutines ,Runnable 都會進到 global queue。

for(i in 1..5) {
GlobalScope.launch(Dispatchers.Default) {
//DO something
}
}

這樣,原則上五個都會進 global queue,因為 launch 它並 dispatch 該 Runnable 的人都是 main thread,而 main thread 並非 Worker。

GlobalScope.launch(Dispatchers.Default) {
//Enter Global queue
for (i in 1..5) {
launch(Dispatchers.Default) {
//Enter local queue
}
}
}

這樣則有一次進 global queue ,而五次進 local queue

那在還記得 run -> runWorker -> findTask 的片段嗎?最後 Worker(也就是 thread)開始 run 的時候就會被從 global Queue 拉出來去執行了。

呼,好這樣總算說明 CoroutineSchedular 是怎麼新開 Thread 然後把 Runnable 拿來執行的了。

OK,把話說回來,那個排程…

呃…

嘛, Default 的排程其實也沒想像中的複雜,關鍵就在於,使用 CoroutineScheduler 最終會生成幾個 Worker 。

這就看回 ExecutorCoroutineDispatcher 的建構

corePoolSize 最終會被傳進 CoroutineScheduler ,並被當成 Worker 建立的上限,那 CORE_POOL_SIZE 會是多少呢?

答案是看 JVM 怎麼給,但至少會有 2 就是了,那這邊我為了方便說明,以我手邊有的機子 — 4 個做假設好了。

其實得到這個數字,答案也就呼之欲出了,還記得我剛提到的 global queue 嗎?

for (i in 1..12) {
GlobalScope.launch(Dispatchers.Default) {
val currentThread = Thread.currentThread().name
println
("$currentThread start $i")
Thread.sleep(1000)
println("$currentThread end $i")
}
}

這裡要注意,這邊要看的是順序而不是求快,所以不能用 Suspend method ,用了 Suspend 會把 Thread 讓出去,順序會完全沒有辦法掌握。

這個結果會是這樣:

DefaultDispatcher-worker-1 start 1
DefaultDispatcher-worker-2 start 2
DefaultDispatcher-worker-3 start 3
DefaultDispatcher-worker-4 start 4
DefaultDispatcher-worker-1 end 1
DefaultDispatcher-worker-1 start 5
DefaultDispatcher-worker-2 end 2
DefaultDispatcher-worker-2 start 6
DefaultDispatcher-worker-4 end 4
DefaultDispatcher-worker-3 end 3
DefaultDispatcher-worker-4 start 7
DefaultDispatcher-worker-3 start 8
DefaultDispatcher-worker-1 end 5
DefaultDispatcher-worker-1 start 9
DefaultDispatcher-worker-2 end 6
DefaultDispatcher-worker-2 start 10
DefaultDispatcher-worker-4 end 7
DefaultDispatcher-worker-3 end 8
DefaultDispatcher-worker-4 start 11
DefaultDispatcher-worker-3 start 12
DefaultDispatcher-worker-1 end 9
DefaultDispatcher-worker-2 end 10
DefaultDispatcher-worker-3 end 12
DefaultDispatcher-worker-4 end 11

這邊情況很多變,我以最初始化的狀態來做討論,這其實也最清楚,然後因為毫秒之差印出來的關係,印出來的結果可能也會有出入,但是取用的順序邏輯大概是這樣。

Core 的扣打有 4 個,所以產生四個 worker ,分別取 1 - 4

剩餘的 8 個,依序進入 global queue。

1 先結束,於是取 queue 中的 5 ,再來 2 結束取 queue 中的 6 ,然後 4 先結束,取 queue 的 7 ,再來 3 結束,取 queue 中的 8 ,以此類推。

這個不難推論,那我們在把 local queue 的條件加進去:

for (i in 1..8) {
GlobalScope.launch(Dispatchers.Default) {
val currentThread = Thread.currentThread().name
println
("$currentThread start $i")

if (i == 1) {
for (j in 1..4) {
launch(Dispatchers.Default) {
val currentThread = Thread.currentThread().name
println
("$currentThread start $i - $j")
Thread.sleep(1000)
println("$currentThread end $i - $j")
}
}
}
Thread.sleep(1000)
println("$currentThread end $i")
}
}

結果會是這樣:

DefaultDispatcher-worker-1 start 1
DefaultDispatcher-worker-2 start 2
DefaultDispatcher-worker-3 start 3
DefaultDispatcher-worker-4 start 4
DefaultDispatcher-worker-2 end 2
DefaultDispatcher-worker-3 end 3
DefaultDispatcher-worker-2 start 5
DefaultDispatcher-worker-3 start 6
DefaultDispatcher-worker-4 end 4
DefaultDispatcher-worker-4 start 7
DefaultDispatcher-worker-2 end 5
DefaultDispatcher-worker-3 end 6
DefaultDispatcher-worker-2 start 8
DefaultDispatcher-worker-4 end 7
DefaultDispatcher-worker-2 end 8
DefaultDispatcher-worker-2 start 1 - 1
DefaultDispatcher-worker-4 start 1 - 2
DefaultDispatcher-worker-2 end 1 - 1
DefaultDispatcher-worker-2 start 1 - 3
DefaultDispatcher-worker-4 end 1 - 2
DefaultDispatcher-worker-2 end 1 - 3
DefaultDispatcher-worker-2 start 1 - 4
DefaultDispatcher-worker-1 end 1
DefaultDispatcher-worker-2 end 1 - 4

恩,原本預期會是負責 1 的 worker-1 會被 local queue 中的 task 卡住到最後,而 5 6 7 8 都被其他 Worker 給帶走,但事實上不是這樣,因為 CoroutineScheduler 還有一個 steal 機制,概念上是會去偷其他 Worker 的 local queue 來做,所以結果反而不會是 worker-1 被卡到最後,所以以結論來說,只要知道 Coroutines 的目標是會盡速把工作完成,要去評估他的流程順序要素實在太多了。

Dispatchers.IO

方才有說明到, IO 是基於 Default 去建立的,那兩者最具體的差別在哪裡?

基本上兩者最大的差異在以創建的 Worker 數量會不同。

甚麼意思呢,別急,我們繼續看下去。

同樣的,我們一樣從 Dispatchers 開始往下追,可以得到 DefaultScheduler 的 IO。

這邊看到 64 可能會誤會 Dispatchers.IO 最多開 64 個 Thread,但事實上那個 64 是 default value ,那它會和 Runtime 的 availableProcessors 做比較,所以能用的不一定是 64 個,如果系統能給的 availableProcessors 大於 64 就會給 大於 64 那個,當然,這種系統不多,所以到最後多半是會給 64。。

從 blocking 追下去,可以看到是新增了一個叫做 LimitingDispatcher 的實體。

這個實體的 Code 不長不短,其實也沒那麼難懂,只要看 LimitingDispatcher 的 dispatch 部分就行了:

inFlight 其實就是一個 atomicInt ,呃,用到 atomic 的目的其實有 thread safety 的目的,在閱讀上就把它當成是 Int 就行了。

那可以看到有一個無限迴圈在跑,當 inFlight 數量小於設定的上限數,就直接 dispatch 下去給 ExperimentalCoroutineDispatcher (),就是 Default 用的那個。

那如果 inFlight 的數量大於設定的上限,那會將 dispatch 進來的 Runnable 裝進去 queue 裏頭,那因為 inFlight 大於上限了,所以會被 return 掉,那因為這邊因為會有 Race Condition 的情況,也就是說會有在加進去 queue 的時候大於上限,但是加進去之後小於上限的情況,如果有就會再做一次迴圈直到合乎情況為止。。

那這邊看著可能會納悶一下,如果 dispatch 被呼叫很多次 inFlight 不會永遠降不下來嗎?

是也不會,可以往下看看到另一個 method 叫 afterTask()

顯然在 task 結束時,會把 queue 中的下一個掏出來,然後執行接著把 inFlight -1 後,再把下一個 Runnable 掏出來,丟給 dispatch ,這麼做是為了防止一些 Runnable 卡住的情況,嘛,就當有異步的時候,會有各式各樣的情況發生,那為了防止整條 queue 卡住,所以會有這個處理。

那 afterTask 何時被 invoke 呢?這時看向 CoroutineScheduler 把 Runnable 轉裝成 Task 的地方:

這邊可以知道 afterTask 是在這裡被 invoke 的。

恩,這樣看過來其實也沒這麼複雜,去掉一些 Race condition 和 task 卡住的情況,就是把 Runnable 丟進來,藉由 CoroutineScheduler 轉裝成 task,然後執行,如果數量超過上限了,那就放到 Queue 裏頭,等到上一個 task 結束後再從 queue poll 出來執行。

那問題也就來了,ExperimentalCoroutineDispatcher 裏頭的 CoroutineScheduler 限制就是 availableProcessors,那 IO 是怎麼有辦法把數量拉到超過 availableProcessors 呢?

關鍵在 LimitingDispatcher 的 dispatch invoke 的是 dispatchWithContext,那因為 LimitingDispatcher 本身是 TaskContext 而非 NonBlockingContext ,所以在 CoroutineScheduler 時會走上另外一條路。

沒錯,就是 signalBlockingWork ,這個 method 裡面會多狀態給二元去算,用一些算式讓 Worker 的數量拉大。至於後面排程的方式大致就和 Default 相同了。

所以可以得到一個結論是, IO 與 Default 的差別在於 Default 開的 thread 會受到 CPU core 限制,而 IO 則會比 Default 多上不少。

Dispatchers.Unconfined

恩,這玩意很佛系,畢竟顧名思義,它的設計就是未受到 Dispatcher 限制,所以它的原始碼長這樣:

怎麼說呢?這個 Dispatcher 給人一個很隨興的感覺。 isDispatchNeeded 很直接就給了 false,原則上 isDispatchNeeded 等於 false 時,會執行到

executeUnconfined。

而它的 dispatch 很難有機會被 invoke 到,唯一的例外是使用到 yield 的時候。

yield 甚麼玩意?就是讓出去的意思,某種程度上跟 suspend 相似,概念如下:

GlobalScope.launch(Dispatchers.Main) {
println("job 1 1")
println("job 1 2")
println("job 1 3")
println("job 1 4")
println("job 1 5")

}

GlobalScope.launch(Dispatchers.Main) {
println("job 2 1")
println("job 2 2")
println("job 2 3")
println("job 2 4")
println("job 2 5")

}

這個順序很自然的會是:

 job 1 1
job 1 2
job 1 3
job 1 4
job 1 5
job 2 1
job 2 2
job 2 3
job 2 4
job 2 5

如果加上 yield

GlobalScope.launch(Dispatchers.Main) {
println("job 1 1")
yield()
println("job 1 2")
yield()
println("job 1 3")
yield()
println("job 1 4")
yield()
println("job 1 5")

}

GlobalScope.launch(Dispatchers.Main) {
println("job 2 1")
yield()
println("job 2 2")
yield()
println("job 2 3")
yield()
println("job 2 4")
yield()
println("job 2 5")
}

結果會變成這樣:

job 1 1
job 2 1
job 1 2
job 2 2
job 1 3
job 2 3
job 1 4
job 2 4
job 1 5
job 2 5

講白了,就是讓出 thread。

而 Unconfined 的 dispatch 則只會在 yield 的時候被 invoke ,而且目的只是告訴 context 我是 Unconfined 的,好像 block 跟他無關似的…。

這樣講的 Unconfined 好像沒啥故事說似的,看一下它的原始碼我還真的沒啥故事能說。

Unconfined 就是那麼的佛系,誰 invoke 它就跑在誰身上。

不過使用它比較特別一點的情況是,如果在被 invoke 然後執行途中有 suspend 切換到另一個 Worker 去的話,它是回由另一個 Worker 來執行的。

舉例來說:

GlobalScope.launch(Dispatchers.Main) {
var name = Thread.currentThread().name
println
("$name")
withContext(Dispatchers.IO){
}
name = Thread.currentThread().name
println
("$name")
}

得到的結果會是:

main
main

但如果把中間的 launch 換成 Unconfined:

GlobalScope.launch(Dispatchers.Main) {
launch(Dispatchers.Unconfined) {
var name = Thread.currentThread().name
println
("$name")
delay(1000)
name = Thread.currentThread().name
println
("$name")
}
}

結果會變成這樣:

main
DefaultDispatcher-worker-1

這個其實也不難理解,Coroutines 的流程簡化一點,我們以這個 case 來說明:

GlobalScope.launch(Dispatchers.Main) {
withContext(Dispatchers.IO){
//
}
}
  1. 執行 launch(Main)
  2. 執行 withContext(IO) 傳入 Continuation(Main)
  3. 透過 Dispatchers.IO 換 IO thread
  4. withContext 執行完畢
  5. invoke Continuation(Main)
  6. 透過 Dispatchers.Main 回到 main。
實際的模樣大概是長這樣

那如果換作是 Unconfined ,會變成是這樣

  1. launch(Unconfined by Main)
  2. 執行 withContext(IO) 傳入 Continuation(Unconfined)
  3. 透過 Dispatchers.IO 換 thread
  4. withContext 執行完畢
  5. invoke Continuation(Unconfined)
  6. 因為沒人在乎 Dispatcher 是啥所以直接在 IO 上執行。
Unconfined 就是下半部分了

結語

各位小夥伴,終於到這了。其實這篇遠比我想像中的難寫很多,因為很多地方是要追流程,時常追著追著斷掉,又或者是追錯地方,或者是關注錯重點,Default 的部分尤其難搞,因為參雜了非常多的狀況(e.g. Race condition, shutdown 等),結果變得非常難解析。光是讀就很辛苦了,想了想如果真的寫起來也是非常的多毛的,看著看著也不得不佩服 Kotlin Coroutines 的設計巧思了。

其實我最初就兩個問題,一個是 Coroutines 怎麼切換 thread ,一個是 Coroutines 怎麼排程,後來看著看著,想說來寫一篇 Dispatchers.IO 好了,寫著寫著想說連其他三個也一起寫下去好了,寫著寫著發現 Default 才是最基礎有排程的玩意,過往的認知有不少的錯誤,連帶的前面寫的文章都得修改,這可好玩了,就這麼看下去,昨晚是小生的生日,獨自一人吃著蛋糕配著酒,竄著竄著把文章給寫了。

如果有任何問題,或是看到寫錯字,請不要吝嗇對我發問或對我糾正,您的回覆和迴響會是我邊緣人我寫文章最大的動力。

--

--

Jast Lai
Jastzeonic

A senior who happened to be an Android engineer.