Kotlin Coroutines in Android — Part 11

Channels

Andrea Bresolin
Kin + Carta Created
4 min read · Apr 5, 2019

In classic asynchronous programming, we might have independent threads that communicate by sharing mutable objects and rely on various synchronisation techniques to coordinate at specific points during the execution and avoid race conditions.

Coroutines have an additional way of communicating with each other. They can use channels. A channel allows a coroutine to send some items to another coroutine that can suspend until new items are available.

Simple channel

Let’s assume that in our Android app we have a use case that needs to send multiple items as a stream at different times.

The executeAsync() method accepts a channel as an argument. It uses the SendChannel interface because it’s just interested in sending items through the channel and not receiving them. The method itself runs independently in a background thread and doesn’t return a final result at the end so we can just specify the return type as Deferred<Unit> and end the method with return@backgroundTaskAsync. When there are no more items to send, we close the channel by invoking close(). Note that the channel’s send() method is a cancellable suspend function and this means that our executeAsync() method is cancellable as well.
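A minimal sketch of such a use case might look like the following. The backgroundTaskAsync helper shown here is only a hypothetical stand-in for the one used in this series (assumed to run a block on a background dispatcher and return a Deferred), and the ItemsUseCase class, the Int item type and the delay are made up for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.channels.SendChannel

// Hypothetical stand-in for the series' backgroundTaskAsync helper (assumption):
// runs the block on a background dispatcher and returns its Deferred.
fun <T> CoroutineScope.backgroundTaskAsync(
    block: suspend CoroutineScope.() -> T
): Deferred<T> = async(Dispatchers.Default, block = block)

// Hypothetical use case that streams three items at different times.
class ItemsUseCase(private val scope: CoroutineScope) {

    // SendChannel is enough here: the use case only sends, it never receives.
    fun executeAsync(channel: SendChannel<Int>): Deferred<Unit> =
        scope.backgroundTaskAsync {
            for (item in 1..3) {
                delay(100)          // simulate items becoming ready over time
                channel.send(item)  // cancellable suspension point
            }
            channel.close()         // no more items: the receiver's loop can end
            return@backgroundTaskAsync
        }
}
```

Because send() and delay() are both cancellable, cancelling the scope that owns this task stops the stream at the next suspension point.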

Inside our Presenter/ViewModel we want to receive the items from the use case, suspending until new items are available.

We create a channel that can manage items of type T, pass it to the use case and then use a for loop on the channel itself to receive new items. The for loop relies on the channel’s iterator, whose hasNext() is a suspend function, so it automatically suspends the coroutine started by uiJob until there’s a new item to process. The for loop completes when the channel is closed. At the end we also wait for the executeAsync() method to complete by invoking await() on its Deferred.
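The receiving side described above can be sketched roughly as follows. This is not the original listing: scope.launch stands in for the series’ uiJob helper, and the executeAsync() function, the ItemsPresenter class and the String items are hypothetical names chosen to keep the example self-contained.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel

// Hypothetical use case (assumption): sends three strings, then closes the channel.
fun CoroutineScope.executeAsync(channel: SendChannel<String>): Deferred<Unit> =
    async(Dispatchers.Default) {
        listOf("a", "b", "c").forEach { channel.send(it) }
        channel.close()
        Unit
    }

class ItemsPresenter(private val scope: CoroutineScope) {
    val shown = mutableListOf<String>()

    // scope.launch stands in for the series' uiJob helper (assumption).
    fun loadItems(): Job = scope.launch {
        val channel = Channel<String>()
        val task = executeAsync(channel)

        // Suspends while the channel is empty; ends when the channel is closed.
        for (item in channel) {
            shown.add(item)
        }

        task.await() // wait for the use case itself to complete
    }
}
```

The items are added to shown on the presenter’s own scope, so the list is only ever touched from the coroutine that runs the loop.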

Backpressure

If you’ve been using RxJava, you’re probably familiar with the idea of backpressure. How is it handled in the coroutines world?

While receiving the items from a channel, the receiver coroutine suspends automatically if there are no items. A similar thing happens for the sender. A channel created without a capacity has no buffer, so send() suspends until the receiver takes the item. The receiver only comes back for the next item once it has finished processing the current one, so the sender can’t get ahead of it.
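To make the sender’s suspension visible, here is a small sketch that wraps send() in a timeout. The sendCompletedWithin() helper is a hypothetical name introduced only for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: returns true if send() completed before the timeout,
// false if the sender was still suspended when the timeout fired.
suspend fun sendCompletedWithin(channel: Channel<Int>, millis: Long): Boolean =
    withTimeoutOrNull(millis) {
        channel.send(1) // suspends until a receiver takes the item
        true
    } ?: false
```

On a channel created with Channel<Int>() and no active receiver, the call gives up after the timeout because nobody takes the item. As soon as another coroutine calls receive() on the same channel, send() resumes and the helper returns true.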

When creating a channel, we can also specify an optional capacity. In that case, the sender won’t suspend as long as there’s still capacity to accept new items in the channel’s buffer. This allows a bit of flexibility between the sender’s and receiver’s processing speeds.
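The effect of the capacity can be checked with trySend(), the non-suspending counterpart of send() that reports whether the channel accepted the item. The acceptedWithoutReceiver() function below is a hypothetical helper written for this illustration, and it assumes a kotlinx.coroutines version that provides trySend() (1.5 or later).

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many items a channel with the given capacity
// accepts immediately when no receiver is taking items out of it.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int = 5): Int {
    val channel = Channel<Int>(capacity)
    // trySend() never suspends: it fails when send() would have had to suspend.
    return (1..attempts).count { channel.trySend(it).isSuccess }
}
```

With a capacity of 0 (the default) nothing is accepted until a receiver shows up, while a capacity of 2 lets the sender run two items ahead before it would have to suspend.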

In some cases, we might need to discard the items that can’t be processed in time by the receiver because we don’t want to block the sender from sending new items as they become ready. In other words, we might want to handle the backpressure by discarding items when they arrive faster than the receiver can process them. We can achieve this behaviour with select.

A modified version of the previous use case can take two channels, itemsChannel and backupChannel, and use select to decide which one receives each item.

Each SendChannel has an onSend clause that can be used with select. select is biased towards its first clause, so if itemsChannel is free to receive additional items, the itemsChannel.onSend clause is selected. If itemsChannel can’t receive any more items at the moment because its buffer is full and its receiver hasn’t finished processing all the previous items yet, the backupChannel.onSend clause is selected instead. If backupChannel is also full, select suspends until one of the channels becomes available. Each onSend clause is followed by a block of code that is executed when the corresponding clause is selected. We don’t need it in this use case, so it can stay empty. It also has a reference to its channel as a parameter if we need it.
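A rough sketch of the sending logic with select could look like this. The sendItems() function and its items parameter are hypothetical. Only the itemsChannel and backupChannel names and the onSend clauses come from the description above.

```kotlin
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.selects.select

// Hypothetical sender: prefers itemsChannel and falls back to backupChannel.
suspend fun sendItems(
    items: List<Int>,
    itemsChannel: SendChannel<Int>,
    backupChannel: SendChannel<Int>
) {
    for (item in items) {
        select<Unit> {
            // Clauses are tried in order, so itemsChannel wins whenever it is free.
            itemsChannel.onSend(item) { /* runs if this clause is selected */ }
            backupChannel.onSend(item) { /* runs if this clause is selected */ }
        }
    }
    // Closing both channels lets each receiver's for loop complete.
    itemsChannel.close()
    backupChannel.close()
}
```

For example, with a primary channel of capacity 1, an unlimited backup channel and no receiver running yet, only the first item fits in the primary channel and the rest end up in the backup channel.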

Now that the use case supports two channels, the Presenter/ViewModel has to create both of them and consume each one separately.

We have two channels. primaryChannel handles the items that we want to process, while backpressureChannel handles all the items that we’ve received too fast for our processing speed. The two channels are handled in independent background threads because the suspension of one must not affect the other. backpressureHandler might decide to simply discard the items or handle them in a different way.
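Putting the pieces together, the two-channel setup could be sketched as below. This is an illustration under stated assumptions, not the original code: the executeAsync() function, the ItemsPresenter class, the processed and discarded lists, the ten Int items and the simulated delay are all hypothetical, and each handler is a coroutine launched on Dispatchers.Default rather than a call to the series’ own helpers.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.selects.select

// Hypothetical use case (assumption): prefers itemsChannel, falls back to backupChannel.
fun CoroutineScope.executeAsync(
    itemsChannel: SendChannel<Int>,
    backupChannel: SendChannel<Int>
): Deferred<Unit> = async(Dispatchers.Default) {
    for (item in 1..10) {
        select<Unit> {
            itemsChannel.onSend(item) { }
            backupChannel.onSend(item) { }
        }
    }
    itemsChannel.close()
    backupChannel.close()
    Unit
}

class ItemsPresenter {
    // Thread-safe lists, because the handlers run on background threads.
    val processed = CopyOnWriteArrayList<Int>()
    val discarded = CopyOnWriteArrayList<Int>()

    suspend fun loadItems() = coroutineScope {
        val primaryChannel = Channel<Int>()
        val backpressureChannel = Channel<Int>()
        val task = executeAsync(primaryChannel, backpressureChannel)

        // Each channel gets its own coroutine, so a slow primary handler
        // never stalls the backpressure handler.
        val primaryHandler = launch(Dispatchers.Default) {
            for (item in primaryChannel) {
                delay(50)           // simulate slow processing
                processed.add(item)
            }
        }
        val backpressureHandler = launch(Dispatchers.Default) {
            // Here the overflow is only recorded; it could also be dropped.
            for (item in backpressureChannel) discarded.add(item)
        }

        task.await()
        primaryHandler.join()
        backpressureHandler.join()
    }
}
```

How the items split between processed and discarded depends on timing, but every item sent by the use case ends up in exactly one of the two lists.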

What’s next

We had an introduction to channels. There’s more to explore about them, but this is a good start. In the next part of this series, we’ll talk about testing to see what we need to keep in mind while testing code that uses coroutines.

Get the source code

The source code for this series can be found on GitHub. It’s an example Android project that covers multiple cases. Download it and play with it.

Missed the other parts of this series?

If you’ve missed the other parts of the Kotlin Coroutines in Android series, take a look at the introduction and check the full list of topics.
