Android InputStream的阻塞問題(三)

Barr
5 min readJun 26, 2024

--

Photo by Robert Zunikoff on Unsplash

前篇:Android InputStream的阻塞問題(二)

fun main() = runBlocking {
var thread: Thread? = null
try {
val job = launch(Dispatchers.IO) {
thread = Thread {
try {
println("Thread started: ${Thread.currentThread()}")
Thread.sleep(500) // This should be interrupted after 1000ms
println("First sleep done")
Thread.sleep(1000) // This should not run
println("Second sleep done")
} catch (e: Exception) {
println("Thread was interrupted during sleep. $e")
}
}
thread!!.start()
thread!!.join()
}

withTimeout(1000) {
job.join()
}
} catch (e: Exception) {
thread?.interrupt()
println("Exception $e")
}
}

這是前一篇說到的 Thread.interrupt() 方案,雖然可以解決問題,但卻存在明顯的缺點
1. 每次調用都要創建新的線程。
2. 使用 Thread.interrupt()中斷的方式不夠優雅。
3. 更複雜的執行緒管理應該使用 Executors.newSingleThreadExecutor() 等方法。
有更優雅的解決方案嗎?

其實有的,還記得這次的目標嗎? Serial Port 的 InputStream 上實現可取消的掛起。其實有個很簡單的方案:Channel

先分解這個問題本身,它分成了兩部分
1. InputStream的接收,用來獲取新的數據
2. InputStream的消費,使用獲取到的數據

而我們知道,InputStream 的接收是阻塞的,只有消費完全是可以取消的消費完全是我們自己控制,也就是說這是一個標準的"生產者-消費者模型",既然如此,Channel 就是這裡的完美選擇。

整個模型如下
1. 生產者: InputStream.read() + Channel.send()
2. 中介:Channel
3. 消費者:`也就是說,接下來的重點就是如何把 InputStream.read()Channel.send() 結合,因為需求上每次的 Ack 都是獨立的,因此直接利用 readLine() 即可。InputStream的接收本身可以視為冷流,那把它轉換成 Flow也非常自然。

fun InputStream.toFlow() = bufferedReader()
.lineSequence()
.asFlow()
.flowOn(Dispatchers.IO)

Channel.send() 範例如下

CoroutineScope(Dispatchers.IO).launch {
inputStream.toFlow().collect {
ackChannel.send(it)
}
}

Channel.receive() 範例如下

val receiveAckJob = CoroutineScope(Dispatchers.IO).async {
withTimeoutOrNull(TIMEOUT_MILLIS) {
ackChannel.receive()
}
}
if (receiveAckJob.await() != null) {
response()
} else {
Log.e(TAG, "receiveAck: timeout")
}

這個設計成功的把 InputStream read() 的阻塞給隔絕在接收之外,不僅不必額外使用 Thread,也能很簡單實現取消,可以說是沒有額外開銷,並且優美的使用了協程的各種特性來完成工作。

這次的問題是 InputStream read() 的阻塞,這是一個無法輕易取消的阻塞。在使用協程的同步寫法後,遇到這種會卡死的情況確實非常麻煩。但透過將阻塞部分封裝到其他地方,並利用可取消的方式讀取 buffer,可以優雅地解決這個問題。 我們展示如何使用 ChannelFlowInputStream.read() 的阻塞操作隔離開來,實現了更優雅、更有效率的解決方案。這種方法不僅避免了額外的執行緒開銷,還充分利用了協程的特性,達到了我們的目標。 希望這篇文章能幫助你更好地理解如何在協程中處理阻塞操作,並提供了解決類似問題的思路。

--

--