Kotlin Coroutines Flow 那一兩件事情

Jast Lai
Jastzeonic
Published in
37 min readJul 5, 2020

前言

上回我們聊到了 Channel ,他的概念比較偏近於 hot observable ,那這回咱們來聊聊概念接近 cold observable 的 Flow 吧。

自從 Kotlin Coroutines 開始被各種推廣後,他就一直被拿來跟很受歡迎的 Rxjava 做比較,那麼用起來最像的部分就是 Flow 了,因為 Rxjava 裏頭有個叫做 Flowable 的玩意,前面四個字母很像,不過,恩,兩者在使用取向很接近,但實際上用起來大相逕庭,雖然還是有幾分相似的影子在。

用它想要解決甚麼問題?

那老樣的,我們來問問使用 Flow 是想解決甚麼樣的問題,上一篇我們聊到了 Channel ,主要是想解決我想要某些 task 可以做到併發,但這些 task 需要某些必要資訊,而我卻不知道那些 task 必要的資訊會在這個 task 實作前或實作後得到,所以可以利用 Channel 的 send 和 receive 的 suspend 來解決。

Flow 的概念和 Channel 相似,使用他的情況會是,我有一串需要經過一段耗時流程得到的結果,但我不知道誰會用到他,甚至沒有必要被用到,所以就放在那邊,等有需要的人來取用。

舉例來說,我們假設這有一串數字明牌 9527 每一個數字都是需要經過耗時處理才能得到的:

suspend fun goodNumbers() {
listOf(9, 5, 2, 7).forEach {
delay(500)
channelForResult.send(it)
}
channelForResult.close()
}

這裡我用 Channel 來做傳接球會變成這樣:

fun main() {
runBlocking {
launch {
goodNumbers()
}
launch {//in same thread
for(i in channelForResult) {
println("play something $i")
}
}
}
}
// Result
play something 9
play something 5
play something 2
play something 7

原則上這不太會有問題,只是有的時候總是會發生一些光怪陸奇的事情,好比說,忘記 call goodNumbers() 這個 method 了:

fun main() {
runBlocking {
launch {//in same thread
for(i in channelForResult) {
println("play something $i")
}
}
}
}

那依照 Channel 的特性,除非 Scope 被終結,不然它會在那邊等,鑽石恆久遠一顆永流傳。

所以這種情況下,我們會預期把傳接球這個職責歸屬給一個物件,那就是 flow 了:

val flowSomething = flow {
listOf(9, 5, 2, 7).forEach {
delay(500)
emit(it)
}
}

fun main() {
runBlocking {
flowSomething.collect {
println("play something $it")
}
}
}
// Result
play something 9
play something 5
play something 2
play something 7

那根據上面的解釋,我們可以很明確的知道, Flow 與 Channel 最具體的差別是甚麼? Channel 比較偏向是一個溝通的管道,它通常是用來傳遞耗時過程後最後的結果,換句話說它只負責傳接球。至於 flow 則是會包含一整串耗時的流程,還有接收耗時流程的結果。

此外,Flow 還有個特性是,它的 collect 沒有被 invoke 是不會開始作動的,所以才會說 hot channel、 cold flow。

生命週期跟著 Scope

在使用上會注意到, Flow 的 collect 是個 suspend method ,那大家 Coroutines 也用多了,也知道 suspend method 必須要在 scope launch 的 block 內執行。

也就是說,不管 Flow 有沒有做完它分內的事情,只要 Scope ,或者 Scope launch 產生出來的 job 被 Cancel ,它也是很自然地會被終止的

fun main() {
scope.launch {
flowSomething.collect {
println("$it")
}
}
runBlocking {
delay(1100)
scope.cancel()
println("scope cancel")
delay(1000)
println("process finish")
}
}
// Result
9
5
scope cancel
process finish

各式的運算子(operators)

Flow 時常被拿來和 Rxjava 比較,原因大概是因為它有提供 operators 給開發者使用,自然 Flow 的 operators 總量是比不上 Rxjava 的(想想我那個填不完的坑),儘管如此 Operators 還是不少,那我這邊挑幾個比較常用的來講,知道這些 operators 應該就可以用來對付不是那麼複雜的情況了。

