前篇:Android InputStream的阻塞問題(二)
fun main() = runBlocking {
var thread: Thread? = null
try {
val job = launch(Dispatchers.IO) {
thread = Thread {
try {
println("Thread started: ${Thread.currentThread()}")
Thread.sleep(500) // This should be interrupted after 1000ms
println("First sleep done")
Thread.sleep(1000) // This should not run
println("Second sleep done")
} catch (e: Exception) {
println("Thread was interrupted during sleep. $e")
}
}
thread!!.start()
thread!!.join()
}
withTimeout(1000) {
job.join()
}
} catch (e: Exception) {
thread?.interrupt()
println("Exception $e")
}
}
這是前一篇說到的 Thread.interrupt()
方案,雖然可以解決問題,但卻存在明顯的缺點
1. 每次調用都要創建新的線程。
2. 使用 Thread.interrupt()
中斷的方式不夠優雅。
3. 更複雜的執行緒管理應該使用 Executors.newSingleThreadExecutor()
等方法。
有更優雅的解決方案嗎?
其實有的,還記得這次的目標嗎? Serial Port 的 InputStream
上實現可取消的掛起。其實有個很簡單的方案:Channel
先分解這個問題本身,它分成了兩部分
1. InputStream
的接收,用來獲取新的數據
2. InputStream
的消費,使用獲取到的數據
而我們知道,InputStream
的接收是阻塞的,只有消費完全是可以取消的消費完全是我們自己控制,也就是說這是一個標準的"生產者-消費者模型",既然如此,Channel
就是這裡的完美選擇。
整個模型如下
1. 生產者: InputStream.read()
+ Channel.send()
2. 中介:Channel
3. 消費者:`也就是說,接下來的重點就是如何把 InputStream.read()
與 Channel.send()
結合,因為需求上每次的 Ack 都是獨立的,因此直接利用 readLine()
即可。InputStream
的接收本身可以視為冷流,那把它轉換成 Flow
也非常自然。
fun InputStream.toFlow() = bufferedReader()
.lineSequence()
.asFlow()
.flowOn(Dispatchers.IO)
Channel.send() 範例如下
CoroutineScope(Dispatchers.IO).launch {
inputStream.toFlow().collect {
ackChannel.send(it)
}
}
Channel.receive() 範例如下
val receiveAckJob = CoroutineScope(Dispatchers.IO).async {
withTimeoutOrNull(TIMEOUT_MILLIS) {
ackChannel.receive()
}
}
if (receiveAckJob.await() != null) {
response()
} else {
Log.e(TAG, "receiveAck: timeout")
}
這個設計成功的把 InputStream read()
的阻塞給隔絕在接收之外,不僅不必額外使用 Thread
,也能很簡單實現取消,可以說是沒有額外開銷,並且優美的使用了協程的各種特性來完成工作。
這次的問題是 InputStream read()
的阻塞,這是一個無法輕易取消的阻塞。在使用協程的同步寫法後,遇到這種會卡死的情況確實非常麻煩。但透過將阻塞部分封裝到其他地方,並利用可取消的方式讀取 buffer
,可以優雅地解決這個問題。 我們展示如何使用 Channel
和 Flow
將 InputStream.read()
的阻塞操作隔離開來,實現了更優雅、更有效率的解決方案。這種方法不僅避免了額外的執行緒開銷,還充分利用了協程的特性,達到了我們的目標。 希望這篇文章能幫助你更好地理解如何在協程中處理阻塞操作,並提供了解決類似問題的思路。