How can we use CoroutineScopes in Kotlin?

Anton Spaans
The Startup
Published in
10 min readOct 7, 2019

--

Kotlin’s Coroutines allow the use of suspend functions, Channels and Flows and they all operate in the context of a so-called CoroutineScope. How can we tie it to the lifecycle management of our own components?

Coroutine Scope

If you are not at least somewhat familiar with what a CoroutineScope is, read this article by Roman Elizarov about Structured Concurrency first.

Lifecycle for Coroutines

A CoroutineScope defines a lifecycle, a lifetime, for Coroutines that are built and launched from it. A CoroutineScope lifecycle starts as soon as it is created and ends when it is canceled or when it associated Job or SupervisorJob finishes. When that happens, the CoroutineScope is no longer active.

Any launch- or async-Coroutine built from a CoroutineScope will, if it is still running, be canceled when itsCoroutineScope lifecycle ends.

val scope: CoroutineScope = ...
...
scope.launch {
while(true) {
...
delay(100)
}
}
...
scope.cancel()

In the example above, calling scope.cancel() will cause the launch to get canceled as well (the call to delay(100) will throw a CancellationException as long as the code-execution is suspended there).

Controlling the lifecycle

We use a so-called top-most CoroutineScope when we want to control the lifecycle of the Coroutines we launch, so that we can cancel them and handle any exceptions.

The most common way to do this, is to construct a top-most Coroutine ourselves. This can be done by just calling CoroutineScope(someCoroutineContext). Most often, the someCoroutineContext is a dispatcher (e.g. Dispatchers.IO or Dispatchers.Main) plus a Job or SupervisorJob.

An example would be val scope = CoroutineScope(Dispatchers.Main + SupervisorJob()) or val scope = MainScope(), etc.

Another way to construct a top-most one is to tie it to the lifecycle of another class of our own making, either by inheritance or by delegation/aggregation. See the two examples below.

// SomethingWithALifecycle is-a CoroutineScope
class SomethingWithALifecycle : CoroutineScope {
override val coroutineContext = Dispatchers.IO + SupervisorJob()
...
}
// or //// SomethingWithALifecycle has-a CoroutineScope
class SomethingWithALifecycle {
val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
...
fun destroy() {
scope.cancel()
}
}

Living at the edge

A top-most CoroutineScope is usually created at the edges of our application-world, which is full of side-effects, where our application interacts with the system’s environment and its components, such as the screen, the network or local storage and connected devices. Often, these components have a lifecycle, like a screen or view that a user enters (start of lifecycle) and can back-out of (end of lifecycle), or like a network-cache that is related to the logged in user (lifecycle ends when the user logs out).

At these edges, for each component that has a well defined lifecycle, our code will create a brand-new CoroutineScope from which all asynchronous calls will be launched.

A very common example of a piece of code, that lives on the UI-edge in the Kotlin world, is a ViewModel that models an Android Activity or Fragment.

class MovieListViewModel(
private val dataSource: DataSource,
private val context: CoroutineContext = Dispatchers.Main
): ViewModel() {
private val scope = CoroutineScope(context + SupervisorJob())
...
init {
filter("*")
}
fun filter(filter: String) {
scope.launch {
movieListData.value =
// getMoviesList is a 'suspend' fun
dataSource.getMovieList(filter).toUiResult()
}
}
override fun onCleared() {
scope.cancel()
}
...
}

If the user leaves the screen before getMovieList(...) returns a value, the system calls onCleared and the ViewModel’s CoroutineScope will be canceled. This will make sure that the movieListData.value will not be modified, and the code will not attempt to update a no longer existing screen.

Every asynchronous task launched from or suspend function called by getMovieList will be canceled as well (as long as they are running within the provided top-most CoroutineScope scope, which is the default behavior). This is all guaranteed by the Structured Concurrency that a CoroutineScope provides.

In other words:

All synchronous and asynchronous tasks run or launched by a suspend function will be canceled when the CoroutineScope in which the suspend function is running is canceled, as long as they are running within that same CoroutineScope. No (asynchronous) task is orphaned and left running.

This is all wonderful; we don’t have to worry about canceling each and every asynchronous (sub-)task, as long as we cancel the top-most CoroutineScope. However, this deals only with asynchronous tasks that either just do something (launch) or return a single result (async/await or suspend fun). How does this work with functionality that returns a stream of data? Let’s examine this, let’s see how a CoroutineScope can be tied to a cold Flow-stream or a hot Channel-stream of data.

CoroutineScope and Flows

Flows are cold streams of data. A Flow starts each time collect is called on it and it ends when the Flow ends normally or throws an exception.