Map

這個 operators 大家都知道,將 flow 發送出來的資料轉為對應的結果:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

fun main() {
runBlocking {
demoFlow.map {
"become a string $it"
}.collect {
println(it)
}
}
}
//Result
become a string 9
become a string 5
become a string 2
become a string 7

Transform

跟 map 類似,只是將發送回來的結果當作是原料再另外發射出去,要注意是若是沒有 emit ,則不會有任何的結果。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

fun main() {
runBlocking {
demoFlow.transform<Int,String> {
emit("become a string 1 $it")
emit("become a string 2 $it")
}.collect {
println(it)
}
}
}
//Result
become a string 1 9
become a string 2 9
become a string 1 5
become a string 2 5
become a string 1 2
become a string 2 2
become a string 1 7
become a string 2 7

Take

常用 Kotlin 的 List 就會常常用到這個詞,在 flow 也是一樣的意思,就是只取一定的數量。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

fun main() {
runBlocking {
demoFlow.take(2).collect {
println(it)
}
}
}
//Result
9
5

filter

這個同樣也常常在 list 會用到,稍微提一下這個字彙的意思,中文「過濾」的詞意,常常會把以為是「過濾掉」- 把符合條件的過濾掉,但實際上這個 operator 是把不合條件的過濾掉,filterNot 才是把符合條件的過濾掉,這應該只是中文詞彙上的問題,看到 filter 這個詞把它直接當成名詞,當它是個篩子,篩子會留下符合條件的東西這樣比較恰當。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

fun main() {
runBlocking {
demoFlow.filter {
it
== 2
}.collect {
println(it)
}
}
}
//Result2

flowOn

說起來 flow 會怎樣切換 thread 呢?這是個有點哲學的問題。

基本上,flow 它是以 Coroutines 為基礎去延伸的東西,那很直覺得會想用 withContext 吧

val demoFlow = flow {
withContext(Dispatchers.IO) {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}
}

fun main() {
runBlocking {
demoFlow.collect {
println(it)
}
}
}

本來以為可以順利的把 9527 給印出來的時候,卻發現它噴了很多的 Exception:

顯然是不能這樣用的。

那樣到底要怎麼切換 thread 呢?使用 flowOn

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
log("emit")
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.flowOn(Dispatchers.IO)
.collect {
log("collect $it")
}
}
}
// Result :
[DefaultDispatcher-worker-1] emit
[DefaultDispatcher-worker-1] emit
[DefaultDispatcher-worker-1] emit
[DefaultDispatcher-worker-1] emit
[main] collect 9
[main] collect 5
[main] collect 2
[main] collect 7

可以看到 thread 被很精美的切換了。

說到 Rxjava 最反人類的 operators 是甚麼?不意外的話就是用得最頻繁但是字面意思又容易搞混的 subscribeOn 和 observeOn 了吧。

響應式套件不外乎都是以觀察者模式為基礎下去實作延伸的,那一般我們在觀察者模式會把被觀察者稱為 subject,觀察者則稱為 observer,只是 Rxjava 把 subject 稱為 observable ,這其實不是問題,畢竟觀察者觀察可觀察對象的並沒有甚麼不妥。

只是問題是 observeOn 跟 observable 很像,時常看不準就以為 observeOn 是控制 observable 執行的 thread,然後在訂閱 observable 時,使用的 operator 是 subscribe,從字面意思來看,subscribeOn 好像是控制訂閱對象的 thread ,但事實上 subscribeOn 是控制 observable 在哪個 thread 上執行, observeOn 是控制觀察者在哪個 thread 上執行,跟認知上剛好相反。

那在 flow 上,就比較沒有這個問題,flow 用一個 operator 控制它的 thread,就叫 flowOn,至於 collect 會在哪執行,從它是 suspend function 應該就可以很輕易推斷出來,它會執行在 invoke 它的 scope launch 的 thread上。

此外,它與 Rxjava 最大的不同就在於,Rxjava 在 operators 的設定上是 builder Pattern,而 Flow 則是以 flowOn 設定處得以上。

