Coroutines - async concurrency in a guaranteed order
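UPDATE: this should do the same job
// One coroutineScope around the whole chain. A separate coroutineScope per element
// would wait for that element's async to finish before starting the next one.
coroutineScope {
    (1..100)
        .map { async { it } }
        .forEach { println(it.await()) }
}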
— — — — — — — — — — — — — — — — — — — — — — — — — — — — —
Recently one of my coworkers faced an issue. The spec is:
first, call an API and get a byteArray as the result
second, decode the byteArray and return the output
third, load the byteArray as an image with Glide
The original design was synchronous. If the fragment contains lots of images to display, it wastes a lot of time on I/O loading. But if we simply use async/await and handle each result as soon as it arrives, we cannot control the order in which the API calls return, which leaves the images out of order.
The solution is to use a channel together with async. The channel keeps the results in the right order, and async lets the API calls run asynchronously while the byteArray decoding stays synchronous.
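To make the ordering problem concrete, here is a minimal sketch. `fakeApi` is a hypothetical stand-in for the real API call, written so that later requests finish sooner. Handling results as they complete reverses the order, while awaiting the Deferred values in the order the requests were started keeps it.

```kotlin
import kotlinx.coroutines.*
import java.util.Collections

// Hypothetical stand-in for the real API call: later requests finish sooner.
suspend fun fakeApi(index: Int): Int {
    delay((5 - index) * 100L)
    return index
}

// Collects results in the order the requests complete.
fun completionOrder(): List<Int> = runBlocking {
    val results = Collections.synchronizedList(mutableListOf<Int>())
    (0 until 5).map { index ->
        launch { results.add(fakeApi(index)) }
    }.joinAll()
    results.toList()
}

// Collects results in the order the requests were started.
fun requestOrder(): List<Int> = runBlocking {
    (0 until 5)
        .map { index -> async { fakeApi(index) } } // all requests start right away
        .map { it.await() }                        // awaited in start order
}

fun main() {
    println(completionOrder()) // [4, 3, 2, 1, 0]
    println(requestOrder())    // [0, 1, 2, 3, 4]
}
```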
A simplified version is below; it demonstrates a clean version of this design:
the function `callApi` delays for 0.5 seconds, then returns an Int
the function `decode` is synchronous, so I just replace its body with a log statement
the variable `decodeResult` is a mutableList, which holds the result my coworker needed
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach

val channelApi = Channel<Deferred<Int>>(20)
val channelDecode = Channel<Int>(20)
val decodeResult = mutableListOf<Int>()

// Note: both consumers suspend until closeChannel() is called,
// so runTest() does not return on its own.
@OptIn(ExperimentalCoroutinesApi::class) // for channelDecode.isEmpty
fun runTest() {
    runBlocking {
        launch {
            // runs on Dispatchers.Main in the real case, since it updates LiveData
            channelDecode.consumeEach {
                decodeResult.add(it)
                if (channelDecode.isEmpty) println(decodeResult.toString())
            }
        }
        launch(Dispatchers.Default) {
            // runs on Dispatchers.Default, since decoding is CPU-bound work
            channelApi.consumeEach {
                decode(it.await())
            }
        }
        launch(Dispatchers.IO) {
            // runs on Dispatchers.IO, since the IO dispatcher is built for I/O jobs
            repeat(20) {
                // start many asynchronous API requests and send them into the channel in order
                channelApi.send(async {
                    callApi(it)
                })
            }
        }
    }
}

suspend fun decode(int: Int) {
    println("decode $int")
    channelDecode.send(int)
}

suspend fun callApi(result: Int): Int {
    delay(500)
    println("api result $result")
    return result
}

private fun closeChannel() {
    channelApi.close()
    channelDecode.close()
}
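As a cross-check that the order holds even when later requests finish first, here is a smaller self-terminating sketch of the same idea. `slowFirstApi` and `orderedResults` are hypothetical names, not part of the original code, and the producer closes the channel once every request is queued so that the consumer loop can end.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical API stub: later requests return sooner, to stress the ordering.
suspend fun slowFirstApi(index: Int, count: Int): Int {
    delay((count - index) * 50L)
    return index
}

fun orderedResults(count: Int): List<Int> = runBlocking {
    val requests = Channel<Deferred<Int>>(count)
    val decoded = mutableListOf<Int>()

    // Producer: start every request immediately and queue its Deferred in request order.
    launch(Dispatchers.IO) {
        repeat(count) { index ->
            requests.send(async { slowFirstApi(index, count) })
        }
        requests.close() // already queued elements can still be received
    }

    // Consumer: awaits the Deferreds in queue order, so results come out in request order.
    for (deferred in requests) {
        decoded.add(deferred.await())
    }
    decoded
}

fun main() {
    println(orderedResults(5)) // [0, 1, 2, 3, 4]
}
```

The requests still overlap in time. Only the point where each result is consumed is serialized by the channel.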
Basically, async is used the same way as in the documentation. The key part is declaring the channel's element type as Deferred<T> and awaiting each element for its result. The channel's buffer capacity is also important: here it is set to 20, but you should tune it for your business logic and your users' devices. The code from the real project looks like this:
val channelSourceImg = Channel<Deferred<DecryptImage>>(data.storeVipData.imageCount)
val channelDecryptImg = Channel<ByteArray>(data.storeVipData.imageCount)
val decryptedImages = data.decryptImg.toMutableList()

viewModelScope.launch {
    // copy taken before new results arrive; its size is the first index that still needs fetching
    val decryptImages = data.decryptImg.toMutableList()
    val sourceImages = data.storeVipData.image
    launch {
        channelDecryptImg.consumeEach {
            decryptedImages.add(it)
            if (decryptedImages.size == data.storeVipData.imageCount) {
                vipCommentDecryptImages.value = decryptedImages
            }
        }
    }

    launch(Dispatchers.Default) {
        channelSourceImg.consumeEach {
            val deferredData = it.await()
            DecryptUtil.decryptImg(deferredData.image, deferredData.key)
                ?.let { decryptedImg -> channelDecryptImg.send(decryptedImg) }
        }
    }

    launch(Dispatchers.IO) {
        for (i in decryptImages.size until data.storeVipData.imageCount) {
            channelSourceImg.send(async {
                DecryptImage(
                    storeRepo.getImageByteArray(sourceImages[i].url).byteStream().readBytes(),
                    sourceImages[i].key
                )
            })
        }
    }
}

override fun onCleared() {
    super.onCleared()
    // close the two channels declared above
    channelSourceImg.close()
    channelDecryptImg.close()
}

private class DecryptImage(
    val image: ByteArray,
    val key: String
)
But why don't we set the capacity of the channel to Channel.UNLIMITED?
Actually we could, but this is a design judgement: you have to decide based on your business logic, and there is no universal answer. Most importantly, Channel.UNLIMITED gives up the channel's back-pressure: send never suspends, so nothing slows the producer down when the consumers fall behind.
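The difference can be seen with a small sketch that uses `trySend`, the non-suspending counterpart of `send`. `acceptedWithoutConsumer` is a hypothetical helper name. It counts how many elements a channel accepts while nothing is receiving: a bounded channel stops at its capacity, which is the point where `send` would suspend the producer, while an unlimited channel accepts everything.

```kotlin
import kotlinx.coroutines.channels.Channel

// Counts how many elements a producer can queue before anything is received.
fun acceptedWithoutConsumer(channel: Channel<Int>, attempts: Int): Int {
    var accepted = 0
    repeat(attempts) { value ->
        // trySend never suspends: it fails when a bounded buffer is full.
        if (channel.trySend(value).isSuccess) accepted++
    }
    return accepted
}

fun main() {
    println(acceptedWithoutConsumer(Channel<Int>(20), 100))                // 20
    println(acceptedWithoutConsumer(Channel<Int>(Channel.UNLIMITED), 100)) // 100
}
```

With a capacity of 20, at most 20 pending results sit in the channel at once. With Channel.UNLIMITED, every queued result stays in memory until a consumer gets to it.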
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach

class ConcurrencyWithOrderHelper {
    private val channelApi = Channel<Deferred<Int>>(20)
    private val channelDecode = Channel<Int>(20)

    // calls block with each decoded result, in request order
    suspend fun consumeDecodeResult(block: (Int) -> Unit) {
        channelDecode.consumeEach {
            block.invoke(it)
        }
    }

    // awaits the API results in request order and decodes each one
    suspend fun consumeApiResume() {
        channelApi.consumeEach {
            decode(it.await()) { apiResult ->
                channelDecode.send(apiResult)
            }
        }
    }

    suspend fun asyncApiRequest() {
        coroutineScope {
            repeat(20) {
                // start many asynchronous API requests and send them into the channel in order
                channelApi.send(async {
                    callApi(it)
                })
            }
        }
    }

    fun closeChannel() {
        channelApi.close()
        channelDecode.close()
    }
}

fun runTest() {
    runBlocking {
        val helper = ConcurrencyWithOrderHelper()
        launch {
            // runs on Dispatchers.Main in the real case, since it updates LiveData
            helper.consumeDecodeResult { result ->
                println("result $result")
            }
        }
        launch(Dispatchers.Default) {
            // runs on Dispatchers.Default, since decoding is CPU-bound work
            helper.consumeApiResume()
        }
        launch(Dispatchers.IO) {
            // runs on Dispatchers.IO, since the IO dispatcher is built for I/O jobs
            helper.asyncApiRequest()
        }
    }
}

suspend fun decode(int: Int, block: suspend (Int) -> Unit) {
    println("decode $int")
    block.invoke(int)
}

suspend fun callApi(result: Int): Int {
    delay(500)
    println("api result $result")
    return result
}
We can also extract the coroutine part into a class like the one above, and pass in a lambda to define what to do after decoding.
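Going one step further, the same idea could be written as a single generic function that takes both the request and the follow-up step as lambdas. This is only a sketch: `orderedPipeline`, its parameters, `pipelineDemo`, and the `img` strings are hypothetical names for illustration. Unlike the class, it closes its channel itself once all inputs are queued.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: runs `request` concurrently for every input,
// then hands the results to `onResult` in input order.
suspend fun <T, R> orderedPipeline(
    inputs: List<T>,
    capacity: Int,
    request: suspend (T) -> R,
    onResult: suspend (R) -> Unit
): Unit = coroutineScope {
    val pending = Channel<Deferred<R>>(capacity)
    launch {
        for (input in inputs) {
            // send suspends when `capacity` requests are already queued (back-pressure)
            pending.send(async { request(input) })
        }
        pending.close()
    }
    for (deferred in pending) {
        onResult(deferred.await())
    }
}

fun pipelineDemo(): List<String> = runBlocking {
    val shown = mutableListOf<String>()
    orderedPipeline(
        inputs = listOf(3, 1, 2),
        capacity = 2,
        request = { id -> delay(id * 50L); "img$id" }, // stand-in for the API call
        onResult = { image -> shown += image }         // stand-in for decode and display
    )
    shown
}

fun main() {
    println(pipelineDemo()) // [img3, img1, img2]
}
```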