01. val counterFlow: Flow<Int> = flow<Int> { 
02. for (i in 1..10) { emit(i); delay(100) }
03. }
04.
05. ...
06.
07. val scope: CoroutineScope = ...
08. scope.launch {
09. counterFlow.collect { counter ->
10. ...
11. }
12. }
13.
14. ...
15.
16. scope.cancel()

The for-loop on line 02 starts as soon as collect is called on line 09, but not before.

The call to collect suspends on line 09 and its block of code, the lambda, on line 10 is executed each time emit is called on line 02.

The call to collect resumes normally after line 11, after the Flow has ended normally, i.e. when the for-loop on line 02 has finished. It will throw an exception, if the code on line 02 had thrown an exception somehow.

As you can see, the creation of a Flow does not require a CoroutineScope, but the collection (subscription, in Rx terminology) does require it.

The code that produces the Flow (the code on line 02) runs, therefore, in the scope of the code that calls collect (on line 09). This means that canceling the scope, on line 16, not only cancels the suspending collect call (line 10), but cancels the Flow itself (line 02) as well.

This is all part of Structured Concurrency. The CoroutineScope in which a Flow operates is the one in which the collector/consumer of the Flow runs. When the CoroutineScope is canceled, not only the collector/consumer of a Flow but also the producer of values to a Flow is canceled. This ensures that all asynchronous code is cleaned up quite nicely.

CoroutineScope and Channels

Channels are hot streams of data. A Channel can start producing values even if there are no consumers listening and can keep producing values after all its consumers have died (e.g. have been canceled). The producing ends, using the SendChannel interface, and the consuming ends, using the ReceiveChannel interface, can each run in a different CoroutineScope.

Below is a simple example with just one producer and one consumer on a Channel, each running in a different scope:

01. val counterChannel: Channel<Int> = Channel<Int>(backPressure)
02.
03. val producerScope: CoroutineScope = ...
04. val producer = producerScope.launch {
05. for (i in 1..10) { counterChannel.send(i); delay(100) }
06. }
07.
08. ...
09.
10. val consumerScope: CoroutineScope = ...
11. val consumer = consumerScope.launch {
12. for (counter in counterChannel) {
13. ...
14. }
15. }
16.
17. ...
18.
19. producerScope.cancel()
20. ... or ...
21. consumerScope.cancel()

When the consumerScope is canceled or when the consumer (lines 1214) just decides to stop receiving data from the counterChannel, the channel’s producer (line 05) just keeps going. If there are no active consumers receiving data from a channel, its producer(s) may just sit there and be suspended forever (at the call to send()), depending on the back-pressure of the channel. If we are not careful, we may leak the asynchronous (sub-)tasks of the producer.

There is a handy extension function on a Channel called consumeEach. This function consumes and then closes the channel. When the consumeEach resumes, either normally, by cancelation or through some exception, we can be assured that the producer has ended as well and the channel has been closed. See the modified code below:

01.
...
10. val consumerScope: CoroutineScope = ...
11. val consumer = consumerScope.launch {
12. counterChannel.consumeEach { counter ->
13. ...
14. }
15. }
...

When the consumerScope is canceled in the modified example, the consumeEach will close the counterChannel and the producer will be canceled as well. Hooray!

When the producerScope is canceled, the producer is canceled, but the counterChannel is not closed. The consumer just keeps running, waiting for data on the open channel that may never come.

Thankfully, there is a nice extension function on a CoroutineScope called produce. The CoroutineScope.produce returns a ReceiveChannel for its consumers and it makes sure that when the the enclosing CoroutineScope finishes, the returned Channel is closed for sending. This will allow its consumers to exhaust/drain it and finish when that is done:

01. 
02.
03. val producerScope: CoroutineScope = ...
04. val counterChannel = producerScope.produce(backPressure) {
05. for (i in 1..10) { send(i); delay(100) }
06. }
07.
...

When the producerScope is canceled in the modified example, the counterChannel will be closed with a CancellationException. The consumeEach drains any remaining items from the Channel and throws that same exception when it finishes. The consumer will be canceled as well. Another Hooray!

Dealing with Channels and the lifecycles of the objects that use them is quite a bit more complicated than dealing with Flows. The extension functions CoroutineScope.produce and ReceiveChannel.consumeEach alleviate some of the issues, but we still have to be careful not to leak any asynchronous tasks related to them.

Switching CoroutineScopes

Let’s say we have this situation.

The UI has a screen that needs some data from the network. A bunch of requests are issued and then merged by business-logic. When all the data is received, it is cached into memory and onto local storage and finally shown on screen.