舉例來說:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
log("emit")
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.flowOn(customDispatcher)
.flowOn(Dispatchers.IO)
.collect {
log("collect $it")
}
}
}

val customDispatcher = object : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
Thread(block).start()
}
}

我自己開了一個 Dispatcher,然後我在 flow 上設定 flowOn,隨後又加了一個 Dispatchers.IO ,來看看會發生甚麼結果:

[Thread-0] emit
[Thread-0] emit
[Thread-0] emit
[Thread-0] emit
[main] collect 9
[main] collect 5
[main] collect 2
[main] collect 7

其實可以看得出來沒有任何東西跑在 Dispatchers.IO 上,這是因為兩個 flowOn 之間並沒有任何其他的 operators ,所以並不會有任何事情執行在 Dispatchers.IO。

所以可以看出 flowOn 的設定只對自己以上的對象。

另外還需要注意的一件事情是,設定不會去設定接下來的 operator。

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
log("emit")
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.flowOn(customDispatcher)
.transform {
log("transform")
emit(it)
}
.flowOn(Dispatchers.IO)
.map {
log("transform")
it
}
.flowOn(customDispatcher)
.collect {
log("collect $it")
}
}
}

val customDispatcher = object : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
Thread(block).start()
}
}

這樣的結果會是:

[Thread-4] emit
[Thread-4] emit
[Thread-4] emit
[Thread-4] emit
[DefaultDispatcher-worker-1] transform
[DefaultDispatcher-worker-1] transform
[DefaultDispatcher-worker-1] transform
[DefaultDispatcher-worker-1] transform
[Thread-5] transform
[Thread-5] transform
[Thread-5] transform
[Thread-5] transform
[main] collect 9
[main] collect 5
[main] collect 2
[main] collect 7

Buffering

Buffering 的意思是緩衝,雖說是這樣,但是這個 operators 有點意外的不好懂,Emm….這邊假設一個情況,flow 本身有耗時,而 collect 本身也有耗時,那一個 flow 所需的時間就會串在一塊變成 flow 本身的耗時時間 + collect 的耗時時間:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(1000)
emit(it)
}
}

fun main() {
runBlocking {
launch(Dispatchers.IO) {
for(i in 1..8) {
delay(1000)
println("$i 秒経過")

}
}
demoFlow
.collect {
delay(1000)
log("collect $it")
}
}
}
// Result
1 秒経過
2 秒経過
[main] collect 9
3 秒経過
4 秒経過
[main] collect 5
5 秒経過
6 秒経過
[main] collect 2
7 秒経過
8 秒経過
[main] collect 7

這樣變成了 flow 需要等 collect 做完才能做下一個 emit (概念與 Channel 的 send 沒人來收就在那邊等一樣),那這時加了 buffer,就不一樣了。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(1000)
emit(it)
}
}

fun main() {
runBlocking {
launch(Dispatchers.IO) {
for(i in 1..8) {
delay(1000)
println("$i 秒経過")

}
}
demoFlow
.buffer()
.collect {
delay(1000)
log("collect $it")
}
}
}
//Result
1 秒経過
2 秒経過
[main] collect 9
3 秒経過
[main] collect 5
4 秒経過
[main] collect 2
5 秒経過
[main] collect 7
6 秒経過
7 秒経過
8 秒経過

這樣就不需要等到 collect 做完才能發射下一個資料了。

Conflation

它的意思是合併,但是並不是把結果合併起來,它的意思更像是只取之後的結果。舉例來說,我 flow 發射每一個資料耗時需要一秒,在 collect 則會費時兩秒,那中間那一秒發射的項目就會被合併掉,那比較需要知道的概念是,它不是過濾,而是合併,所以沒有合併的對象就不會被消失。甚麼意思?以 9527 來說,9 是第一個發射的項目,也就是起始資料,所以必然會收到出來;5 因為是 9 在 collect 做耗時過程中被射出的,所以會被下一個射出的對象,也就是 2 給合併掉;那 7 是最後一個射出的對象,因為沒有下一個對象被射出來,所以最後也會拿到 7 ,因此最終的結果會是 957:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(1000)
emit(it)
}
}

