筆記: Coroutine簡介(二) Async與非結構化/結構化並發

Lin Li
11 min readSep 18, 2023

--

上一篇文章 簡單介紹了協程,這邊補充一下掛起函數。

一、掛起函數

    suspend fun  downloadUserData() {
for (i in 1..200000) {
withContext(Dispatchers.Main){
tvUserMessage.text = "Downloading user $i in ${Thread.currentThread().name}"

}
}
}

在這個例子中,withContext是一個掛起函數,我們不能在普通函數調用掛起函數,所以我們必須要在前面加上suspend修飾詞。

在協程中可以使用一般函數,也可以使用掛起函數,然而掛起函數只能在協程中或另一個掛起函數內使用。

什麼時候使用掛起函數呢?當你判斷這個任務主要是在 I/O 操作上可能會很耗時,比如網絡請求或文件讀寫,就可以加上這個修飾詞。

在 Kotlin 協程中,suspend 函數通常用於 I/O 操作(如網絡請求、文件讀寫等)或者其他可暫停的操作,而不是用於 CPU 密集型任務。這是因為:

1. I/O 等待:在 I/O 操作中,線程經常需要等待一個外部資源。在這種情況下,使用 suspend 函數會釋放佔用的線程,讓它去執行其他任務。

2. CPU 密集型任務:對於 CPU 密集型任務,線程實際上是在積極工作的,而不是在等待。在這種情況下,使用 suspend 並不會提供任何好處,因為它不會使 CPU 工作得更快。

Kotlin 協程 API 為我們提供了很多使工作變得更簡單的函數,它們幾乎全都是掛起函數。例如 withContext()withTimeout()、withTimeoutOrNull()join()delay()await()supervisorScopecoroutineScope

這些都是 Kotlin 協程 API 提供的一些掛起函數的例子。

不僅僅是協程庫,其他庫如 Room 和 Retrofit,也提供掛起函數以支援我們使用協程。

我們應該會在範例中使用以上一些掛起函數,如delay()、await()。

那掛起函數又是什麼呢?這裡不牽涉太深入的東西,當Kotlin的掛起函數反編譯為Java時,該函數會多一個Continuation 參數。Continuation是一個 Kotlin 介面,其中包含了所有需要恢復掛起函數所需的結構。

Continuation 是 Kotlin 協程中用於控制異步操作和掛起函數(suspend functions)的接口。當一個掛起函數暫停(suspend)執行時,其實就是將當前的運行狀態封裝成一個 Continuation 對象。這個對象知道如何在稍後的時間點「恢復(resume)」暫停的函數,並且它提供了一個用於恢復計算的 resume() 方法。

Continuation 通常會有兩個主要方法:

  1. resume(T):用於恢復協程的執行並將結果傳遞回去。
  2. resumeWithException(Throwable):用於在協程中拋出一個異常。

當掛起函數完成其異步操作後,相應的 Continuationresume()resumeWithException() 方法會被調用,以恢復協程的執行。

二、async

假設我有兩個方法,一個需要10秒,一個需要8秒,如果我們依序地運行這兩個方法(sequential decomposition),我們就必須花上18秒,但如果我們能同時運行,那我們就只需要花10秒。

class MainActivity : AppCompatActivity() {

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)

CoroutineScope(Dispatchers.IO).launch{
Log.i("LinLi", "Calculation started: ");
val stock1 = getStock1()
val stock2 = getStoc2()
val total = stock1+stock2
Log.i("LinLi", "Total is: $total");
}
}

private suspend fun getStock1() : Int{
delay(10000)
Log.i("LinLi", "stock 1 returned ");
return 55000
}
private suspend fun getStock2() : Int{
delay(8000)
Log.i("LinLi", "stock 2 returned ");
return 55000
}
}
如圖所示,這樣花了18秒

將程式碼寫成平行下載這些數據並在最後將它們組合起來,這被稱為平行分解( parallel decomposition)。過去,平行分解並不是那麼容易。我們必須為此寫出複雜、冗長、難以閱讀、難以維護的程式碼。但有了 Kotlin 協程,我們可以非常容易地進行平行分解。

還記得之前提過的協程建構器對吧?我們可以用async建構器來簡單實現平行分解。async返回的是Differed物件,我們可以用await()來得到它的返回值。如果你還記得的話,await()本身是一個掛起函數。

