筆記: Coroutine簡介(四) 使用Flows

Lin Li
9 min readSep 19, 2023

--

一、響應式編程(Reactive Programing)

在 Android 開發中,我們多年來一直採用 RxJava 來實現響應式編程,以便進行異步數據流操作。

什麼是響應式編程呢?

在 Android 開發的多年歷程中,RxJava 常被作為實現響應式編程的首選,主要用於異步數據流的處理。

那麼,響應式編程到底是什麼呢?

傳統上,在函數式編程中,我們會呼叫一個函數並期待獲得一個返回值。以一個日常例子來說,假如我們想閱讀報紙,通常我們會到鎮上買一份報紙,然後帶回家閱讀。然而,在響應式的情境下,我們會先與送報員建立聯繫,接著只需打開家門,等待報紙被送達。

簡單來說,我們只需「觀察」。當送報員送達報紙時,我們就開始閱讀;反之,我們則不會閱讀。

這就是響應式編程的精髓,即我們只觀察數據是否到來,並在數據到達後執行後續的操作。

儘管 RxJava 一直是 Android 開發中實現異步代碼邏輯的佳選,但 Flow API 的出現讓我們可以使用協程來達到同樣的目的。

事實上,作為一名在多個項目中使用過 RxJava 的開發者,我想說使用 Flow 和協程比起 RxJava 來說更容易學習和掌握。(這句話是老師的經驗,事實上老師也有RxJava的教學,只是筆者很鹹魚沒有去摸而已 >< 筆記太多整理不完,嗚嗚~~)

流是什麼?

流(Flow)是一個特殊的協程,能夠依序發出多個值。這意味著流有能力在一段時間內發射多個數據。

舉例來說,一個普通的返回 int 類型的函數,像這樣:

    fun getValue() : Int {
return 56
}

呼叫這個函數會得到一個(僅此一個)值56。但若是使用一個 int 類型的流,則會隨時間發射出多個 int 值。

二、簡單的例子:

class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//Producer 產生資料流
val myFlow = flow<Int> {
for (i in 1..100){
emit(i)
delay(1000L)
}
}

val textView = findViewById<TextView>(R.id.tvResult)

//Consumer
CoroutineScope(Main).launch {
//flow emit 的值
myFlow.collect{
// Log.i("LinLi", "Current index is $it: ");
textView.text = "Current index is $it: "
}
}
}
}

在這段程式碼中,我建立了一個名為 myFlow 的整數型流。這個流運行在一個迴圈中,每秒鐘發射一個介於1到100之間的整數,這個角色稱為生產者(Producer)。接著,我們可以收集(collect)這些發射出的數據並顯示在 textView 或日誌中,這個角色稱為消費者(Consumer)。當你運行這段程式碼,你會發現 textView 每秒鐘都會更新一次。

要注意的是,collect是一個掛起函數,所以我們需要在協程或是另一個掛起函數中調用。

三、在 Jetpack Compose 中嵌入流

由於這個筆記是在Jetpack Compose專題後,因此老師有說明如何在Compose中使用流。不太清楚何謂Compose的可以跳過這節。之後行有餘力應該會有Jetpack Compose的筆記…屆時會再補上連結。

直接new一個新的專案,使用此模板:

刪掉不相關的樣板程式碼,創建一個TextView,文字大小為25sp:

setContent {
FlowDemoWithComposeTheme {
// A surface container using the 'background' color from the theme
Surface(
modifier = Modifier.fillMaxSize(),
color = MaterialTheme.colors.background) {
Text(text = "Current is ...",
fontSize = 25.sp
)
}
}
}

然後建立一個與上面例子相同的myFlow:

val myFlow = flow<Int>{
for (i in 1..100){
emit(i)
delay(1000L)
}
}

接下來,我們要將這個流轉換為 Compose 可理解的狀態(State)。這裡我們使用 collectAsState 方法,並設定初始值為 1:

val currentValue by myFlow.collectAsState(initial = 1)

這個步驟是必要的,因為 Compose 是基於狀態驅動的 UI 框架,透過將流轉換為狀態,我們能將異步數據無縫地嵌入到 UI 中。

最後,將 Text 組件的內容更新為:

Text(text = "Current is $currentValue"

就可以達成與上面例子一模一樣的效果。

四、流與viewModel一起使用

我們將myFlow放到viewModel,並建立viewModel的實例後在Compose中使用。

要使用ViewModel() 需添加以下依賴(在官方compose的網頁中 可以看到完整的說明):

    // Optional - Integration with ViewModels
implementation 'androidx.lifecycle:lifecycle-viewmodel-compose:2.5.1'

建立MyViewModel,將myFlow程式碼移過去:

class MyViewModel : ViewModel() {

val myFlow = flow<Int>{
for (i in 1..100){
emit(i)
delay(1000L)
}
}
}

在MainActivity中:

setContent {
//創建viewModel實例
val viewModel = viewModel<MyViewModel>()
val currentValue by viewModel.myFlow.collectAsState(initial = 1)
FlowDemoWithComposeTheme {
// A surface container using the 'background' color from the theme
Surface(
modifier = Modifier.fillMaxSize(),
color = MaterialTheme.colors.background) {
Text(text = "Current is $currentValue",
fontSize = 25.sp
)
}
}
}

五、理解「背壓」(Back Pressure)與其應用

在數據流的世界裡,一個經常出現的問題是數據生成(生產)的速度超過其被消費的速度。這種現象被稱為「背壓」(Back Pressure)。當消費者的處理速度無法跟上數據的生成速度時,便產生了這種背壓現象。若在這樣複雜的環境中沒有適當的管理策略,可能會導致如阻塞錯誤或內存溢出等嚴重問題。

例如,若消費者需要暫停數據流以等待下一個事件的到來,可能會觸發阻塞錯誤。反之,如果生產者持續地將數據推送給消費者,而消費者卻無法及時處理,則可能導致內存溢出。

幸好在Kotlin語言中,Flow API 已經嵌入了多種機制來防範背壓問題。更具體地說,生產者會確保只在消費者準備好接收數據時才發送數據。此外,Flow API也提供了諸如 buffercollectLatest 等工具,讓我們能更靈活地管理背壓狀況。

建立背壓模擬環境

首先,在MyViewModel中創建一個backPressureDemo()函數,並建立一個名為myFlow1的數據流。這個流將每秒生成並發送一個整數,持續10秒。接著,使用viewModelScope進行數據收集:

        viewModelScope.launch {
myFlow1.collect {
delay(2000L)
Log.i("LinLi", "Consumed $it ");
}
}

要使用viewModelScope需添加依賴:

 implementation("androidx.lifecycle:lifecycle-viewmodel-ktx:2.6.2")

可以從Log中觀察到生產者和消費者的行為。這也驗證了我們之前提到的,Flow API能自動協助我們管理背壓。

在消費完才會生產

Buffer與CollectLatest

在某些情境下,我們可能希望即使消費者還未準備好,生產者也能繼續生成數據。這時,我們可以使用 buffer() 操作符:

  myFlow1.buffer().collect{
delay(2000L)
Log.i("LinLi", "Consumed $it ");
}

這樣可以確保生產與消費在不同的協程中獨立運行。如下圖的log:

生產與消費是獨立的

有些情況下我們只需要最新的值,例如顯示某場比賽的當前得分,此時就可以用collectLatest(),如以下這樣:

myFlow1.collectLatest {
delay(2000L)
Log.i("LinLi", "Consumed $it ");
}

如圖所示,會在最後一次生產後才收集:

只收集最新的值

下一篇文章應該也是協程筆記的最後一篇,會介紹Flow Operator以及狀態流、分享流。

--

--