fun main() {
runBlocking {
launch(Dispatchers.IO) {
for(i in 1..8) {
delay(1000)
println("$i 秒経過")

}
}
demoFlow
.conflate()
.collect {
delay(2000)
log("collect $it")
}
}
}
//Result
1 秒経過
2 秒経過
3 秒経過
[main] collect 9
4 秒経過
5 秒経過
[main] collect 2
6 秒経過
7 秒経過
[main] collect 7
8 秒経過

collectLatest

第一次用這個 operator 會覺得這個 operator 很玄…

字面意思上看,會覺得它只取最後一個收到的資料。但問題來了,對一個持續發射資料的響應式的結構而言,你怎麼知道哪一次發射的資料是最後一個資料?

那所以對這個 operators 而言,如果 collect 沒有需要耗時的過程,那每一個 flow emit 的資料都會是最後一個:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(1000)
emit(it)
}
}

fun main() {
runBlocking {
launch(Dispatchers.IO) {
for(i in 1..8) {
delay(1000)
println("$i 秒経過")

}
}
demoFlow
.collectLatest {
log("collect $it")
}
}
}
//Result
1 秒経過
[main] collect 9
2 秒経過
[main] collect 5
3 秒経過
[main] collect 2
4 秒経過
[main] collect 7
5 秒経過
6 秒経過
7 秒経過
8 秒経過

這樣不就跟 collect 一樣嗎?

所以我說很玄學,那我要這個 operator 幹嘛?

別急別急,這個情況是要應付 collect 也有耗時的情況,我們上一個提到 conflate ,概念與它類似,也就是說當 collect 有耗時時,就會取最後一個了:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(1000)
emit(it)
}
}

fun main() {
runBlocking {
launch(Dispatchers.IO) {
for(i in 1..8) {
delay(1000)
println("$i 秒経過")

}
}
demoFlow
.collectLatest {
delay(1100)
log("collect $it")
}
}
}
//Result
1 秒経過
2 秒経過
3 秒経過
4 秒経過
5 秒経過
[main] collect 7
6 秒経過
7 秒経過
8 秒経過

這其實沒甚麼問題,反正就是取代和被取代的差異就是了。只是以官方文件提供的 demo 非常的玄學,它把一個 print 放到了 collect 的 delay 前面,結果這麼玄學的,居然兩者都印出來了:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(1000)
emit(it)
}
}

fun main() {
runBlocking {
launch(Dispatchers.IO) {
for(i in 1..8) {
delay(1000)
println("$i 秒経過")

}
}
demoFlow
.collectLatest {
log("collect $it")
delay(2000)
log("latest collect $it")
}
}
}
//Result
1 秒経過
[main] collect 9
2 秒経過
[main] collect 5
3 秒経過
[main] collect 2
4 秒経過
[main] collect 7
5 秒経過
6 秒経過
[main] latest collect 7
7 秒経過
8 秒経過

Emm….畢竟是 Coroutines , suspend 前和 suspend 後的 method 可以分開看。

概念就是,flow 實際發送數值時,看 collect 有沒有在等耗時處理,有在等地就會直接覆蓋原本處理耗時流程這個作業接著等,沒在等地就印出來。

怎麼說呢?以 9 來說,在 flow 本身耗時等一秒後到了 collect 需要耗時兩秒才印出來,等兩秒的期間拿到了 5 ,那就不等原本的,等新的唄,所以重新開始等兩秒,接著是 2 接著是 7 ,直到拿到 7 的 collect 等完了拿到它的兩秒後,這才把 7 印出來。

Zip

就是把兩個 flow 包在一塊:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4").forEach {
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.zip(demoFlow2) { a, b ->
val result = "$a + $b"
result
}
.collect {
println(it)
}
}
}
//Result :
9 + 0
5 + 2
2 + 0
7 + 4