When the UI’s request takes a long time, for whatever reason, the user may abandon the screen. However, parts of the chain of requests may already be in flight and we’d like to just cache the data, even if it won’t appear on the screen at the moment, just in case the user comes back to the same screen later. We don’t want to cancel the entire chain of requests. We’d like to let them run to completion instead. We just won’t show it on the screen.

This situation can be handled by switching CoroutineScopes.

In our example scenario, the UI is running in a scope called uiScope (see example below). Tying the launching of the network requests to the uiScope is not a good idea in our situation, since we don’t want to cancel these network requests when the user leaves the screen. We only want to tie the showing of final result of the network requests on the screen to the uiScope.

We’ll need a different scope for launching the network requests and caching the resulting data, a scope whose tasks can outlive the calling scope. We’ll create a class called CachedNetworkRepo and give it its own CoroutineScope (that basically could live as long as the entire application is alive).

class CachedNetworkRepo(context: CoroutineContext): Repo {
val repoScope = CoroutineScope(context + SupervisorJob())
...
override suspend fun getAndCachData(
key: Key,
request: suspend (Key) -> Result
): Result {
val data: Result? = cache[key]
return if (data != null) {
data
} else {
// Here we switch from the calling CoroutineScope
// to the 'repoScope'.
repoScope.async {
request(key).also { cache[key] = it }
}
// We're running in the calling CoroutineScope again.
.await()
}
}
...
fun destroy() {
repoScope.cancel()
cache.clear()
}
}
...class ScreenViewModel(
private val network: Network,
private val repo: Repo,
context: CoroutineContext
) : ViewModel() {
private val uiScope: CoroutineScope = MainScope(context)
...
val dataObs = ObservableData<UiData>() fun requestData() {
uiScope.launch {
// Here, the code runs in the uiScope
...
val data: Result = repo.getAndCacheData(...) { key ->
// Here, the code runs in the repoScope!
...
network.getAllData(... ...)
}
// The code runs in the uiScope again
dataObs.value = data.toUiData()
}
}
...
override fun onCleared() {
uiScope.cancel()
}
}

The screen observes the ScreenViewModel’s dataObs property which gets updated when the requestData() receives the final result of the getAllData() from the network.

The requestData function calls the repo to get and cache the data from the network: repo.getAndCacheData(…) { … }.

While the network.getAllData() is suspended (waiting for all the network responses to finish and to be merged), the user leaves the screen. The onCleared is called, which cancels the uiScope. This will cancel the repo.getAndCacheData call itself — it will never resume and assign a value to dataObs.value — but the call to network.getAllData() will not be canceled. It won’t be canceled because it is not running in the uiScope. It runs in the repoScope of the repo object, and that scope has not been canceled.

The network.getAllData() will therefore resume normally and the repo implementation will make sure the result is put into the cache (see .also { cache[key] = it }).

The await() call in the getAndCacheData function of CachedNetworkRepo is running in the calling scope, i.e. the canceled uiScope, and the call to await() will immediately be canceled, causing the getAndCacheData function to be canceled, which prevents the statement dataObs.value = data.toUiData() to be called.

Great, this works wonderfully: When the calling CoroutineScope finishes, the other CoroutineScope, to which the asynchronous request was switched, keeps running.

Let’s flip it around. What happens if that other CoroutineScope finishes, gets canceled? What would happen to the calling CoroutineScope?

Say, in our example, the CachedNetworkRepo gets destroyed, causing the repoScope to get canceled. The code in the repoScope.async { ... } is canceled and the call to await() will throw a CancellationException.

Remember; the call to await() runs in the calling scope uiScope. The CancellationException will bubble up all the way to the top of the Coroutine stack, to the uiScope.launch { ... } call. Luckily, a CancellationException is not just any exception. This exception will only cancel and finish this particular Coroutine. It will not finish the entire uiScope [Exceptional Exceptions for Coroutines made easy…?]. In other words; the calling Coroutine will be canceled, but the CoroutineScope in which it runs remains active.

Recap

I hope you learned a bit more how CoroutineScopes can be leveraged to manage the lifecycles of the asynchronous tasks of your application’s components. The fact that CoroutineScopes implement Structured Concurrency alleviates a lot of the burden dealing with cancelations, exception handling and avoiding runtime- and memory-leaks.

If you have any questions, let me know in the comments!

!!Have a nice Kotlin!!

--

--

Anton Spaans
The Startup

Associate Director of Product Engineering at @AccentureSong. You can find me online @streetsofboston or at https://www.linkedin.com/in/antonspaans/