Coroutine - async concurrency in certain order

(1..100)
.toList()
.map { coroutineScope {
async { it }
} }
.forEach {
println(it.await())
}
val channelApi = Channel<Deferred<Int>>(20)
val channelDecode = Channel<Int>(20)
fun runTest(){
runBlocking {
launch {
//run on dispatcher.main since we will need to update liveData in real case
channelDecode.consumeEach {
decodeResult.add(it)
if (channelDecode.isEmpty) println(decodeResult.toString())
}
}
launch(Dispatchers.Default) {
// run on dispatchers.default since cpu is more suitable for decode
channelApi.consumeEach {
decode(it.await())
}
}
launch(Dispatchers.IO) {
// run on io, since io dispatcher is build for io job
repeat(20){
channelApi.send(async {
callApi(it)
})//do many asynchronous api request, send into channel in order
}
}
}
suspend fun decode(int:Int){
println("decode $int")
channelDecode.send(int)
}

suspend fun callApi(result:Int): Int {
delay(500)
println("api result $result")
return result
}
private fun closeChannel(){
channelApi.close()
channelDecode.close()
}
}
val channelSourceImg = Channel<Deferred<DecryptImage>>(data.storeVipData.imageCount)
val channelDecryptImg = Channel<ByteArray>(data.storeVipData.imageCount)
val decryptedImages = data.decryptImg.toMutableList()
viewModelScope.launch {
val decryptImages = data.decryptImg.toMutableList()
val sourceImages = data.storeVipData.image
launch {
channelDecryptImg.consumeEach {
decryptedImages.add(it)
if (decryptedImages.size == data.storeVipData.imageCount) {
vipCommentDecryptImages.value = decryptedImages
}
}
}
launch(Dispatchers.Default) {
channelSourceImg.consumeEach {
val deferredData = it.await()
DecryptUtil.decryptImg(deferredData.image, deferredData.key)
?.let { decryptedImg -> channelDecryptImg.send(decryptedImg) }
}
}
launch(Dispatchers.IO) {
for (i in decryptImages.size until data.storeVipData.imageCount) {
channelSourceImg.send(async {
DecryptImage(
storeRepo.getImageByteArray(sourceImages[i].url).byteStream().readBytes(),
sourceImages[i].key
)
})
}
}
}
override fun onCleared() {
super.onCleared()
channelApi.close()
channelDecode.close()
}
private class DecryptImage(
val image: ByteArray,
val key: String
)
class ConcurrencyWithOrderHelper{
private val channelApi = Channel<Deferred<Int>>(20)
private val channelDecode = Channel<Int>(20)
suspend fun consumeDecodeResult(block:()->Unit){
channelDecode.consumeEach {
block.invoke()
}
}

suspend fun consumeApiResume(){
channelApi.consumeEach {
decode(it.await()){ apiResult ->
channelDecode.send(apiResult)
}
}
}

suspend fun asyncApiRequest(){
coroutineScope {
repeat(20) {
channelApi.send(async {
callApi(it)
})//do many asynchronous api request, send into channel in order
}
}
}
fun closeChannel(){
channelApi.close()
channelDecode.close()
}
}
fun runTest(){
runBlocking {
val helper = ConcurrencyWithOrderHelper()
launch {
//run on dispatcher.main since we will need to update liveData in real case
helper.consumeDecodeResult{

}
}
launch(Dispatchers.Default) {
// run on dispatchers.default since cpu is more suitable for decode
helper.consumeApiResume()
}
launch(Dispatchers.IO) {
// run on io, since io dispatcher is build for io job
helper.asyncApiRequest()
}
}

}
suspend fun decode(int:Int, block:suspend (Int) -> Unit){
println("decode $int")
block.invoke(int)
}

suspend fun callApi(result:Int): Int {
delay(500)
println("api result $result")
return result
}

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store