Playing with Kotlin in Android: coroutines and how to get rid of the callback hell

In this post I will talk about how Kotlin coroutines can allow you to get rid of the callback hell in your Android code (I mean… get rid of all the callbacks!). At the beginning, I will talk about asynchronous server calls and later I’ll go further showing you a possible way to remove callbacks also on the UI side.

At the time of writing, the available version of the coroutines library is 0.19.3 and the available version of Kotlin is 1.2.0-rc-39. These are the versions that I’ll use as a reference.

There’s also a GitHub repository with the source code for this post (the code there is a more advanced version of what’s explained here), so make sure you check it out. You can follow along with it while reading, or look at it at the end to see how all the pieces explained here fit together. The examples of remote REST service calls use the weather API at OpenWeatherMap, and my app ID is bundled in the code so you can test it immediately. Mine is a free account, though, so please create your own account at OpenWeatherMap and replace WEATHER_API_APP_ID inside build.gradle with your own ID; otherwise the account could get blocked because of too many requests to the API.

Asynchronous server calls with RxJava

A common tool for dealing with asynchronous operations in Android nowadays is RxJava. It’s commonly used together with Retrofit to handle server calls without blocking the main UI thread. RxJava is great for handling streams and chaining multiple asynchronous operations, but there are some issues:

  1. each chain of calls usually ends with a couple of callbacks set in the subscribe() method
  2. it’s hard to create modular code that can be composed in more complex flows and still keep it readable (this makes it hard to adapt to changing requirements of the app)
  3. due to the fluent API, the flow is hard to read and change when it’s very complex
  4. we rarely have to deal with real streams when getting results from a REST API, because we usually just receive collections decoded from JSON, so using RxJava is not strictly necessary, especially given what Kotlin offers out of the box

These issues might not be really clear right now, so let’s take a closer look at each one of them to find out their meaning and later we’ll see how to solve them with the help of coroutines.

Issue 1: the callbacks in subscribe()

This is a typical snippet of a call to a REST service through RxJava without additional operations:

myRepository.myRemoteCall()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(callback::onSuccess, callback::onError)

We have two callbacks in subscribe(), one for the successful web request and another one for the error cases. Now, let’s say that the architecture of our app is MVP + Clean Architecture (but this applies to other architectures as well). The typical flow is like this:

View -> Presenter -> Use Case -> Repository -> REST Api
(...server call...)
View <- Presenter <- Use Case <- Repository <- REST Api

And to be more specific, the use case will have an execution method like this one:

fun execute(...parameters..., callback: Callback) {
    myRepository.myRemoteCall(...parameters...)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            callback::onMyUseCaseSuccess,
            callback::onMyUseCaseError)
}

And the callback passed to the use case will be implemented by the presenter that will end up having two methods like these:

override fun onMyUseCaseSuccess(result: Result)
override fun onMyUseCaseError(exception: Exception)

This means that for each use case we would have at least two methods in the presenter to handle the result (we could have more if the use case provides additional callback methods). Even if you’re using a different architecture, you’ll still need to have those callback methods somewhere.

Now let’s say that in the project you end up having about 100 use cases (this is a real world scenario). Can you imagine how many callback methods are going to pollute your presenters? How much do you have to scroll through your class when there’s a huge amount of callbacks everywhere to actually find what you’re looking for and understand what’s going on?

Issue 2: it’s hard to create modular code that can be composed

Use cases, in MVP + Clean Architecture, are a great way to create modular business logic that is easy to reuse across multiple presenters and it’s specific and small enough to be understandable and testable.

The main problem comes when you would like to compose the results of two or more different use cases. You know, you’ve made all your business logic nice, modular and clean, then suddenly the Product Owner comes to you and says:

We have a new requirement from the client… just in this specific condition, you should take the data that we’ve got from the server in the case A and send this modified version to the server like we do in case B, only if the result of case C is true.

And then you think:

I knew that something like this was going to happen sooner or later… It always happens at some point in the project… Now I need to combine what was implemented in the use cases A, B and C with some new logic just for a specific case that happens just in that screen only when the user does that thing after the other one and it’s going to be a pain… Why can’t my life as a developer be easier?

Let’s take a look at our use cases A, B and C:

UseCaseA

fun execute(...parameters..., callback: Callback) {
    myRepository.myRemoteCallA(...parameters...)
        .flatMap(...do something cool...)
        .map(...do something cooler...)
        .concatMap(...there's still something else to do...)
        // zipWith() is the instance form of the static Observable.zip()
        .zipWith(...I feel like this chain is never going to end...)
        .filter(...oh, here is the last step, finally...)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            callback::onUseCaseASuccess,
            callback::onUseCaseAError)
}

UseCaseB

fun execute(...parameters..., callback: Callback) {
    myRepository.myRemoteCallB(...parameters...)
        .map(...do something...)
        .flatMap(...do something else...)
        .flatMap(...do something more...)
        .concatMap(...there's still something else to do...)
        .zipWith(...and here we are in our last chained method...)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            callback::onUseCaseBSuccess,
            callback::onUseCaseBError)
}

UseCaseC

fun execute(parameter, callback: Callback) {
    Observable.from(parameter)
        .reduce(...do something...)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            callback::onUseCaseCSuccess,
            callback::onUseCaseCError)
}

You have at least three options to deal with the new requirement:

  1. write a new huge use case D that combines together the huge chain of RxJava operators of A, B and C to end up with the final result adding additional logic along the way to implement the new requirement so you’ll have just two new callback methods onUseCaseDSuccess() and onUseCaseDError()
  2. package the RxJava operators of A, B and C in separate methods without the subscriptions (just returning the Observables) and use them in the new use case D combined with some new logic so you avoid repeating the code, even though you still end up with two new callback methods onUseCaseDSuccess() and onUseCaseDError() plus three new methods that you need to put somewhere else for the RxJava operators chains of the single use cases so they can be reused inside A, B, C and D
  3. deal with the callback results of A, B and C in the presenter, add class fields to it with the results of the different callbacks so you can coordinate them in some way and then execute the use case D with the other results as parameters

Whatever option you choose, the result is still far from ideal and quite messy.

Issue 3: due to the fluent API, the flow is hard to read or change when it’s very complex

As the different requirements for the app get more and more complex, your RxJava operators chains might end up looking like this:

fun execute(...parameters..., callback: Callback) {
    myRepository.myRemoteCall(...parameters...)
        .flatMap { value ->
            // A Kotlin lambda returns the value of the if/else expression
            if (...my complex condition with value is true...) {
                mySecondRepository.myComplexOperation()
                    .flatMap(...do something...)
            } else {
                myThirdRepository.myOtherComplexOperation()
                    .map(...do something...)
            }
        }
        .map(...do something...)
        .concatMap { myOtherMethodThatContainsAHugeChainOfRxOperators(it) }
        .flatMap { anotherMethodWithManyRxOperators(it) }
        .zipWith(...do something...)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            callback::onUseCaseSuccess,
            callback::onUseCaseError)
}

fun myOtherMethodThatContainsAHugeChainOfRxOperators(input: ...): Observable<...> {
    return ...huge RxJava operators chain...
}

fun anotherMethodWithManyRxOperators(input: ...): Observable<...> {
    return ...another huge RxJava operators chain...
}

This is a small example contrived to make an RxJava operator chain look complex, but in a real-world scenario you often have to deal with much longer chains. Even when they are split into smaller chains inside descriptively named methods that return Observables, they are still hard to manage and understand without making mistakes (thankfully we have unit tests to tell us when something breaks). And can you imagine how hard it is for someone who has just joined the team, and is not yet familiar with the codebase, to understand what’s going on?

Another problem with the fluent API is that each method in the chain takes as input only what the previous method returned as output. Sometimes we need a value produced much earlier in the chain, and to access it we end up nesting chains of operators, which makes the flow even harder to understand and manage.

Issue 4: we rarely have to deal with real streams

Typically, in an Android app, we make calls to REST endpoints that give back some data in JSON format, and only when we’ve received the full JSON can we start processing it. That’s not a stream. We don’t need a stream. We wait for the full data, then we start working with it. We might have collections of objects decoded from our JSON, but they’re not streams, just collections.

RxJava is a great tool when we need to deal with real streams, handling backpressure and processing everything nicely in a sequence of operations. Given the above, though, we don’t really need it in almost any case in an Android app, especially because Kotlin offers out of the box a good set of operators on collections that you’ll find very familiar if you’ve been working with RxJava. Those operators let us use a fluent API when we find it more convenient, while still giving us the freedom to write imperative code when it’s more manageable and readable (and this is exactly what coroutines will allow us to do, as we’ll see later).
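
To make the point about collection operators concrete, here is a minimal sketch. The Forecast class, the warmCities() function and the sample data are hypothetical names invented for this illustration; only the operators themselves (filter, map, sorted) come from the Kotlin standard library.

```kotlin
// Hypothetical model, standing in for an object decoded from a JSON response.
data class Forecast(val city: String, val temperatures: List<Double>)

// Returns the cities whose average temperature is above the threshold, sorted by name.
fun warmCities(forecasts: List<Forecast>, threshold: Double): List<String> {
    // Fluent style where it reads well...
    val averages = forecasts
        .filter { it.temperatures.isNotEmpty() }
        .map { it.city to it.temperatures.average() }

    // ...and plain imperative code where that is easier to follow.
    val result = mutableListOf<String>()
    for ((city, average) in averages) {
        if (average > threshold) result.add(city)
    }
    return result.sorted()
}

fun main() {
    val forecasts = listOf(
        Forecast("Rome", listOf(24.0, 26.0)),
        Forecast("Oslo", listOf(8.0, 10.0)),
        Forecast("Cairo", listOf(30.0, 34.0)),
        Forecast("Nowhere", emptyList())
    )
    println(warmCities(forecasts, 20.0)) // prints [Cairo, Rome]
}
```

The function starts with a fluent chain and then switches to a plain loop; with collections nothing forces us to stay in one style until the end.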

Asynchronous server calls with Kotlin coroutines

Here we are. Finally we’re going to take a look at how the previous issues could be addressed thanks to the Kotlin coroutines and how our life, as developers, can become easier.

The main benefits that we can get from the solution with coroutines are the following:

  • the code is easier to manage and amend (changing requirements for our app are not as hard as they were before)
  • the code is much more compact because we can get rid of all the callbacks
  • we no longer need to store class fields in the presenter just because some values were needed across multiple callbacks, since the callbacks are gone (we can do almost everything with simple variables local to each method)
  • if a person is new to the team working on the app, it will be easier to understand the existing codebase
  • we’ll no longer be scared of new super-specific-just-in-that-case requirements that break our architecture, because it will be easier for us to adapt to changes

Solving issue 1: the callbacks in subscribe()

Let’s start immediately with some code to see what a typical coroutine would look like in Android for our needs:

fun myCoroutine() {
    launch(UI) {
        val result = async(CommonPool) {
            ...do something asynchronous...
        }.await()

        myProcessingMethod(result)
    }
}

In short, this is what’s happening in the above code snippet:

  • launch a new coroutine that executes in the Android UI thread (launch(UI) { … })
  • execute an asynchronous operation in a separate thread taken from a thread pool (async(CommonPool) { … }) and wait for the asynchronous operation to complete before moving forward (await())
  • while we’re waiting for the asynchronous operation to complete, our main Android UI thread is completely free because the coroutine inside launch() is suspended, not the UI thread
  • when the asynchronous operation inside async() completes, then the coroutine is resumed in the main UI thread and we get the result inside result (the coroutine restarts from here, where we previously suspended the execution)
  • myProcessingMethod() is executed with the result that we’ve previously computed asynchronously

As you can see, the flow of the coroutine looks exactly like that of an ordinary synchronous method, so we don’t have to deal with callbacks to get the result. Note that UI and CommonPool are provided out of the box by the Kotlin coroutines libraries, so they’re not mysterious objects (the first one comes from kotlinx-coroutines-android and the second one from kotlinx-coroutines-core). I won’t go into much depth about them in this post; you can find out more in the excellent Kotlin coroutines docs. All you need to know here is that UI runs the coroutine’s code on the Android UI thread, while CommonPool provides a pool of background threads for code that must run off the UI thread.

OK, nice! But can we make the code a little bit simpler given that we’ll probably be going to write the same things many times? Of course we can.

Let’s simplify launch(UI) { … } because in Android we’ll always want to execute the code in the main UI thread whenever it’s not executing in a background thread. We can have the following utility method to help us:

fun launchAsync(block: suspend CoroutineScope.() -> Unit): Job {
    return launch(UI) { block() }
}

This code returns a Job that is given by launch() in case we want to cancel the coroutine for example. Now we can rewrite the original code snippet in this way:

fun myCoroutine() {
    launchAsync {
        val result = async(CommonPool) {
            ...do something asynchronous...
        }.await()

        myProcessingMethod(result)
    }
}

Not a big improvement you might think, but we can do more of course. Let’s simplify async() { … }.

suspend fun <T> async(block: suspend CoroutineScope.() -> T): Deferred<T> {
    return async(CommonPool) { block() }
}

suspend fun <T> asyncAwait(block: suspend CoroutineScope.() -> T): T {
    return async(block).await()
}

Our new methods are marked with the suspend modifier, which tells the compiler that they can suspend a coroutine and can only be called from within a coroutine (like the block passed to launch()) or from another suspending function. async() returns a Deferred object that allows us to wait for the result (with await()) or cancel the asynchronous operation if we want (with cancel()). The await() method waits for the asynchronous code to complete and returns the value produced by the block inside async() without any additional wrapper (for example, if the block returns an Int, then await() simply returns an Int). Our usual example snippet can now be rewritten like this:

fun myCoroutine() {
    launchAsync {
        val result = asyncAwait { ...do something asynchronous... }
        myProcessingMethod(result)
    }
}

Much more compact and easier to read. It looks like familiar imperative code. At this point you might ask: why do we have two new methods, async(block) and asyncAwait(block)? Couldn’t we have created just one? Good question, and here is the answer: having two separate methods allows us to run asynchronous code in parallel or sequentially depending on our needs. Here’s how:

Sequential code

val result1 = asyncAwait { ...do something asynchronous... }
val result2 = asyncAwait { ...do something asynchronous... }
processResults(result1, result2)

Parallel code

val result1 = async { ...do something asynchronous... }
val result2 = async { ...do something asynchronous... }
processResults(result1.await(), result2.await())

Here’s what’s happening:

  • in the sequential code example, the first asynchronous block executes and we wait for it to complete to get result1, then we move on to the second asynchronous block and wait for it to complete to get result2 and only when both are done, we process our results
  • in the parallel code example, the first asynchronous block executes and returns immediately assigning a Deferred object to result1, then while the first block is still executing, we start the second asynchronous block that returns immediately as well and puts its own Deferred object inside result2, and when both the parallel asynchronous blocks have completed (we use await() for both the Deferred objects in result1 and result2 to wait for them to complete), then the results are processed

As you can see, it’s useful, depending on our needs, to have two separate async() and asyncAwait() methods in case we need to run the code sequentially or in parallel.
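
The difference between the two styles can be observed with a small timing experiment. This is only a sketch under stated assumptions: it targets a 1.x release of kotlinx.coroutines rather than 0.19.3 (there, the builders are called on a CoroutineScope and Dispatchers.Default takes the place of CommonPool), and slowValue(), sequentialSum() and parallelSum() are hypothetical names that use delay() to stand in for a remote call.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical slow operation standing in for a remote call.
suspend fun slowValue(value: Int): Int {
    delay(300)
    return value
}

// Sequential: the second call starts only after the first one has returned.
suspend fun sequentialSum(): Int {
    val a = slowValue(1)
    val b = slowValue(2)
    return a + b
}

// Parallel: both calls are started first, then both results are awaited.
suspend fun parallelSum(): Int = coroutineScope {
    val a = async(Dispatchers.Default) { slowValue(1) }
    val b = async(Dispatchers.Default) { slowValue(2) }
    a.await() + b.await()
}

fun main() = runBlocking {
    val sequentialMillis = measureTimeMillis { println(sequentialSum()) } // prints 3
    val parallelMillis = measureTimeMillis { println(parallelSum()) }     // prints 3
    println("sequential: $sequentialMillis ms, parallel: $parallelMillis ms")
}
```

On a typical run the sequential version should take roughly twice as long as the parallel one (about 600 ms versus 300 ms with the delays above), although exact timings vary from run to run.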

Now you might say: what happens if there’s an exception in the asynchronous code? How do we deal with it? Well, it turns out that it’s very simple and it looks like any synchronous imperative code you’ve always been writing. Here is an example:

fun myCoroutine() {
    launchAsync {
        try {
            val result = asyncAwait { ...asynchronous operations... }
            myProcessingMethod(result)
        } catch (e: MyException) {
            ...deal with the exception...
        }
    }
}
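
Here is a runnable variant of the same idea, offered as a sketch rather than as the code from the repository. It assumes a 1.x release of kotlinx.coroutines, where withContext(Dispatchers.Default) plays roughly the role of our asyncAwait() helper; loadGreeting() and greetingOrFallback() are hypothetical names, and IOException stands in for MyException.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.IOException

// Hypothetical background operation that can fail on a background thread.
suspend fun loadGreeting(fail: Boolean): String = withContext(Dispatchers.Default) {
    if (fail) throw IOException("server unreachable")
    "hello"
}

// The exception thrown in the background surfaces at the call site,
// so an ordinary try/catch is enough to handle it.
suspend fun greetingOrFallback(fail: Boolean): String =
    try {
        loadGreeting(fail)
    } catch (e: IOException) {
        "recovered: ${e.message}"
    }

fun main() = runBlocking {
    println(greetingOrFallback(fail = false)) // prints hello
    println(greetingOrFallback(fail = true))  // prints recovered: server unreachable
}
```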

Cool. Now we have new tools to write our business logic in the use case classes (or anywhere your architecture requires). Do you remember the previous simple use case execution method with RxJava? It was like this:

fun execute(...parameters..., callback: Callback) {
    myRepository.myRemoteCall(...parameters...)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            callback::onMyUseCaseSuccess,
            callback::onMyUseCaseError)
}

And now, with coroutines, we can rewrite it in this way:

suspend fun execute(...params...): Result {
    return asyncAwait { myRepository.myRemoteCall(...params...) }
}

The presenter will look like this:

fun executeMyUseCase() {
    launchAsync {
        try {
            val useCaseResult = myUseCase.execute(...params...)
            myProcessingMethod(useCaseResult)
        } catch (e: MyException) {
            ...deal with the exception...
        }
    }
}

Great! Issue 1 has been solved. No more callbacks waiting for the asynchronous code to complete.

Solving issue 2: it’s hard to create modular code that can be composed

Do you remember our nice new requirement for the project that forces us to combine the logic of three different use cases A, B and C? Well, we’re not scared anymore because now our use cases look like this thanks to coroutines:

UseCaseA

suspend fun execute(...params...): Result1 {
    return asyncAwait { ...a lot of asynchronous things... }
}

UseCaseB

suspend fun execute(...params...): Result2 {
    return asyncAwait { ...a lot of asynchronous things... }
}

UseCaseC

// Returns a Boolean because the new requirement uses the result of C as a condition
suspend fun execute(...params...): Boolean {
    return asyncAwait { ...a lot of asynchronous things... }
}

If we want to combine the three use cases with some additional logic, we can create a fourth use case:

UseCaseD

suspend fun execute(...params...): Boolean {
    val useCaseAResult = useCaseA.execute(...params for A...)

    if (useCaseC.execute(useCaseAResult, ...other params...)) {
        useCaseB.execute(useCaseAResult)
        return true
    }

    return false
}

Since we don’t have callbacks anymore, everything is as easy as in imperative code. Issue 2 solved!

Solving issues 3 and 4: due to the fluent API, the flow is hard to read or change when it’s very complex, and we rarely have to deal with real streams

Sometimes, while reading a very long chain of operators in a fluent API like in RxJava, I feel like I’m holding my breath until the end to see what’s finally going to happen. That chain could be split into multiple descriptive methods with smaller chains in them, but those methods must be composed in the main chain and once you’ve started with the fluent API, you can’t get out of it until the very end of the processing.

Well, this is basically mandatory to handle streams, so we don’t have many choices in that case, but in a typical Android app we never deal with real streams because we deal with collections instead. We get our JSONs from the server and we handle the results when we’ve got the full data and the connection to the server gets closed.

How can Kotlin coroutines help in this case? As we’ll see, it’s not the coroutines alone that help us here, but the coroutines combined with the set of operators on collections that we get out of the box with Kotlin. The point is not to replace the fluent API completely, but to give us the freedom to break the flow when we think it will be easier to read or manage, and to keep smaller pieces that still use the fluent API (just to avoid holding our breath until the full chain of methods is completed).

This is an example of what the code could look like with coroutines combined with the operators on collections:

suspend fun execute(...parameters...): Result {
    // The blocking repository call is wrapped so it runs off the UI thread
    val valueA = asyncAwait { myRepository.myRemoteCall() }
    val valueB = valueA.map { ...do something... }
        .flatMap { ...do something else... }
        .reduceIndexed { ...do something else... }
    var valueC = methodWithAHugeChainOfOperators(valueB)
    ...do something with valueC...

    return otherUseCase.execute(anotherMethodWithOperators(valueC))
}

suspend fun methodWithAHugeChainOfOperators(input: Type1): Result1 {
    return ...huge chain of operators...
}

suspend fun anotherMethodWithOperators(input: Type2): Result2 {
    return ...another huge chain of operators...
}

As you can see, the point is the freedom we have to break the fluent API whenever we feel the code will be easier to read or manage.

And with that, we’ve also solved issues 3 and 4.

A quick recap on the different components of our app architecture with coroutines

Given the following architecture

View -> Presenter -> Use Case -> Repository -> REST Api
(...server call...)
View <- Presenter <- Use Case <- Repository <- REST Api

the different components of our app will look like this with coroutines:

View

presenter.myPresenterMethod()

Presenter

fun myPresenterMethod() {
    launchAsync {
        view.displayInProgressSpinner()

        try {
            val result1 = useCase1.execute(...parameters...)
            val result2 = useCase2.execute(...parameters...)
            view.displayResults(result1, result2)
        } catch (e: MyException) {
            view.displayError()
        }

        view.hideInProgressSpinner()
    }
}

Use Case 1

suspend fun execute(...params...): Result1 {
    return asyncAwait { myRepository.myRemoteCall1(...params...) }
}

Use Case 2

suspend fun execute(...params...): Result2 {
    return asyncAwait { myRepository.myRemoteCall2(...params...) }
}

Repository

fun myRemoteCall1(...params...): Result1 {
    return myRestApi.myRemoteCall1(...params...).execute().body()
}

fun myRemoteCall2(...params...): Result2 {
    return myRestApi.myRemoteCall2(...params...).execute().body()
}

REST API interface (handled with Retrofit)

@GET("path1/path2/path3")
fun myRemoteCall1(@Query("p1") param1: String,
                  @Query("p2") param2: String): Call<Result1>

@GET("path4/path5")
fun myRemoteCall2(@Query("p3") param3: String): Call<Result2>

The main advantage over the RxJava solution is in the presenter. No callbacks at all and no need for class fields to store values and coordinate different callbacks because everything can be contained in one method that is easy to read.

Great. Is that all? Is it all about server calls? Of course not. At the beginning of this post I promised to show you how to remove callbacks on the UI side as well, so here it is.

Kotlin coroutines in the UI: let’s get rid of the callbacks also there!

When do we have callbacks in the UI in an Android app? One common case is dialogs. Here’s what a typical dialog looks like in the view layer of our app when we want the user to select among multiple choices:

fun displayMyMultipleChoiceDialog() {
    AlertDialog.Builder(this)
        .setTitle(...string resource...)
        .setMessage(...string resource...)
        .setPositiveButton(...RETRY string..., {
            dialogInterface: DialogInterface, _: Int ->
            dialogInterface.dismiss()
            presenter.retrySelected()
        })
        .setNegativeButton(...CANCEL string..., {
            dialogInterface: DialogInterface, _: Int ->
            dialogInterface.dismiss()
            presenter.cancelSelected()
        })
        .setOnCancelListener {
            presenter.cancelSelected()
        }
        .create()
        .show()
}

This dialog displays two choices to the user, RETRY and CANCEL. Depending on the user’s choice, we call a different presenter method (canceling the dialog without pressing a button counts as CANCEL). The methods retrySelected() and cancelSelected() in the presenter act as callbacks waiting for the user’s response.

The presenter would look like this:

fun myPresenterMethod() {
    view.displayMyMultipleChoiceDialog()
}

override fun retrySelected() {
    ...do something...
}

override fun cancelSelected() {
    ...do something else...
}

Wouldn’t it be nice if we could display the dialog and wait for the response just in one method instead of breaking the flow in multiple methods? You’re lucky, because here I’ll show you how to do it thanks to Kotlin coroutines.

The new coroutine-enabled version of the dialog method in our view will look like this:

suspend fun displayMyMultipleChoiceDialog(): MyDialogResult {
    // Assigned by suspendCoroutine() at the end of this method,
    // before any of the button callbacks can run
    lateinit var result: Continuation<MyDialogResult>

    AlertDialog.Builder(this)
        .setTitle(...string resource...)
        .setMessage(...string resource...)
        .setPositiveButton(...RETRY string..., {
            dialogInterface: DialogInterface, _: Int ->
            dialogInterface.dismiss()
            result.resume(MyDialogResult.RETRY)
        })
        .setNegativeButton(...CANCEL string..., {
            dialogInterface: DialogInterface, _: Int ->
            dialogInterface.dismiss()
            result.resume(MyDialogResult.CANCEL)
        })
        .setOnCancelListener {
            result.resume(MyDialogResult.CANCEL)
        }
        .create()
        .show()

    // Suspend here until one of the callbacks above calls result.resume()
    return suspendCoroutine { continuation -> result = continuation }
}

The view interface will provide the MyDialogResult type:

enum class MyDialogResult {
    RETRY, CANCEL
}

And the presenter will look like this:

fun myPresenterMethod() {
    launchAsync {
        when (view.displayMyMultipleChoiceDialog()) {
            RETRY -> ...do something...
            CANCEL -> ...do something else...
        }
    }
}

Thanks to the changes in displayMyMultipleChoiceDialog(), we can now handle the dialog result directly without waiting for the response in different callback methods. Nice!

In the new version of the dialog method, we basically manually control when the coroutine continues and since all the dialog operations need to be executed in the main UI thread we don’t actually need to execute anything asynchronously in background threads.
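
The same mechanism can be tried outside Android with nothing but the Kotlin standard library. The sketch below makes several assumptions: FakeDialog, awaitChoice() and DialogResult are hypothetical stand-ins for AlertDialog and our view method, and the imports target Kotlin 1.3 or later, where these APIs live in kotlin.coroutines (with Kotlin 1.2 they were in kotlin.coroutines.experimental).

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

enum class DialogResult { RETRY, CANCEL }

// Hypothetical stand-in for AlertDialog: it only stores the two click listeners.
class FakeDialog {
    private var onRetry: (() -> Unit)? = null
    private var onCancel: (() -> Unit)? = null

    fun show(onRetry: () -> Unit, onCancel: () -> Unit) {
        this.onRetry = onRetry
        this.onCancel = onCancel
    }

    fun clickRetry() { onRetry?.invoke() }
    fun clickCancel() { onCancel?.invoke() }
}

// Suspends the caller until one of the listeners resumes the continuation.
suspend fun FakeDialog.awaitChoice(): DialogResult =
    suspendCoroutine { continuation ->
        show(
            onRetry = { continuation.resume(DialogResult.RETRY) },
            onCancel = { continuation.resume(DialogResult.CANCEL) }
        )
    }

fun main() {
    val dialog = FakeDialog()
    var choice: DialogResult? = null

    // Start a coroutine on the current thread, without any dispatcher.
    val block: suspend () -> Unit = { choice = dialog.awaitChoice() }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result -> result.getOrThrow() })

    println(choice)     // prints null: the coroutine is suspended, the thread is not blocked
    dialog.clickRetry() // simulates the user's click, which resumes the coroutine
    println(choice)     // prints RETRY
}
```

In this sketch the listeners are registered inside the suspendCoroutine block, which is an alternative to keeping the continuation in a lateinit variable. Either way, each continuation should be resumed exactly once, which is why the dialog callbacks each resume with a single value.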

With this simple pattern you could get rid of all the callbacks also in code that doesn’t need to execute asynchronously. That might turn out to be handy in many cases. For example, let’s see how we could apply this to the previous example with the remote server call:

fun myPresenterMethod() {
    launchAsync {
        view.displayInProgressSpinner()

        try {
            val result1 = useCase1.execute(...parameters...)
            val result2 = useCase2.execute(...parameters...)
            view.displayResults(result1, result2)
        } catch (e: MyException) {
            when (view.displayRetryDialog()) {
                RETRY -> myPresenterMethod()
                CANCEL -> ...do something else...
            }
        }

        view.hideInProgressSpinner()
    }
}

When the user decides to retry, we simply call the same presenter method again to repeat the operation.

Coroutines cleanup

You’ll need to make sure at some point that any asynchronous operation still in progress is canceled. Here’s how you can do that with coroutines.

Remember that the standard coroutines method launch() returns a Job while async() returns a Deferred object. To cancel a coroutine, you just need to call cancel() on the Job, while if you don’t want to cancel the whole coroutine, but just the asynchronous operation in progress within the coroutine, then you’ll call cancel() on the Deferred object.

Can we create something automatic to cancel all the coroutines or all the asynchronous operations? Sure. Let’s say that you have a presenter and you want to make sure that you call just one method to cancel all the coroutines currently in progress (for example when the app is going into background). Here’s how we could handle this requirement:

val asyncJobs: MutableList<Job> = mutableListOf()

fun launchAsync(block: suspend CoroutineScope.() -> Unit) {
    val job: Job = launch(UI) { block() }
    asyncJobs.add(job)
    job.invokeOnCompletion { asyncJobs.remove(job) }
}

fun cancelAllAsync() {
    val asyncJobsSize = asyncJobs.size

    if (asyncJobsSize > 0) {
        // Iterate backwards so that a job removing itself from the list
        // on completion doesn't shift the indexes still to be visited
        for (i in asyncJobsSize - 1 downTo 0) {
            asyncJobs[i].cancel()
        }
    }
}

This code redefines our utility method launchAsync() that we’ve seen before and automatically keeps track of each Job currently in progress. A Job is automatically removed from the list whenever it completes, is canceled or fails with an exception, so we can always simply call cancelAllAsync() whenever we want to cancel all the coroutines that are still in progress.

We can have something similar to cancel the single asynchronous tasks within a coroutine:

val deferredObjects: MutableList<Deferred<*>> = mutableListOf()

suspend fun <T> async(block: suspend CoroutineScope.() -> T): Deferred<T> {
    val deferred: Deferred<T> = async(CommonPool) { block() }
    deferredObjects.add(deferred)
    deferred.invokeOnCompletion { deferredObjects.remove(deferred) }
    return deferred
}

suspend fun <T> asyncAwait(block: suspend CoroutineScope.() -> T): T {
    return async(block).await()
}

fun cancelAllAsync() {
    val deferredObjectsSize = deferredObjects.size

    if (deferredObjectsSize > 0) {
        for (i in deferredObjectsSize - 1 downTo 0) {
            deferredObjects[i].cancel()
        }
    }
}

Also in this case, we’ve redefined our utility method async() to keep track of all the running asynchronous tasks and we can cancel all of them with cancelAllAsync() that calls cancel() on each Deferred object of a task still in progress.

A quick note: as stated by the Kotlin coroutines docs, both the invokeOnCompletion() methods that we’ve just used should not be used by general application code because their main purpose is to make it possible for the coroutines library to work, but I haven’t found any other good alternative in the current version (0.19.3) of the library to achieve the same goal with different methods, so they’re used anyway here as a convenient way to achieve our result.
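
For comparison only, and not as the approach used in this post: later 1.x releases of kotlinx.coroutines let a CoroutineScope keep track of the coroutines launched in it, so the bookkeeping list is not needed there. The sketch below assumes such a release; SketchPresenter is a hypothetical class, and Dispatchers.Default is used instead of the Android main dispatcher only so that the example can run on a plain JVM.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical presenter: every coroutine it launches becomes a child of the scope's Job.
class SketchPresenter {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    fun launchAsync(block: suspend CoroutineScope.() -> Unit): Job =
        scope.launch(block = block)

    // Cancels every coroutine still in progress; the scope itself stays usable afterwards.
    fun cancelAllAsync() {
        scope.coroutineContext.cancelChildren()
    }
}

fun main() = runBlocking {
    val presenter = SketchPresenter()
    val jobs = List(3) { presenter.launchAsync { delay(60_000) } }

    presenter.cancelAllAsync()
    jobs.joinAll()

    println(jobs.all { it.isCancelled }) // prints true
}
```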

At this point you might think: what happens when I cancel a coroutine Job with running asynchronous tasks managed by multiple Deferred objects in it? Do they get canceled as well or do I need to cancel them manually? Good question. Here’s what happens:

  • if you call cancel() on a coroutine Job while asynchronous tasks started by async() are running inside it, those tasks keep running until they complete, and the coroutine is canceled only once execution goes back to it (so only the coroutine is canceled, not the tasks inside it)
  • if you want to cancel both the coroutine and the asynchronous tasks running inside it, you need to call cancel() on both the coroutine Job and the Deferred objects of those tasks (in this way everything is canceled right away)

Does it sound hard? Well, it’s actually not. Whenever you want to cancel everything instantly, you just need to call our utility methods cancelAllAsync() for both the Jobs and the Deferred objects. That’s it.

Now you might have another question: how does the cancellation work? The answer is simple. Cancellation is cooperative: whenever you call cancel(), the coroutine is marked as canceled and a CancellationException is thrown by the suspending function where the coroutine is currently suspended (or by the next one it calls), which interrupts your asynchronous code. The exception is thrown automatically by any suspending function defined in the Kotlin coroutines library (kotlinx.coroutines) because all those functions are cancellable. If you want to make your own code cancellable, you need to check the isActive property that is available inside every coroutine. That property tells you if the coroutine is still active or has been canceled. This means that, if you have a long computation within your coroutine, you’ll have to periodically check the value of isActive and throw a CancellationException whenever the coroutine is not active anymore (of course, you can also choose to handle the cancellation differently and avoid throwing the exception if you don’t want it to be propagated). Each asynchronous task started by async() is in fact a coroutine as well (async() is one of the coroutine builders), so isActive is also available inside the asynchronous tasks.

asyncAwait {
    ...
    if (!isActive) {
        throw CancellationException()
    }
    ...
}
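To make the isActive check concrete, here is a minimal runnable sketch of a computation with no suspension points that can only be canceled because it checks isActive itself. Note the assumptions: it’s written against the current stable kotlinx.coroutines API (1.x) rather than 0.19.3, so names like Dispatchers.Default and cancelAndJoin() may differ from what you have in the version used in this post, and cancelBusyLoop() is a hypothetical helper made up for the example.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts a CPU-bound loop, cancels it and reports how the Job ended.
fun cancelBusyLoop(): Boolean = runBlocking {
    val job = launch(Dispatchers.Default) {
        var counter = 0L
        while (true) {
            // There are no suspending calls in this loop, so nothing would throw
            // CancellationException for us: we have to check isActive by hand.
            if (!isActive) throw CancellationException()
            counter++
        }
    }

    delay(50)            // let the loop run for a while
    job.cancelAndJoin()  // returns only because the loop cooperates
    job.isCancelled
}

fun main() {
    println(cancelBusyLoop()) // true
}
```

Without the isActive check, cancelAndJoin() would never return in this sketch, because the loop would have no way to notice the cancellation.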

Since CancellationException is the special exception that makes cancellation work for coroutines, we need to be careful when we handle exceptions in our try-catch blocks. Basically, if we catch that exception and don’t let it flow through the coroutines code, there’s no way for the coroutines or asynchronous tasks down in the stack to cancel. Because of this, unless we really want to deal with the cancellation in a special way, we shouldn’t catch that exception and should instead let it interrupt our asynchronous execution. Let me give you an example:

fun myCoroutine() {
    launchAsync {
        try {
            asyncAwait { ...asynchronous operations... }
        } catch (e: MyException) {
            ...deal with the exception...
        }
        // This code is NOT EXECUTED in case of cancellation
        ... other coroutine code...
    }
}

In this case we catch a specific exception so we let the CancellationException flow normally and interrupt the execution further down in the stack. Now check the following code:

fun myCoroutine() {
    launchAsync {
        try {
            asyncAwait { ...asynchronous operations... }
        } catch (e: Exception) {
            ...deal with the exception...
        }
        // This code is EXECUTED in case of cancellation
        ... other coroutine code...
    }
}

This code catches all the exceptions, including CancellationException. Since the exception is not propagated down in the stack, the cancellation doesn’t happen for the other coroutine code.

How can we fix this in case we really want to catch the generic Exception? We could write something like this:

fun myCoroutine() {
    launchAsync {
        try {
            asyncAwait { ...asynchronous operations... }
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            ...deal with the exception...
        }
        // This code is NOT EXECUTED in case of cancellation
        ... other coroutine code...
    }
}

This makes sure that CancellationException is always propagated down in the stack. In case we had a finally block this would become trickier though. Can we avoid having to remember this thing every time we want to catch the generic Exception or when we also have a finally block? Yes, we can. We can write a utility method that filters CancellationException for us and makes sure it is propagated unless we explicitly state that we want to handle it manually.

suspend fun CoroutineScope.tryCatch(
        tryBlock: suspend CoroutineScope.() -> Unit,
        catchBlock: suspend CoroutineScope.(Throwable) -> Unit,
        handleCancellationExceptionManually: Boolean = false) {
    try {
        tryBlock()
    } catch (e: Throwable) {
        if (e !is CancellationException ||
                handleCancellationExceptionManually) {
            catchBlock(e)
        } else {
            throw e
        }
    }
}

Here is how we can use the tryCatch() utility method:

fun myCoroutine() {
    launchAsync {
        tryCatch({
            asyncAwait { ...asynchronous operations... }
        }, {
            when (it) {
                is MyException -> ...handle MyException...
                else -> ...handle any other exception...
            }
        })
        // This code is NOT EXECUTED in case of cancellation
        ... other coroutine code...
    }
}

Since we didn’t specify in the utility method call that we want to handle CancellationException manually, that exception is filtered out automatically for us to make sure it’s propagated in the stack, while we can still handle any generic Exception in our code without worrying about rethrowing CancellationException.
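If you want to try the filtering logic in isolation, here is a simplified sketch that runs without the coroutines library. tryCatchPlain() is a hypothetical non-suspending copy of tryCatch() and outcome() is a helper made up for the example. It relies on java.util.concurrent.CancellationException, which is the class that the coroutines library uses for its CancellationException on the JVM.

```kotlin
import java.util.concurrent.CancellationException

// Simplified, non-suspending copy of the filtering logic in tryCatch(), for illustration only.
fun tryCatchPlain(
    tryBlock: () -> Unit,
    catchBlock: (Throwable) -> Unit,
    handleCancellationExceptionManually: Boolean = false
) {
    try {
        tryBlock()
    } catch (e: Throwable) {
        if (e !is CancellationException || handleCancellationExceptionManually) {
            catchBlock(e)
        } else {
            throw e
        }
    }
}

// Hypothetical helper: reports what happens to an exception thrown inside the try block.
fun outcome(thrown: Exception, manual: Boolean = false): String {
    return try {
        var result = "not handled"
        tryCatchPlain({ throw thrown }, { result = "handled by catchBlock" }, manual)
        result
    } catch (e: CancellationException) {
        "propagated to the caller"
    }
}

fun main() {
    println(outcome(IllegalStateException("boom")))          // handled by catchBlock
    println(outcome(CancellationException()))                // propagated to the caller
    println(outcome(CancellationException(), manual = true)) // handled by catchBlock
}
```

Only the second call lets the exception escape, which is exactly the case where we want the cancellation to keep flowing through the coroutine.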

We can also create two other utility methods to handle the remaining try-catch-finally and try-finally cases.

suspend fun CoroutineScope.tryCatchFinally(
        tryBlock: suspend CoroutineScope.() -> Unit,
        catchBlock: suspend CoroutineScope.(Throwable) -> Unit,
        finallyBlock: suspend CoroutineScope.() -> Unit,
        handleCancellationExceptionManually: Boolean = false) {

    var caughtThrowable: Throwable? = null

    try {
        tryBlock()
    } catch (e: Throwable) {
        if (e !is CancellationException ||
                handleCancellationExceptionManually) {
            catchBlock(e)
        } else {
            // Remember the cancellation so it can be rethrown below
            caughtThrowable = e
        }
    } finally {
        if (caughtThrowable is CancellationException &&
                !handleCancellationExceptionManually) {
            // Cancellation not handled manually: propagate it and skip finallyBlock
            throw caughtThrowable
        } else {
            finallyBlock()
        }
    }
}

suspend fun CoroutineScope.tryFinally(
        tryBlock: suspend CoroutineScope.() -> Unit,
        finallyBlock: suspend CoroutineScope.() -> Unit,
        suppressCancellationException: Boolean = false) {

    var caughtThrowable: Throwable? = null

    try {
        tryBlock()
    } catch (e: CancellationException) {
        // When suppressed, the cancellation is swallowed and finallyBlock still runs
        if (!suppressCancellationException) {
            caughtThrowable = e
        }
    } finally {
        if (caughtThrowable is CancellationException &&
                !suppressCancellationException) {
            // Cancellation not suppressed: propagate it and skip finallyBlock
            throw caughtThrowable
        } else {
            finallyBlock()
        }
    }
}
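One detail of these two utility methods is easy to miss: when the cancellation is propagated, finallyBlock is skipped. Here is a small sketch that shows it for tryFinally(). As before, tryFinallyPlain() is a hypothetical non-suspending copy of the method that runs without the coroutines library, and describe() is a helper made up for the example.

```kotlin
import java.util.concurrent.CancellationException

// Simplified, non-suspending copy of tryFinally(), for illustration only.
fun tryFinallyPlain(
    tryBlock: () -> Unit,
    finallyBlock: () -> Unit,
    suppressCancellationException: Boolean = false
) {
    var caughtThrowable: Throwable? = null

    try {
        tryBlock()
    } catch (e: CancellationException) {
        if (!suppressCancellationException) {
            caughtThrowable = e
        }
    } finally {
        if (caughtThrowable is CancellationException && !suppressCancellationException) {
            throw caughtThrowable
        } else {
            finallyBlock()
        }
    }
}

// Hypothetical helper: reports whether finallyBlock ran and which exception escaped.
fun describe(thrown: Exception?, suppress: Boolean = false): String {
    var finallyRan = false
    val propagated = try {
        tryFinallyPlain({ if (thrown != null) throw thrown }, { finallyRan = true }, suppress)
        "none"
    } catch (e: Exception) {
        e.javaClass.simpleName
    }
    return "finallyBlock ran: $finallyRan, propagated: $propagated"
}

fun main() {
    println(describe(null))
    // finallyBlock ran: true, propagated: none
    println(describe(IllegalStateException("boom")))
    // finallyBlock ran: true, propagated: IllegalStateException
    println(describe(CancellationException()))
    // finallyBlock ran: false, propagated: CancellationException
    println(describe(CancellationException(), suppress = true))
    // finallyBlock ran: true, propagated: none
}
```

So if finallyBlock contains cleanup that must also run on cancellation, you need to pass suppressCancellationException = true (or handleCancellationExceptionManually = true for tryCatchFinally()) and deal with the consequences yourself.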

Coroutines testing

We always need to write unit tests for our code, so how can we test code that contains coroutines? Is there something different from all the other code? Well, it turns out that there isn’t much to keep in mind while testing coroutines. These are all the things that you need to do:

  • replace launch() with runBlocking() in your main app code
  • wrap each test method inside runBlocking() in the test classes

Since we’ve created our utility method launchAsync() that wraps launch() and we use it throughout our app code, all we need to do for testing is provide a different implementation of launchAsync() that calls runBlocking() instead, so all our app code will automatically use runBlocking(). The difference between the two is that launch() starts the coroutine and returns immediately, while runBlocking() blocks the current thread until the coroutine has completed. By the time launchAsync() returns, all the work is done, which is what we need for testing with Mockito, for example.

So, here is what our utility method launchAsync() looks like in the main app code:

fun launchAsync(block: suspend CoroutineScope.() -> Unit) {
    val job: Job = launch(UI) { block() }
    asyncJobs.add(job)
    job.invokeOnCompletion { asyncJobs.remove(job) }
}

And this is what its implementation looks like in the test package:

fun launchAsync(block: suspend CoroutineScope.() -> Unit) {
    runBlocking { block() }
}

Now let’s take a look at a test method in our test classes:

@Test
fun myTestCase() {
    runBlocking {
        ...usual test methods...
    }
}

As you can see, the only difference is that everything is wrapped inside runBlocking(). There’s nothing else to keep in mind. All the testing code will be the same as what we’re used to writing.
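To see why the blocking version of launchAsync() makes testing straightforward, here is a minimal sketch. It assumes the current stable kotlinx.coroutines API (1.x) rather than 0.19.3, GreetingPresenter is a hypothetical class made up for the example, and a plain main() with check() stands in for a real test method and its assertions.

```kotlin
import kotlinx.coroutines.*

// Test-side implementation from the post: blocks until the block has completed.
fun launchAsync(block: suspend CoroutineScope.() -> Unit) {
    runBlocking { block() }
}

// Hypothetical class under test, used only for this illustration.
class GreetingPresenter {
    var greeting: String? = null
        private set

    fun load() {
        launchAsync {
            delay(10) // stands in for a remote call
            greeting = "hello"
        }
    }
}

fun main() {
    val presenter = GreetingPresenter()
    presenter.load()

    // load() returned only after the coroutine finished, so the result is already there.
    check(presenter.greeting == "hello")
    println(presenter.greeting) // hello
}
```

With the launch()-based implementation, load() would return before the coroutine had finished and the check could run too early.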

Conclusion

I hope you found this post interesting with new ideas to work with Kotlin coroutines. Make sure you check the GitHub repository with the code related to this post to see how to put all the pieces together and play with it to practice with Kotlin coroutines.

Like what you read? Give Andrea Bresolin a round of applause.