這邊要提一下 flowOn ,通常這樣設定沒有甚麼問題都會跑在我的 customDispatcher 上:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
log("demoFlow emit")
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4").forEach {
log("demoFlow 2 emit")
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.zip(demoFlow2) { a, b ->
val result = "$a + $b"
log(result)
result
}
.flowOn(customDispatcher)
.collect {
println(it)
}
}
}
//Result:
[Thread-1] demoFlow emit
[Thread-2] demoFlow 2 emit
[Thread-1] demoFlow emit
[Thread-4] demoFlow 2 emit
[Thread-3] 9 + 0
[Thread-3] 5 + 2
[Thread-5] demoFlow emit
9 + 0
[Thread-6] demoFlow 2 emit
[Thread-5] demoFlow emit
5 + 2
[Thread-7] 2 + 0
2 + 0
[Thread-8] demoFlow 2 emit
[Thread-10] 7 + 4
7 + 4

只是如果是這樣設定,flowDemo2,就會跑在 Dispatchers.IO 上:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
log("demoFlow emit")
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4").forEach {
log("demoFlow 2 emit")
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.zip(demoFlow2.flowOn(Dispatchers.IO)) { a, b ->
val result = "$a + $b"
log(result)
result
}
.flowOn(customDispatcher)
.collect {
println(it)
}
}
//Result:
[Thread-1] demoFlow emit
[DefaultDispatcher-worker-2] demoFlow 2 emit
[Thread-1] demoFlow emit
[DefaultDispatcher-worker-2] demoFlow 2 emit
[DefaultDispatcher-worker-2] demoFlow 2 emit
[DefaultDispatcher-worker-2] demoFlow 2 emit
[Thread-8] 9 + 0
9 + 0
[Thread-8] 5 + 2
[Thread-9] demoFlow emit
5 + 2
[Thread-9] demoFlow emit
[Thread-11] 2 + 0
2 + 0
[Thread-11] 7 + 4
7 + 4

再來要提到的 combine 也是同一個情況。

combine

基本上用法與 zip 一模一樣,那為何要分一個 combine 一個 zip ?

實際看一次結果就知道了:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4").forEach {
emit(it)
}
}

fun main() {
runBlocking {
demoFlow.combine(demoFlow2) { a, b ->
val result = "$a + $b"
result
}
.collect {
println(it)
}
}
}
//Result
9 + 0
5 + 0
5 + 2
2 + 2
2 + 0
7 + 0
7 + 4

這個結果比較難分析,簡單來說就是當一個 flow 發射資料,會把另一個 flow 最後一次發射的資料給帶上。

加個耗時會比較好理解:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(100)
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4").forEach {
delay(200)
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.combine(demoFlow2) { a, b ->
val result = "$a + $b"
result
}
.collect {
println(it)
}
}
}
//Result
9 + 0
5 + 0
2 + 0
2 + 2
7 + 2
7 + 0
7 + 4

9 + 0 : 由 flow 以及 flow2 同時發射
5 + 0 : 由 flow 發射,帶上 flow 2 最終發射的資料
2 + 0 : 由 flow 發射,帶上 flow 2 最終發射的資料
2 + 2 : 由 flow 2 發射,戴上 flow 最終發射的資料
7 + 2 : 由 flow 發射,帶上 flow 2 最終發射的資料
7 + 0 : 由 flow 2 發射,戴上 flow 最終發射的資料
7 + 4 : 由 flow 2 發射,戴上 flow 最終發射的資料

那這個特性也確保了,所有 flow 都會發射完畢,舉例來說 zip 在一個會射四個,一個會射五個的 flow :

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4", "10").forEach {
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.zip(demoFlow2) { a, b ->
val result = "$a + $b"
result
}
.collect {
println(it)
}
}
}
//Result
9 + 0
5 + 2
2 + 0
7 + 4

可以發現,最後得到的結果就沒有第五個結果了,這個即便加了耗時也會是一樣的。

但如果使用 combine 後結果就不一樣了

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4","10").forEach {
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.combine(demoFlow2) { a, b ->
val result = "$a + $b"
result
}
.collect {
println(it)
}
}
}
//Result9 + 0
5 + 0
5 + 2
2 + 2
2 + 0
7 + 0
7 + 4
7 + 10

可以發現每一筆資料都有確實地被發射。

flatMapConcat

恩,這個 operators 目前仍在 Flow Preview 的階段,可能很快就被 deprecated 了,雖然我覺得就算被 deprecated ,這個概念應該會以另一種形式留存,畢竟 Rxjava 有相似的作法,而且廣為人知。