CoroutineScope(Dispatchers.IO).launch{
Log.i("LinLi", "Calculation started: ");
val stock1 = async { getStock1() }
val stock2 = async { getStock2() }
//調用兩個async的返回值
val total = stock1.await()+stock2.await()
Log.i("LinLi", "Total is: $total");
}
改成這樣寫10秒就可以完成了

以上也可以改成這種寫法,這樣寫會在背景線程執行完畢後切回主線程並顯示結果:

CoroutineScope(Dispatchers.Main).launch{
Log.i("LinLi", "Calculation started: ");
val stock1 = async(Dispatchers.IO) { getStock1() }
val stock2 = async(Dispatchers.IO) { getStock2() }

val total = stock1.await()+stock2.await()
Toast.makeText(applicationContext,"Total is: $total",Toast.LENGTH_LONG)
.show()
}

三、非結構化/結構化並發

假設我們有一個UserDataManager的類,該類有一個getTotalUserCount()的方法,我們模擬它耗時1秒後取得50,然後將取得的數字顯示在textView上。

先來示範非結構化併發的錯誤寫法:


class MainActivity : AppCompatActivity() {
private lateinit var tvUserMessage : TextView

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
tvUserMessage = findViewById(R.id.tvUserMessage)

CoroutineScope(Dispatchers.Main).launch {
tvUserMessage.text = UserDataManager().getTotalUserCount().toString()
}
}
}
class UserDataManager {

suspend fun getTotalUserCount():Int{
var count = 0
CoroutineScope(IO).launch {
delay(1000)
count = 50
}
}

這裡小補充:可以看到我們使用了delay(),它是一個掛起函數,可以暫停協程(是不是想到Thread.sleep()呢?看起來很像實際上是不同的!delay()並沒有讓線程停止)。

運行以上程式碼會發現…顯示的是0而不是50。進去debug看一下…

很明顯看出協程並沒有運行就返回了

在這裡,這個協程作用域(coroutine scope)在主活動(main activity)的父協程中創建了一個新的協程,這個新的協程有自己獨立的行為。

因此,這個函數在協程完成之前就已經達到結束,並返回了這個 count 變數的值。

因為這個原因,我們得到的結果是0而不是50。這是非結構化並發(unstructured concurrency)的一個弱點。非結構化並發不能保證在返回之前完成掛起函數(suspending function)的所有任務。

咦?那如果用先前的async能解決嗎?如以下寫法:

class UserDataManager {

suspend fun getTotalUserCount():Int{
var count = 0
CoroutineScope(IO).launch {
delay(1000)
count = 50
}

//用async可以達到效果,但不是好方法,因為沒有辦法處理例外
val deferred = CoroutineScope(IO).async {
delay(3000)
return@async 70
}
return count+deferred.await()
}
}

此時我們運行會發現textView顯示的是70。但這不意味著這是好方法,因為這樣沒有辦法處理例外情形。

實際上,即使父協程完成後,子協程仍然可能在運行。

因此,這可能會導致不可預測的錯誤,特別是如果我們像剛才那樣使用 launch 協程構建器。

解決方法是使用小寫的coroutineScope。小寫的coroutineScope是掛起函數。當中的子協程受到MainActivity中的父協程控制,因此能保證在返回之前完成任務。

於是我們可以這樣寫:

class UserDataManager2 {
var count = 0
lateinit var deferred: Deferred<Int>
suspend fun getTotalUserCount(): Int {

coroutineScope {
launch(IO) {
delay(1000)
count = 50
}

deferred = async {
delay(3000)
return@async 70
}
}
return count+deferred.await()
}
}

所以這是推薦的最佳實踐。當多於一個協程時, 從 CoroutineScope 接口開始,使用Dispatchers.Main調度器。在掛起函數內部,使用 coroutineScope 函數,該函數以小寫的 'c' 開頭,以提供一個子作用域。

這就是結構化並發(structured concurrency)。結構化並發保證在暫停函數返回之前,完成子作用域內啟動的所有協程的工作。實際上,這個 coroutineScope 會等待子協程完成。不僅如此,這種結構化並發還有其他好處。

當出現錯誤、當拋出異常時,結構化並發保證通知調用者,所以我們可以輕易而有效地處理異常。我們也可以使用結構化並發來取消我們啟動的任務。如果我們取消整個子作用域,該作用域內部發生的所有工作都將被取消。我們也可以單獨取消協程。

下一篇文章會介紹其他的作用域,如viewmodel scope、lifecycle scope等等。

--

--