Kotlin Flows and Coroutines

Channels by Tom Doel

In the previous story on Kotlin Flows I showed how they are designed¹, and one thing was deliberately left out of that description: there was no mention of either coroutines or channels. Indeed, the design of Kotlin Flows is based on suspending functions and flows are completely sequential, while a coroutine is an instance of computation that, like a thread, can run concurrently with other code.

Sequential flows

To get a better feel for the sequential nature of flows, take a look at the same example flow that emits ten integers with a 100 ms delay between them:
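A minimal sketch of such a flow, assuming the flow builder from kotlinx.coroutines (the name ints is only illustrative):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits 1..10, suspending for 100 ms before each element.
// Nothing runs until a collector calls collect() on the flow.
val ints: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}
```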

Let us confirm that collecting it takes around a second:
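One way to check this is sketched below, using measureTimeMillis from the Kotlin standard library inside runBlocking. The flow definition (with the illustrative name ints) is repeated so that the snippet compiles on its own; the exact time reported will vary from run to run.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Same illustrative flow: ten integers, 100 ms apart.
val ints: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    // measureTimeMillis is inline, so suspending calls are allowed inside it.
    val time = measureTimeMillis {
        ints.collect { println(it) }
    }
    println("Collected in $time ms")
}
```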

But what happens if the collector is also slow and adds its own 100 ms delay before printing each element? Check it out:
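A sketch of the slow collector, under the same assumptions as before (ints is an illustrative name, repeated here so the snippet is self-contained). The only change is the delay inside the collect block:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Same illustrative flow: ten integers, 100 ms apart.
val ints: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        ints.collect {
            delay(100) // the collector is slow too
            println(it)
        }
    }
    println("Collected in $time ms")
}
```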

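It takes around 2 seconds to complete, because the emitter and the collector are both parts of a single sequential execution that alternates between them: each of the ten elements costs 100 ms in the emitter followed by another 100 ms in the collector, for 10 × 200 ms in total.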

Concurrent coroutines

Can we structure this execution so that the whole operation completes faster, changing neither the emitter’s nor the collector’s code? Yes, we can. We need to decouple the emitter and the collector, running the emitter in a separate coroutine from the collector so that they can be concurrent. But with two separate coroutines we cannot simply emit the elements by a function call; we need to establish some communication between the two coroutines. That is exactly what channels are designed to do². You can send the elements via a channel from one coroutine and receive them in another one, so that the whole execution would look like this:
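Written out by hand, that arrangement might look like the following sketch. It inlines the emitter and collector loops purely for illustration and assumes a rendezvous Channel (the default capacity) from kotlinx.coroutines; it is not the operator-based solution, only a picture of the two coroutines talking over a channel.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val time = measureTimeMillis {
        coroutineScope {
            val channel = Channel<Int>() // rendezvous channel: no buffer

            // Emitter side runs in its own coroutine.
            launch {
                for (i in 1..10) {
                    delay(100)
                    channel.send(i)
                }
                channel.close() // lets the receiving loop terminate
            }

            // Collector side runs in the enclosing coroutine.
            for (i in channel) {
                delay(100)
                println(i)
            }
        }
    }
    println("Collected in $time ms")
}
```

Because the two 100 ms delays now overlap instead of adding up, the run should finish in roughly half the time of the sequential version.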

This is a common communication pattern and it can be encapsulated into an operator on flows. The built-in produce builder from the kotlinx.coroutines library makes this pattern especially easy to implement, since it combines launching a new coroutine and creating a channel, and the consumeEach function pairs with it on the consumer side. We use coroutineScope so that concurrency is structured³ and to avoid leaking the producer coroutine outside of the scope:
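A sketch of such an operator, built from the pieces named above (produce, consumeEach, coroutineScope). The size parameter and its default of 0, meaning a rendezvous channel, are assumptions made for this illustration. The operator is declared in the same file so that it takes precedence over any same-named operator pulled in by the star import.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Illustrative operator: runs the upstream flow in a separate coroutine
// and hands its elements to the downstream collector through a channel.
fun <T> Flow<T>.buffer(size: Int = 0): Flow<T> = flow {
    coroutineScope {
        // produce launches a coroutine and creates the channel in one step.
        val channel = produce<T>(capacity = size) {
            this@buffer.collect { send(it) }
        }
        // Back in the collector's coroutine: re-emit everything received.
        channel.consumeEach { emit(it) }
    }
}
```

Since the producer is a child of coroutineScope, it is cancelled if the collector fails or is cancelled, and the scope does not complete until the producer is done.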

Running the same emitter and collector code as before but with the above buffer() operator in between them gives the desired speedup in execution time:
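Putting the pieces together could look like the sketch below. The illustrative ints flow and the buffer operator are repeated so that the program compiles on its own; the emitter and the collector are unchanged, and only the buffer() call is new.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Same illustrative flow: ten integers, 100 ms apart.
val ints: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

// Same illustrative operator: upstream runs in its own coroutine.
fun <T> Flow<T>.buffer(size: Int = 0): Flow<T> = flow {
    coroutineScope {
        val channel = produce<T>(capacity = size) {
            this@buffer.collect { send(it) }
        }
        channel.consumeEach { emit(it) }
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        ints.buffer().collect {
            delay(100) // the same slow collector as before
            println(it)
        }
    }
    println("Collected in $time ms")
}
```

The reported time should be on the order of one second rather than two, since the emitter's delay and the collector's delay now run concurrently.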

Declarative programming

This example is a part of a larger story. Channels are low-level communication primitives with their roots in Hoare’s CSP (Communicating Sequential Processes) model, which was recently popularized by the Go programming language. They enable many concurrent communication patterns, but using channels directly to implement them is quite verbose and error-prone.

In fact, recent academic research corroborates this anecdotal knowledge:

Contrary to the common belief that message passing is less error-prone, more blocking bugs in our studied Go applications are caused by wrong message passing than by wrong shared memory protection.

On the other hand, programming with flows is declarative. Instead of manually launching coroutines and setting up channels between them, the corresponding patterns will be written and debugged once, and encapsulated into operators on flows.

Roadmap

Kotlin Flows are currently available in early preview in kotlinx.coroutines version 1.2.1. Their core design is going to stay, but various things will change before the stable release, so using them in production code is not advised just yet. We plan to provide a comprehensive set of general-purpose built-in operators, including something akin to the buffer() from the above example.

I’m getting one question quite often: “Are channels going to be deprecated when the flows are finalized?” The purpose of this story was to show why the answer is a definite “No”. Channels are going to be used as the low-level implementation mechanism on which flow operators that require communication between coroutines will be built.