Rxjava 有一個叫做 concatMap 的玩意,概念與這個類似,concat 就是 concatenate 的縮寫,意思就是串聯(Google 翻譯翻做級聯,聽起來很像某個公國的名稱)。那 concatMap 最終要回傳一個 observable ,flow 一樣,最後要回傳一個 flow。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(100)
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4").forEach {
delay(200)
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.flatMapConcat {
println("get flow $it")
demoFlow2
}.collect {
println(it)
}
}
}
//Resultget flow 9
0
2
0
4
get flow 5
0
2
0
4
get flow 2
0
2
0
4
get flow 7
0
2
0
4

這解果很明顯是這樣的, flow 發射了一個, flow2 就會發射四個。那可以發現,使用 flatMapConcat 是很精美的串聯,不管 flow 射得多快,

flatMapMerge

這個 operator 類似 Rxjava 的 flatMap,相對於 flatMapConcat , flatMapMerge 就是個沒有順序的傢伙,總而言之,就是併發。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(100)
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4").forEach {
delay(200)
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.flatMapMerge {
println("get flow $it")
demoFlow2
}.collect {
println(it)
}
}
}
//Result
get flow 9
get flow 5
0
get flow 2
0
get flow 7
2
0
2
0
0
2
0
2
4
0
4
0
4
4

flatMapLatest

Latest 很自然就是只取最後一個,如果 flow 沒有耗時,那原則上打出來的結果會跟 flatMapConcate 類似(只是 flatMapLatest 在 flat 之後會晚點發射),但如果加了耗時,就會只取最後一個,規則與 collectLatest 類似

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
delay(100)
emit(it)
}
}

val demoFlow2 = flow {
listOf("0", "2", "0", "4").forEach {
delay(200)
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.flatMapLatest {
println("get flow $it")
demoFlow2
}.collect {
println(it)
}
}
}
//Result
get flow 9
get flow 5
get flow 2
get flow 7
0
2
0
4

Exception

每次處裡異步,毛最多的就是處理 Exception 了,常常得為了了成功和失敗再包上一層,那使用 flow 不會有例外的情況,廢話當然會有,那解決的辦法呢?

直接包 try catch 阿。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
if(it == 2){
throw Exception("just a exception")
}
emit(it)
}
}

fun main() {
runBlocking {
try {
demoFlow.collect {
println(it)
}
}catch (e:Exception){
println("${e.message}")
}
}
}
//Result
9
5
just a exception

咦?這樣讓人想到用 async 直接在 async 外包一層 try catch ,會抓不到,那 flow 不會嗎?

不會,flow 可以抓到:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
if (it == 2) {
throw Exception("just a exception")
}
emit(it)
}
}

val demoFlow2 = flow {
listOf(9, 5, 2, 7).forEach {
if (it == 2) {
throw Exception("just a exception flow 2")
}
emit(it)
}
}

fun main() {
runBlocking {
try {
demoFlow.zip(
demoFlow2.flowOn(customDispatcher)
) { a, b ->
"$a $b"
}
.flowOn(Dispatchers.IO).collect {
println(it)
}
} catch (e: Exception) {
println("${e.message}")
}
}
}
//Result
9 9
5 5
just a exception

嘛,畢竟它不像 async 那樣是拿到其他地方去執行。

只是,某些情況,我就是不希望包上那麼多層,所以才用 flow 的,如果是這樣那我用一般的 Scope launch 就好啦?

是的 Flow 有一個 operator ,就叫 catch。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
if (it == 2) {
throw Exception("just a exception")
}
emit(it)
}
}

val demoFlow2 = flow {
listOf(9, 5, 2, 7).forEach {
if (it == 2) {
throw Exception("just a exception flow 2")
}
emit(it)
}
}

fun main() {
runBlocking {
demoFlow.zip(
demoFlow2.flowOn(customDispatcher)
) { a, b ->
"$a $b"
}
.flowOn(Dispatchers.IO)
.catch {
println(it.message)
}
.collect {
println(it)
}
}
}
// Result
9 9
5 5
just a exception

只是這個 Except 跟 flowOn 一樣,是不負責自己的下游的

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
if (it == 2) {
throw Exception("just a exception")
}
emit(it)
}
}

