How can we use CoroutineScopes in Kotlin?
Kotlin’s Coroutines allow the use of suspend functions, Channels and Flows and they all operate in the context of a so-called CoroutineScope. How can we tie it to the lifecycle management of our own components?
If you are not at least somewhat familiar with what a CoroutineScope is, read this article by Roman Elizarov about Structured Concurrency first.
Lifecycle for Coroutines
A CoroutineScope defines a lifecycle, a lifetime, for Coroutines that are built and launched from it. A CoroutineScope lifecycle starts as soon as it is created and ends when it is canceled or when its associated Job or SupervisorJob finishes. When that happens, the CoroutineScope is no longer active.
Any launch or async Coroutine built from a CoroutineScope will, if it is still running, be canceled when its CoroutineScope lifecycle ends.
val scope: CoroutineScope = ...
...
scope.launch {
    while (true) {
        ...
        delay(100)
    }
}
...
scope.cancel()
In the example above, calling scope.cancel() will cause the launch to get canceled as well: the call to delay(100) will throw a CancellationException, either right away if the code-execution is suspended there, or the next time that call is reached.
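To see this behavior in isolation, here is a minimal, runnable sketch. The function name cancelScopeDemo and the choice of Dispatchers.Default are assumptions made for this example only.

```kotlin
import kotlinx.coroutines.*

// Returns true if the looping coroutine was canceled by scope.cancel().
fun cancelScopeDemo(): Boolean = runBlocking {
    // Assumption: any dispatcher would do; Dispatchers.Default keeps the sketch JVM-only.
    val scope = CoroutineScope(Dispatchers.Default + Job())
    var sawCancellation = false

    val job = scope.launch {
        try {
            while (true) {
                delay(100) // suspension point where the cancelation is delivered
            }
        } catch (e: CancellationException) {
            sawCancellation = true
            throw e // rethrow, so the coroutine still finishes as canceled
        }
    }

    delay(250)     // let the loop run for a few iterations
    scope.cancel() // ends the lifecycle of the scope
    job.join()     // wait until the launched coroutine has really finished

    sawCancellation && job.isCancelled && !scope.isActive
}
```

Catching the CancellationException is only done here to observe it. Real code usually does not need to catch it at all, and if it does, it should rethrow it.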
Controlling the lifecycle
We use a so-called top-most CoroutineScope when we want to control the lifecycle of the Coroutines we launch, so that we can cancel them and handle any exceptions.
The most common way to do this is to construct a top-most CoroutineScope ourselves. This can be done by just calling CoroutineScope(someCoroutineContext). Most often, the someCoroutineContext is a dispatcher (e.g. Dispatchers.IO or Dispatchers.Main) plus a Job or SupervisorJob.
An example would be val scope = CoroutineScope(Dispatchers.Main + SupervisorJob()) or val scope = MainScope(), etc.
Another way to construct a top-most one is to tie it to the lifecycle of another class of our own making, either by inheritance or by delegation/aggregation. See the two examples below.
// SomethingWithALifecycle is-a CoroutineScope
class SomethingWithALifecycle : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    ...
}

// or

// SomethingWithALifecycle has-a CoroutineScope
class SomethingWithALifecycle {
    val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    ...
    fun destroy() {
        scope.cancel()
    }
}
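The sketch below shows what "no longer active" means for the has-a variant. The Ticker class and the lateStartDemo function are hypothetical names made up for this example. Work that is launched after destroy() has been called never starts running.

```kotlin
import kotlinx.coroutines.*

// Hypothetical component that owns its CoroutineScope (the has-a variant).
class Ticker {
    private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

    @Volatile
    var started = false
        private set

    fun start(): Job = scope.launch {
        started = true
        while (true) delay(50)
    }

    fun destroy() = scope.cancel()
}

// Launches work on a component whose lifecycle has already ended.
fun lateStartDemo(): Pair<Boolean, Boolean> = runBlocking {
    val ticker = Ticker()
    ticker.destroy()          // end of the component's lifecycle
    val job = ticker.start()  // launched in a scope that is already canceled
    job.join()
    Pair(job.isCancelled, ticker.started)
}
```

The returned Job is already canceled and the body of the launch is never executed, so a destroyed component cannot accidentally start new asynchronous work.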
Living at the edge
A top-most CoroutineScope is usually created at the edges of our application-world, which is full of side-effects, where our application interacts with the system’s environment and its components, such as the screen, the network or local storage and connected devices. Often, these components have a lifecycle, like a screen or view that a user enters (start of lifecycle) and can back out of (end of lifecycle), or like a network-cache that is related to the logged-in user (lifecycle ends when the user logs out).
At these edges, for each component that has a well-defined lifecycle, our code will create a brand-new CoroutineScope from which all asynchronous calls will be launched.
A very common example of a piece of code that lives on the UI-edge in the Kotlin world is a ViewModel that models an Android Activity or Fragment.
class MovieListViewModel(
    private val dataSource: DataSource,
    private val context: CoroutineContext = Dispatchers.Main
) : ViewModel() {
    private val scope = CoroutineScope(context + SupervisorJob())
    ...
    init {
        filter("*")
    }

    fun filter(filter: String) {
        scope.launch {
            movieListData.value =
                // getMovieList is a 'suspend' fun
                dataSource.getMovieList(filter).toUiResult()
        }
    }

    override fun onCleared() {
        scope.cancel()
    }
    ...
}
If the user leaves the screen before getMovieList(...) returns a value, the system calls onCleared and the ViewModel’s CoroutineScope will be canceled. This will make sure that the movieListData.value will not be modified, and the code will not attempt to update a no longer existing screen.
Every asynchronous task launched from, or suspend function called by, getMovieList will be canceled as well (as long as they are running within the provided top-most CoroutineScope scope, which is the default behavior). This is all guaranteed by the Structured Concurrency that a CoroutineScope provides.
In other words:
All synchronous and asynchronous tasks run or launched by a suspend function will be canceled when the CoroutineScope in which the suspend function is running is canceled, as long as they are running within that same CoroutineScope. No (asynchronous) task is orphaned and left running.
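A small sketch can make this guarantee concrete. The suspend function loadEverything and its parameters are hypothetical; it fans out into two child tasks, and the sketch counts how many of them get canceled when the top-most scope is canceled.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical suspend function that starts two child tasks and waits for both.
suspend fun loadEverything(started: Channel<Unit>, canceled: AtomicInteger): Unit =
    coroutineScope {
        repeat(2) {
            launch {
                try {
                    started.send(Unit) // tell the demo that this child is running
                    delay(10_000)      // stands in for a slow request
                } catch (e: CancellationException) {
                    canceled.incrementAndGet()
                    throw e
                }
            }
        }
    }

// Returns the number of child tasks that were canceled.
fun structuredDemo(): Int = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    val started = Channel<Unit>(Channel.UNLIMITED)
    val canceled = AtomicInteger(0)

    val job = scope.launch { loadEverything(started, canceled) }
    repeat(2) { started.receive() } // wait until both children are running
    scope.cancel()                  // cancel only the top-most scope
    job.join()                      // completes after both children have finished
    canceled.get()
}
```

Only the top-most scope is canceled here. The two children are never canceled explicitly, yet both of them end, because they were launched inside the coroutineScope { ... } of the suspend function.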
This is all wonderful; we don’t have to worry about canceling each and every asynchronous (sub-)task, as long as we cancel the top-most CoroutineScope. However, this deals only with asynchronous tasks that either just do something (launch) or return a single result (async/await or suspend fun). How does this work with functionality that returns a stream of data? Let’s examine how a CoroutineScope can be tied to a cold Flow stream or a hot Channel stream of data.
CoroutineScope and Flows
Flows are cold streams of data. A Flow starts each time collect is called on it and it ends when the Flow ends normally or throws an exception.
01. val counterFlow: Flow<Int> = flow<Int> {
02.     for (i in 1..10) { emit(i); delay(100) }
03. }
04.
05. ...
06.
07. val scope: CoroutineScope = ...
08. scope.launch {
09.     counterFlow.collect { counter ->
10.         ...
11.     }
12. }
13.
14. ...
15.
16. scope.cancel()
The for-loop on line 02 starts as soon as collect is called on line 09, but not before.
The call to collect suspends on line 09 and its block of code, the lambda, on line 10 is executed each time emit is called on line 02.
The call to collect resumes normally after line 11, after the Flow has ended normally, i.e. when the for-loop on line 02 has finished. If the code on line 02 throws an exception, the call to collect will throw that exception instead.
As you can see, the creation of a Flow does not require a CoroutineScope, but the collection (subscription, in Rx terminology) does require it.
The code that produces the Flow (the code on line 02) runs, therefore, in the scope of the code that calls collect (on line 09). This means that canceling the scope, on line 16, not only cancels the suspending collect call (line 09), but cancels the Flow itself (line 02) as well.
This is all part of Structured Concurrency. The CoroutineScope in which a Flow operates is the one in which the collector/consumer of the Flow runs. When the CoroutineScope is canceled, not only the collector/consumer of a Flow but also the producer of values to a Flow is canceled. This ensures that all asynchronous code is cleaned up quite nicely.
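The following sketch lets us observe that the producing side of a Flow is cleaned up too. The names ticks and flowCancelDemo are assumptions for this example; the Flow is endless, so its finally block can only be reached when the collecting scope is canceled (or when something fails).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (producer was cleaned up, collecting coroutine was canceled).
fun flowCancelDemo(): Pair<Boolean, Boolean> = runBlocking {
    var producerCleanedUp = false
    val firstValue = CompletableDeferred<Int>()

    // An endless cold Flow; nothing in here runs until collect is called.
    val ticks: Flow<Int> = flow {
        try {
            var i = 0
            while (true) { emit(i++); delay(50) }
        } finally {
            // The loop never ends normally, so this runs on cancelation only.
            producerCleanedUp = true
        }
    }

    val scope = CoroutineScope(Dispatchers.Default + Job())
    val job = scope.launch {
        ticks.collect { value -> firstValue.complete(value) }
    }

    firstValue.await() // the producer is now definitely running
    scope.cancel()     // cancels the collector and, with it, the producer
    job.join()
    Pair(producerCleanedUp, job.isCancelled)
}
```

Note that the Flow itself holds no reference to any scope. It simply runs inside whichever Coroutine happens to collect it.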
CoroutineScope and Channels
Channels are hot streams of data. A Channel can start producing values even if there are no consumers listening and can keep producing values after all its consumers have died (e.g. have been canceled). The producing ends, using the SendChannel interface, and the consuming ends, using the ReceiveChannel interface, can each run in a different CoroutineScope.
Below is a simple example with just one producer and one consumer on a Channel, each running in a different scope:
01. val counterChannel: Channel<Int> = Channel<Int>(backPressure)
02.
03. val producerScope: CoroutineScope = ...
04. val producer = producerScope.launch {
05.     for (i in 1..10) { counterChannel.send(i); delay(100) }
06. }
07.
08. ...
09.
10. val consumerScope: CoroutineScope = ...
11. val consumer = consumerScope.launch {
12.     for (counter in counterChannel) {
13.         ...
14.     }
15. }
16.
17. ...
18.
19. producerScope.cancel()
20. ... or ...
21. consumerScope.cancel()
When the consumerScope is canceled or when the consumer (lines 12–14) just decides to stop receiving data from the counterChannel, the channel’s producer (line 05) just keeps going. If there are no active consumers receiving data from a channel, its producer(s) may just sit there and be suspended forever (at the call to send()), depending on the back-pressure of the channel. If we are not careful, we may leak the asynchronous (sub-)tasks of the producer.
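Here is a runnable sketch of that leak. It assumes a rendezvous channel (no buffer) and an endless producer; the function name leakedProducerDemo and the timeouts are made up for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Returns true if the producer is still alive after its only consumer is gone.
fun leakedProducerDemo(): Boolean = runBlocking {
    val channel = Channel<Int>() // rendezvous: send() suspends until someone receives
    val producerScope = CoroutineScope(Dispatchers.Default + Job())
    val consumerScope = CoroutineScope(Dispatchers.Default + Job())

    val producer = producerScope.launch {
        var i = 0
        while (true) { channel.send(i++); delay(20) }
    }
    val consumer = consumerScope.launch {
        for (value in channel) { /* use the value */ }
    }

    delay(100)
    consumerScope.cancel() // the only consumer goes away
    consumer.join()

    // Nobody receives any more, so the producer stays suspended in send().
    val stillAlive = withTimeoutOrNull(300) { producer.join() } == null

    producerScope.cancel() // clean up the leaked producer ourselves
    stillAlive
}
```

The plain for-loop over the channel leaves the channel open when it is canceled. That is why the producer has no way of noticing that its consumer is gone.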
There is a handy extension function on a ReceiveChannel called consumeEach. This function consumes the channel and then cancels it. When consumeEach returns, whether normally, by cancelation or through some exception, the channel has been closed, so a producer that is still sending will fail at its call to send() and end as well. See the modified code below:
01.
...
10. val consumerScope: CoroutineScope = ...
11. val consumer = consumerScope.launch {
12.     counterChannel.consumeEach { counter ->
13.         ...
14.     }
15. }
...
When the consumerScope is canceled in the modified example, the consumeEach will close the counterChannel and the producer will be canceled as well. Hooray!
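The same experiment as before, but now with consumeEach, can be sketched like this. Again, the function name consumeEachDemo, the rendezvous channel and the timeout are assumptions for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach

// Returns true if the producer ended after the consumer's scope was canceled,
// while the producer's own scope stayed active.
fun consumeEachDemo(): Boolean = runBlocking {
    val channel = Channel<Int>()
    val producerScope = CoroutineScope(Dispatchers.Default + Job())
    val consumerScope = CoroutineScope(Dispatchers.Default + Job())
    val firstValue = CompletableDeferred<Int>()

    val producer = producerScope.launch {
        var i = 0
        while (true) { channel.send(i++); delay(20) }
    }
    val consumer = consumerScope.launch {
        channel.consumeEach { value -> firstValue.complete(value) }
    }

    firstValue.await()     // the consumer is inside consumeEach by now
    consumerScope.cancel() // consumeEach cancels the channel on its way out
    consumer.join()

    // The producer's send() now fails with a CancellationException.
    val producerEnded = withTimeoutOrNull(1_000) { producer.join() } != null
    producerEnded && producerScope.isActive
}
```

The producer Coroutine ends, but the producerScope itself is untouched. Only the one Coroutine that was sending into the canceled channel is affected.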
When the producerScope is canceled, the producer is canceled, but the counterChannel is not closed. The consumer just keeps running, waiting for data on the open channel that may never come.
Thankfully, there is a nice extension function on a CoroutineScope called produce. CoroutineScope.produce returns a ReceiveChannel for its consumers and it makes sure that when the enclosing CoroutineScope finishes, the returned Channel is closed for sending. This will allow its consumers to exhaust/drain it and finish when that is done:
01.
02.
03. val producerScope: CoroutineScope = ...
04. val counterChannel = producerScope.produce(backPressure) {
05.     for (i in 1..10) { send(i); delay(100) }
06. }
07.
...
When the producerScope is canceled in the modified example, the counterChannel will be closed with a CancellationException. The consumeEach drains any remaining items from the Channel and throws that same exception when it finishes. The consumer will be canceled as well. Another Hooray!
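A sketch of the opposite direction, canceling the producer's scope, could look like this. The name produceDemo and the timeout are assumptions; the opt-in annotation is there because produce is marked as experimental in the kotlinx.coroutines versions I am aware of.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce

// Returns true if the consumer ended after the producer's scope was canceled,
// while the consumer's own scope stayed active.
@OptIn(ExperimentalCoroutinesApi::class)
fun produceDemo(): Boolean = runBlocking {
    val producerScope = CoroutineScope(Dispatchers.Default + Job())
    val consumerScope = CoroutineScope(Dispatchers.Default + Job())
    val firstValue = CompletableDeferred<Int>()

    // produce ties the lifetime of the channel to the producerScope.
    val counterChannel = producerScope.produce<Int> {
        var i = 0
        while (true) { send(i++); delay(20) }
    }
    val consumer = consumerScope.launch {
        counterChannel.consumeEach { value -> firstValue.complete(value) }
    }

    firstValue.await()     // producer and consumer are both running
    producerScope.cancel() // closes the channel for its consumers

    val consumerEnded = withTimeoutOrNull(1_000) { consumer.join() } != null
    consumerEnded && consumerScope.isActive
}
```

With produce on one side and consumeEach on the other, canceling either scope ends the Coroutine on the other side, without canceling that other scope.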
Dealing with Channels and the lifecycles of the objects that use them is quite a bit more complicated than dealing with Flows. The extension functions CoroutineScope.produce and ReceiveChannel.consumeEach alleviate some of the issues, but we still have to be careful not to leak any asynchronous tasks related to them.
Switching CoroutineScopes
Let’s say we have this situation.
The UI has a screen that needs some data from the network. A bunch of requests are issued and then merged by business-logic. When all the data is received, it is cached into memory and onto local storage and finally shown on screen.
When the UI’s request takes a long time, for whatever reason, the user may abandon the screen. However, parts of the chain of requests may already be in flight and we’d like to just cache the data, even if it won’t appear on the screen at the moment, just in case the user comes back to the same screen later. We don’t want to cancel the entire chain of requests. We’d like to let them run to completion instead. We just won’t show it on the screen.
This situation can be handled by switching CoroutineScopes.
In our example scenario, the UI is running in a scope called uiScope (see example below). Tying the launching of the network requests to the uiScope is not a good idea in our situation, since we don’t want to cancel these network requests when the user leaves the screen. We only want to tie the showing of the final result of the network requests on the screen to the uiScope.
We’ll need a different scope for launching the network requests and caching the resulting data, a scope whose tasks can outlive the calling scope. We’ll create a class called CachedNetworkRepo and give it its own CoroutineScope (that basically could live as long as the entire application is alive).
class CachedNetworkRepo(context: CoroutineContext) : Repo {
    val repoScope = CoroutineScope(context + SupervisorJob())
    ...
    override suspend fun getAndCacheData(
        key: Key,
        request: suspend (Key) -> Result
    ): Result {
        val data: Result? = cache[key]
        return if (data != null) {
            data
        } else {
            // Here we switch from the calling CoroutineScope
            // to the 'repoScope'.
            repoScope.async {
                request(key).also { cache[key] = it }
            }
                // We're running in the calling CoroutineScope again.
                .await()
        }
    }
    ...
    fun destroy() {
        repoScope.cancel()
        cache.clear()
    }
}

...

class ScreenViewModel(
    private val network: Network,
    private val repo: Repo,
    context: CoroutineContext
) : ViewModel() {
    // MainScope() takes no arguments; extra context elements are added with '+'.
    private val uiScope: CoroutineScope = MainScope() + context
    ...
    val dataObs = ObservableData<UiData>()

    fun requestData() {
        uiScope.launch {
            // Here, the code runs in the uiScope
            ...
            val data: Result = repo.getAndCacheData(...) { key ->
                // Here, the code runs in the repoScope!
                ...
                network.getAllData(... ...)
            }
            // The code runs in the uiScope again
            dataObs.value = data.toUiData()
        }
    }
    ...
    override fun onCleared() {
        uiScope.cancel()
    }
}
The screen observes the ScreenViewModel’s dataObs property which gets updated when the requestData() receives the final result of the getAllData() from the network.
The requestData function calls the repo to get and cache the data from the network: repo.getAndCacheData(…) { … }.
While the network.getAllData() is suspended (waiting for all the network responses to finish and to be merged), the user leaves the screen. The onCleared is called, which cancels the uiScope. This will cancel the repo.getAndCacheData call itself (it will never resume and assign a value to dataObs.value), but the call to network.getAllData() will not be canceled. It won’t be canceled because it is not running in the uiScope. It runs in the repoScope of the repo object, and that scope has not been canceled.
The network.getAllData() will therefore resume normally and the repo implementation will make sure the result is put into the cache (see .also { cache[key] = it }).
The await() call in the getAndCacheData function of CachedNetworkRepo is running in the calling scope, i.e. the canceled uiScope, and the call to await() will immediately be canceled, causing the getAndCacheData function to be canceled, which prevents the statement dataObs.value = data.toUiData() from being executed.
Great, this works wonderfully: When the calling CoroutineScope finishes, the other CoroutineScope, to which the asynchronous request was switched, keeps running.
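The scenario above can be reduced to a small runnable sketch. Everything in it is a stand-in: a ConcurrentHashMap plays the cache, a delay plays the network, Dispatchers.Default replaces the main dispatcher, and the names getAndCache and switchScopeDemo are made up.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentHashMap

// Returns (cached value, whether the UI was updated).
fun switchScopeDemo(): Pair<String?, Boolean> = runBlocking {
    val cache = ConcurrentHashMap<String, String>()
    val repoScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    val uiScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    val requestStarted = CompletableDeferred<Unit>()
    var shownOnScreen = false

    // The request runs in the repoScope, the await() in the caller's scope.
    suspend fun getAndCache(key: String): String =
        repoScope.async {
            requestStarted.complete(Unit)
            delay(200) // stands in for the slow network requests
            "data for $key".also { cache[key] = it }
        }.await()

    val uiJob = uiScope.launch {
        getAndCache("movies")
        shownOnScreen = true // not reached when the uiScope is canceled in time
    }

    requestStarted.await() // the request is in flight
    uiScope.cancel()       // the user leaves the screen
    uiJob.join()

    // Let the work that is still running in the repoScope finish.
    repoScope.coroutineContext.job.children.forEach { it.join() }
    Pair(cache["movies"], shownOnScreen)
}
```

The cache ends up filled even though the UI Coroutine that asked for the data was canceled halfway through and never got to show it.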
Let’s flip it around. What happens if that other CoroutineScope finishes, i.e. gets canceled? What would happen to the calling CoroutineScope?
Say, in our example, the CachedNetworkRepo gets destroyed, causing the repoScope to get canceled. The code in the repoScope.async { ... } is canceled and the call to await() will throw a CancellationException.
Remember: the call to await() runs in the calling scope uiScope. The CancellationException will bubble up all the way to the top of the Coroutine stack, to the uiScope.launch { ... } call. Luckily, a CancellationException is not just any exception. This exception will only cancel and finish this particular Coroutine. It will not finish the entire uiScope (see “Exceptional Exceptions for Coroutines made easy…?”). In other words: the calling Coroutine will be canceled, but the CoroutineScope in which it runs remains active.
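This, too, can be checked with a short sketch. The name canceledRepoDemo is an assumption, and the uiScope deliberately uses a plain Job() instead of a SupervisorJob() to show that the behavior does not depend on supervision.

```kotlin
import kotlinx.coroutines.*

// Returns (calling coroutine was canceled, calling scope is still active).
fun canceledRepoDemo(): Pair<Boolean, Boolean> = runBlocking {
    val repoScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    val uiScope = CoroutineScope(Dispatchers.Default + Job())
    val requestStarted = CompletableDeferred<Unit>()

    val uiJob = uiScope.launch {
        repoScope.async {
            requestStarted.complete(Unit)
            delay(10_000) // a request that will not finish in time
        }.await()         // awaited in the uiScope
    }

    requestStarted.await()
    repoScope.cancel() // the repo gets destroyed
    uiJob.join()       // await() threw a CancellationException

    Pair(uiJob.isCancelled, uiScope.isActive)
}
```

The Coroutine that was waiting for the result is canceled, but the uiScope can still be used to launch new Coroutines afterwards.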
Recap
I hope you learned a bit more about how CoroutineScopes can be leveraged to manage the lifecycles of the asynchronous tasks of your application’s components. The fact that CoroutineScopes implement Structured Concurrency alleviates a lot of the burden of dealing with cancelations, exception handling and avoiding runtime- and memory-leaks.
If you have any questions, let me know in the comments!
!!Have a nice Kotlin!!