Coroutines and RxJava — An Asynchronicity Comparison (Part 2): Cancelling Execution

Manuel Vivo
Published in Capital One Tech
5 min read · Apr 11, 2018

Introduction

In this blog series I will compare Kotlin Coroutines and RxJava since they are both trying to solve a common problem in Android development: Asynchronous Programming.

In Part 1 we learned how to perform heavy computation tasks in the background. What if we want to interrupt that computation?

Part 2 is about Cancelling Execution.

What Does Cancel Execution Mean?

We want to be able to cancel the execution of a computation that has been created with RxJava or Coroutines. This computation can be asynchronous or not.

This is important in different use cases in Android development — the most common one might be when the View is going to be destroyed. If that happens, we might want to cancel the ongoing executions such as a network request, a heavy Object initialization, etc.

RxJava

As in Part 1, we’re going to leave aside the ability to transfer streams of elements. How can we cancel execution with RxJava?

Let’s imagine we create a timer with the interval operator.

Observable.interval(1, TimeUnit.SECONDS)

When you subscribe to this observable, the timer is going to kick off and it will send an event to the subscriber every second after the subscription.

How Can You Cancel That Timer?

When you subscribe to the timer (by calling .subscribe()), it returns a Disposable object.

val disposable: Disposable =
    Observable.interval(1, TimeUnit.SECONDS).subscribe()

You can call dispose() on the Disposable object to cancel execution. The Observable will then stop emitting items.

disposable.dispose()

That’s it! We have cancelled the asynchronous computation that the Observable had created.
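To see the effect of dispose() without waiting for real seconds to pass, here is a minimal sketch that drives the same interval timer with RxJava's TestScheduler. It assumes RxJava 2 (the io.reactivex packages); the function name ticksBeforeAndAfterDispose is made up for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns (ticks received before dispose, ticks received after dispose).
fun ticksBeforeAndAfterDispose(): Pair<Int, Int> {
    val scheduler = TestScheduler() // virtual clock, so nothing really sleeps
    var ticks = 0

    val disposable = Observable.interval(1, TimeUnit.SECONDS, scheduler)
        .subscribe { ticks++ }

    // Move the virtual clock forward: the timer fires at 1s, 2s and 3s.
    scheduler.advanceTimeBy(3, TimeUnit.SECONDS)
    val before = ticks

    // Cancel the timer, then keep advancing the clock.
    disposable.dispose()
    scheduler.advanceTimeBy(3, TimeUnit.SECONDS)

    return Pair(before, ticks - before)
}
```

Advancing the virtual clock by three seconds delivers three ticks; after dispose(), advancing it again delivers none.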

Caveats

If you manually create your own Observable without using any creation operator (like interval), you need to handle the cancellation of the computation yourself.

An Observable that simply calls emitter.onNext() in a loop is not ready to be cancelled. If we want that to happen, we need to check that the emitter has not been disposed before calling it.

Observable.create<Int> { emitter ->
    for (i in 1..5) {
        if (!emitter.isDisposed) {
            emitter.onNext(i)
        } else {
            break
        }
    }

    emitter.onComplete()
}

If the subscriber is no longer there, we can skip emitting the rest of the items. If we don’t do that, the loop will keep running and the emitter.onNext(i) calls will simply be ignored, as the Observable.create source code shows.
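One way to observe the difference is to count how many loop iterations the producer runs once the subscriber has gone away. The sketch below assumes RxJava 2 and uses take(2) to dispose the upstream after two items; producedCount and its checkDisposed flag are hypothetical names for this example.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: returns how many loop iterations the producer actually ran.
fun producedCount(checkDisposed: Boolean): Int {
    var produced = 0

    Observable.create<Int> { emitter ->
        for (i in 1..5) {
            // Cooperative variant: stop as soon as nobody is listening.
            if (checkDisposed && emitter.isDisposed) break
            produced++
            emitter.onNext(i) // silently ignored once the emitter is disposed
        }
        emitter.onComplete()
    }
        .take(2)     // disposes the upstream after the second item
        .subscribe() // no schedulers involved, so everything runs synchronously

    return produced
}
```

With the check in place the loop runs twice; without it, all five iterations run even though the last three items go nowhere.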

Coroutines

A Coroutine is an instance of a computation itself. Cancelling a Coroutine means stopping the execution of its suspending lambda.

We can cancel execution with the Coroutine Job, which is part of the Coroutine Context.

The Coroutine Job exposes a method to cancel the execution of the Coroutine. As we might expect, that method is called cancel().

For example, the Coroutine Builder launch returns the Job of the Coroutine that it creates.

val job = launch(CommonPool) {
// my suspending block
}

We can assign that to a value and call cancel.

job.cancel()

That was an example of getting the Job from the Coroutine and cancelling it. Can you do it in a different way? Yes, you can also specify the Job of a Coroutine. You can do that in multiple ways.

Some Coroutine Builders (e.g. launch and async) take a named parameter called parent with which you can set the Job for the Coroutine that will be created.

val parentJob = Job()
async(CommonPool, parent = parentJob) {
    // my suspending block
}
parentJob.cancel()

One of the benefits of this approach is that you can share that parentJob instance with multiple Coroutines, so when you call parentJob.cancel() you are going to cancel the execution of those coroutines that have parentJob as their Job.

This approach is similar to RxJava’s CompositeDisposable, with which you can dispose of multiple subscriptions at once.

val parentJob = Job()
val deferred1 = async(CommonPool, parent = parentJob) {
    // my suspending block
}
val deferred2 = async(CommonPool, parent = parentJob) {
    // my suspending block
}
parentJob.cancel() // Cancels both Coroutines
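For comparison, the RxJava counterpart of a shared parent Job could look roughly like this. It is a sketch assuming RxJava 2; disposeAllAtOnce is a hypothetical name used only here.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns true if both subscriptions end up disposed.
fun disposeAllAtOnce(): Boolean {
    val disposables = CompositeDisposable()

    val first = Observable.interval(1, TimeUnit.SECONDS).subscribe()
    val second = Observable.interval(1, TimeUnit.SECONDS).subscribe()
    disposables.addAll(first, second)

    // One call disposes every subscription held by the container.
    disposables.dispose()

    return first.isDisposed && second.isDisposed
}
```

A disposed CompositeDisposable is finished: anything added to it afterwards is disposed immediately. Calling clear() instead of dispose() disposes the current contents but keeps the container usable.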

NOTE: Be careful when sharing a Job between different Coroutines. Once a Job is cancelled it cannot be reused: you cannot start another Coroutine with it, so you have to create a new Job and reassign the variable.
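The sketch below illustrates what happens if you try to reuse a cancelled Job anyway. It is written against kotlinx.coroutines 1.x, which is an assumption on my part: there, Dispatchers.Default takes the place of CommonPool and the Job is passed through the context rather than a parent parameter. launchOnCancelledJob is a hypothetical name.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: returns (was the new coroutine cancelled?, did its body run?).
fun launchOnCancelledJob(): Pair<Boolean, Boolean> = runBlocking {
    val parentJob = Job()
    parentJob.cancel() // from now on this Job is of no use for new work

    val bodyRan = AtomicBoolean(false)
    val late = CoroutineScope(Dispatchers.Default + parentJob).launch {
        bodyRan.set(true)
    }
    late.join() // returns right away: the child was cancelled before it started

    Pair(late.isCancelled, bodyRan.get())
}
```

The new coroutine is cancelled immediately and its body never runs, which is why a fresh Job is needed after every cancel().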


Another way of doing this is by combining Coroutine Contexts. You can use the plus operator to do this.

val parentJob = Job()
launch(parentJob + CommonPool) {
    // my suspending block
}
parentJob.cancel()

In this case, the resulting Coroutine Context of that Coroutine is the combination of parentJob and CommonPool. The threading policy comes from CommonPool and the Job from parentJob.
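To make the combination more tangible, this small sketch looks up both elements in a combined context. It assumes kotlinx.coroutines 1.x, where Dispatchers.Default plays the role CommonPool has in this article; combinedContextParts is a hypothetical name.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlin.coroutines.ContinuationInterceptor

// Hypothetical helper: checks which element answers for each key in the combined context.
fun combinedContextParts(): Pair<Boolean, Boolean> {
    val parentJob = Job()
    val context = parentJob + Dispatchers.Default

    // Each element is stored under its own key, so both can be retrieved afterwards.
    val jobComesFromParent = context[Job] === parentJob
    val threadingComesFromDispatcher = context[ContinuationInterceptor] === Dispatchers.Default

    return Pair(jobComesFromParent, threadingComesFromDispatcher)
}
```

Both lookups return the original instances: plus merges the two elements into one context without copying them.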

If you want to learn more about combining contexts you can read this part of the Kotlin Coroutines documentation.

Caveats

Just as with RxJava, you have to consider cancellation in Coroutines.

val job = launch(CommonPool) {
    for (i in 1..5) {
        heavyComputation()
    }
}
job.cancel()

If we try to execute this code, it will repeat the heavy computation 5 times since the code is not ready to be cancelled.

How can we improve it?

The same way we checked if the subscriber was present in RxJava, we need to check if the Coroutine is active.

val job = launch(CommonPool) {
    for (i in 1..5) {
        if (!isActive) { break }
        heavyComputation()
    }
}
job.cancel()

isActive is an internal variable that can be accessed inside a Coroutine (coroutineContext is another one).
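The two loops can be compared side by side by counting how many iterations finish after a cancel. This is only a sketch: it assumes kotlinx.coroutines 1.x (Dispatchers.Default instead of CommonPool), heavyComputation is a blocking stand-in I made up, and iterationsRun is a hypothetical helper.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for real work: blocks the thread and never suspends.
fun heavyComputation() {
    Thread.sleep(50)
}

// Hypothetical helper: returns how many iterations completed before the coroutine finished.
fun iterationsRun(checkIsActive: Boolean): Int = runBlocking {
    val iterations = AtomicInteger()
    val started = CompletableDeferred<Unit>()

    val job = launch(Dispatchers.Default) {
        started.complete(Unit) // signal that the coroutine is really running
        for (i in 1..5) {
            if (checkIsActive && !isActive) break
            heavyComputation()
            iterations.incrementAndGet()
        }
    }

    started.await()     // cancel only once the loop is under way
    job.cancelAndJoin() // request cancellation, then wait for the coroutine to finish
    iterations.get()
}
```

Without the check, all five iterations complete despite the cancel. With it, the loop stops at the next iteration boundary, so the exact count depends on timing but stays well below five.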

Some suspending functions available in the Standard Coroutines library handle cancellation for us. Let’s take a look at delay.

val job = launch(CommonPool) {
    doSomething()
    delay(300) // It’s going to cancel at this point
    doSomething()
}
job.cancel()

delay is a suspending function that handles cancellation for us. However, if you use Thread.sleep instead of delay, the coroutine won’t be cancelled at that point, because Thread.sleep blocks the thread rather than suspending the coroutine.

val job = launch(CommonPool) {
    doSomething()
    Thread.sleep(300) // It’s NOT going to cancel execution
    doSomething()
}
job.cancel()

Thread.sleep doesn’t cancel the execution for us. It’s not even a suspending function! The code in that coroutine keeps running to the end even though we called job.cancel().
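Here is a rough way to check that claim, again assuming kotlinx.coroutines 1.x with Dispatchers.Default in place of CommonPool. secondStepRan is a hypothetical helper that reports whether the code after the pause was reached.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: returns true if the code after the pause still executed.
fun secondStepRan(useDelay: Boolean): Boolean = runBlocking {
    val ran = AtomicBoolean(false)
    val started = CompletableDeferred<Unit>()

    val job = launch(Dispatchers.Default) {
        started.complete(Unit) // signal that the coroutine is really running
        if (useDelay) delay(300) else Thread.sleep(300)
        ran.set(true)          // stands in for the second doSomething()
    }

    started.await()     // make sure we cancel a coroutine that has already started
    job.cancelAndJoin() // request cancellation, then wait for the coroutine to finish
    ran.get()
}
```

With delay the second step is never reached, because delay throws a CancellationException once the Job is cancelled. With Thread.sleep the thread simply wakes up after 300 ms and carries on.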

Thread.sleep is not something you should use in this instance. If you really really need to, a way of cancelling that coroutine would be checking if it’s active just before and after the thread blocks.

val job = launch(CommonPool) {
    doSomething()
    if (!isActive) return@launch // a bare return is not allowed inside this lambda
    Thread.sleep(300) // It’s NOT going to cancel execution
    if (!isActive) return@launch

    doSomething()
}
job.cancel()

What’s Coming Next?

The third part of this series will be about transferring streams of elements.

What are the differences between Observable and Channels? Subjects and Broadcast Channels? Don’t miss the third part next week.

More Education

Did you miss Coroutines and RxJava comparison Part 1?

You want to learn more about Kotlin? Check out this article!

Thanks for reading,

Manuel Vicente Vivo