val demoFlow2 = flow {
listOf(9, 5, 2, 7).forEach {
if (it == 2) {
throw Exception("just a exception flow 2")
}
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.flowOn(Dispatchers.IO)
.catch {
println(it.message)
}.zip(
demoFlow2.flowOn(customDispatcher)

) { a, b ->
"$a $b"
}.collect {
println(it)
}
}
}

結果是這樣:

嘛,至少訊息很清楚。使用上需要注意,視情況可能需要將它放在最後面。

Completion

有的時候,我們會希望能夠知道這個 flow 已經把該發射的東西給射完了。

那可以用 try finally:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

fun main() {
runBlocking {
try {
demoFlow.collect {
println(it)
}
}finally {
println("it's done.")

}
}
}
//Result
9
5
2
7
it's done.

嘛,一樣的,我覺得外面再包一層 try 很醜,所以有提供一個 operator 叫 onCompletion。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.onCompletion {
println("it's done.")
}.collect {
println(it)
}
}
}
//Result
9
5
2
7
it's done.

這個 operator 還掛著 ExperimentalCoroutinesApi ,所以使用上需要注意。

另外 onCompletion 自帶一個 exception ,它是用來抓判斷 completion 是否是因為例外結束的:

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {

if (it == 2) {
throw Exception("it a exception.")
}
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.flowOn(Dispatchers.IO)
.catch { }
.onCompletion { cause: Throwable? ->
if (cause != null) {
println("it's A trap !!")
} else {
println("it's done.")
}
}.collect {
println(it)
}
}
}
//Result
9
5
it's A trap !!

要注意的是,沒有加 catch 或者是在外面包一層 try catch 是會噴錯的,因為 Exception 不是負責抓例外的。

此外 onCompletion 因為是結果,而不是設定,所以位置是可以換得,反正在該 flow 結束後,它都會被 invoke 。只是仍舊建議會把它放在尾巴,因為亂放,最後 invoke 的順序會不規則。

val demoFlow = flow {
listOf(9, 5, 2, 7).forEach {

if (it == 2) {
throw Exception("it a exception.")
}
emit(it)
}
}

fun main() {
runBlocking {
demoFlow
.onCompletion { cause: Throwable? ->
if(cause != null){
println("it's A trap !!")
}else {
println("it's done.")
}
}.flowOn(Dispatchers.IO)
.catch { }
.collect {
println(it)
}
}
}
//Result//Sometime
9
5
it's A trap !!
//Sometime
it's A trap !!
9
5
//Sometime
9
it's A trap !!
5

結語

總算到這了。又是一個邊刷 Netflix 邊寫文章的周末。

Flow 進到 1.3.7 版後,終於把一些好用的 operator 的 ExperimentalCoroutinesApi Annotation 拿掉了,這樣應該就可以安心去用它了。

以我自己的使用操作上,Flow 真的很多地方與 Rxjava 相當相似,我猜應該很多概念是抄…我說參照 Rxjava 的沒有錯。當然不免俗地說,畢竟 Flow 是以 Coroutines 為基礎,使用上仍然有很多 Coroutines 的玄學味道在,所以用 Rxjava 的邏輯去思考 flow,還蠻容易有意料外的想像。

記得兩三年前就人在問了,Flow 能不能替換掉 Rxjava ?畢竟 Rxjava 很大一包。

我自己的看法是,Rxjava 有很多很好用的 operator 是 Flow 沒有的,而且因為 Flow 較新,Rxjava 也會相對地較為成熟。但如果你使用 Rxjava 僅止於淺嘗,那換成 Flow 又何不是不可呢?

所以結論是,看需求,這幾乎是討論該不該使用這項工具的萬靈丹解答。

要說 Flow 能取代的掉東西其實不止 Rxjava,或許還有 LiveData,不過這又是另一個故事了。

如果有任何問題,或是看到寫錯字,請不要吝嗇對我發問或對我糾正,您的回覆和迴響會是我邊緣人我寫文章最大的動力。

參考文章

--

--

Jast Lai
Jastzeonic

A senior who happened to be an Android engineer.