Simple design of Kotlin Flow

Flow by Grant Tarrant

In a previous “Cold flows, hot channels” story¹ I defined cold and hot data streams and showed a use-case for Kotlin Flows, which are cold asynchronous streams. Now let us peek under the hood, examine their design, and see how a combination of language features and a library enables a powerful abstraction with a simple design.

A Flow in Kotlin is represented by an interface²:

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

All there is to a flow is a single collect function that accepts an instance of the FlowCollector interface, which has a single emit method:

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}
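To see how little machinery this needs, here is a minimal sketch that re-declares the two interfaces using only the Kotlin standard library, without kotlinx.coroutines. The names OneTwoThree, ListCollector, runSuspending and collectOneTwoThree are hypothetical, chosen only for this illustration. Collecting amounts to handing a collector to collect, and each element arrives through a plain call to emit.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Stand-ins with the same shape as the interfaces above, not the kotlinx.coroutines types.
interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

// Hypothetical hand-written flow of 1, 2, 3: each element is a direct call to emit.
object OneTwoThree : Flow<Int> {
    override suspend fun collect(collector: FlowCollector<Int>) {
        for (i in 1..3) collector.emit(i)
    }
}

// Hypothetical collector that records every value it receives.
class ListCollector<T> : FlowCollector<T> {
    val received = mutableListOf<T>()
    override suspend fun emit(value: T) {
        received += value
    }
}

// Hypothetical helper: runs a suspending block on the current thread and returns its result.
// It only works for blocks that never actually suspend, which holds for this example.
fun <T> runSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

fun collectOneTwoThree(): List<Int> = runSuspending {
    val collector = ListCollector<Int>()
    OneTwoThree.collect(collector)
    collector.received
}
```

Calling collectOneTwoThree() returns [1, 2, 3]; nothing beyond two interfaces and ordinary suspending calls is involved.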

The name emit should sound familiar to readers of “Cold flows, hot channels”. There I showed an example of the following flow definition:

val ints: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i) // <-- emit is called here
    }
}

The signature of the flow builder uses the FlowCollector interface as the receiver³ of its lambda, so we can call emit directly from the body of that lambda:

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>
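A builder with this signature can be sketched in a few lines: it wraps the block in a Flow whose collect runs the block with the given collector as its receiver. The sketch uses stand-in interfaces and a hypothetical runSuspending helper instead of kotlinx.coroutines, and it skips the context-preservation checks that the real library builder performs.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Stand-ins with the same shape as the interfaces above.
interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

// Sketch of the builder: collecting simply runs the block with the collector as its receiver.
// Unlike the kotlinx.coroutines builder, it performs no context-preservation checks.
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}

// Hypothetical helper: runs a never-suspending block on the current thread.
fun <T> runSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

fun squaresViaBuilder(): List<Int> = runSuspending {
    // Inside the builder lambda, emit is called on the implicit FlowCollector receiver.
    val squares: Flow<Int> = flow { for (i in 1..4) emit(i * i) }
    val received = mutableListOf<Int>()
    squares.collect(object : FlowCollector<Int> {
        override suspend fun emit(value: Int) {
            received += value
        }
    })
    received
}
```

Here squaresViaBuilder() returns [1, 4, 9, 16]. The explicit Flow<Int> type on squares lets the compiler infer T without relying on builder inference.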

When the flow is collected in the simplest way, like this:

ints.collect { println(it) } // takes 1 second, prints 10 ints

what happens is that an instance of FlowCollector is created from the lambda passed to the collect { … } function, and this very instance is then passed to the flow { … } body⁴.
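One way to picture this step is an extension that wraps the lambda into an anonymous FlowCollector and passes it to the member collect. This is a sketch, not the library source: the interfaces and builder are simplified stand-ins, and runSuspending and collectWithLambda are hypothetical names. They are all declared here so the block runs on its own.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

// Simplified builder: collecting runs the block with the collector as its receiver.
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}

// Sketch of the lambda-accepting collect: wrap the action in a FlowCollector
// and hand that collector to the member collect.
suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit) {
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
}

// Hypothetical helper: runs a never-suspending block on the current thread.
fun <T> runSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

fun collectWithLambda(): List<String> = runSuspending {
    val ints: Flow<Int> = flow { for (i in 1..3) emit(i) }
    val lines = mutableListOf<String>()
    ints.collect { lines += "got $it" } // this lambda becomes the collector's emit
    lines
}
```

Because the stand-in FlowCollector is not a fun interface, a lambda cannot be passed to the member collect directly, so the call with a trailing lambda resolves to the extension.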

Thus the interaction between a flow emitter and a flow collector is just a function call: a call to the emit function. If we mentally inline that call, we can immediately see what happens when we run this code. It is equivalent to:

for (i in 1..10) {
    delay(100)
    println(i) // <-- emit was called here
}
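The claim that emission is just a function call can be checked by logging on both sides. In the sketch below (stand-in interfaces, a simplified builder, a lambda-accepting collect extension and a hypothetical runSuspending helper, none of it kotlinx.coroutines), each "collect" entry is logged while the emitter is still inside its emit call, before that call returns.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}

// Wraps the action in a FlowCollector and passes it to the member collect.
suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit) {
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
}

// Hypothetical helper: runs a never-suspending block on the current thread.
fun <T> runSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

fun traceEmitAndCollect(): List<String> = runSuspending {
    val log = mutableListOf<String>()
    val ints: Flow<Int> = flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // runs the collector's lambda before returning
            log += "emit $i returned"
        }
    }
    ints.collect { log += "collect $it" }
    log
}
```

The log reads "emit 1", "collect 1", "emit 1 returned", "emit 2", and so on: the collector's code runs nested inside each emit call, exactly as the inlined loop suggests.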

Operators

The flow builder and the collect terminal operator are all we need to start writing operators that transform flows in a variety of ways. For example, a basic map operator that applies a given transform to every emitted value can be implemented like this:

fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R) = flow {
    collect { emit(transform(it)) }
}

With this operator, we can write ints.map { it * it } to define a flow of the squares of the original integers. Elements still travel from the emitter to the collector via function calls; there is simply one more function call in between now.
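Here is a runnable sketch of that operator with the standard library only. The interfaces, builder and lambda-accepting collect are simplified stand-ins, and runSuspending and squares are hypothetical names. The map body matches the text, with an explicit Flow<R> return type added so it does not depend on builder inference.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}

suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit) {
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
}

// The map operator: collect the upstream flow (the extension receiver)
// and emit each transformed value to the downstream collector.
fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R> = flow {
    collect { emit(transform(it)) }
}

// Hypothetical helper: runs a never-suspending block on the current thread.
fun <T> runSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

fun squares(): List<Int> = runSuspending {
    val ints: Flow<Int> = flow { for (i in 1..5) emit(i) }
    val result = mutableListOf<Int>()
    ints.map { it * it }.collect { result += it }
    result
}
```

Here squares() returns [1, 4, 9, 16, 25]. Inside map, collect resolves to the upstream flow while emit resolves to the builder's FlowCollector receiver.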

In fact, the kotlinx.coroutines library already defines map and a host of other general-purpose operators as extensions on the Flow type, following an extension-oriented design approach⁶. What is important in this design is that domain-specific operators are just as easy to define. There is no distinction between “built-in” and “user-defined” operators: all operators are first-class.
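To illustrate, here is a hypothetical domain-specific operator, runningTotal, written exactly like map: a flow builder that collects upstream and emits downstream. It keeps ordinary local state and needs no request protocol. As before, the interfaces, builder, collect extension and runSuspending helper are simplified stand-ins so the block runs without kotlinx.coroutines.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}

suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit) {
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
}

// Hypothetical user-defined operator: emits the running total of the values seen so far.
fun Flow<Int>.runningTotal(): Flow<Int> = flow {
    var total = 0 // plain local state, scoped to one collection
    collect { value ->
        total += value
        emit(total)
    }
}

// Hypothetical helper: runs a never-suspending block on the current thread.
fun <T> runSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

fun totals(): List<Int> = runSuspending {
    val ints: Flow<Int> = flow { for (i in 1..4) emit(i) }
    val result = mutableListOf<Int>()
    ints.runningTotal().collect { result += it }
    result
}
```

Here totals() returns [1, 3, 6, 10]. Because total is declared inside the builder block, every new collection starts again from zero.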

Back-pressure

In software engineering, back-pressure is the ability of a data consumer that cannot keep up with incoming data to signal the data producer to slow down the rate at which it produces data elements.

The traditional reactive streams design⁷ involves a back-channel for requesting more data from producers as needed. Managing this request protocol leads to notoriously difficult implementations, even for simple operators⁸. We see none of this complexity in the design of Kotlin flows, nor in the implementation of their operators, yet Kotlin flows do support back-pressure. How come?

Kotlin flows achieve transparent back-pressure management through Kotlin suspending functions. You might have noticed that all the functions and function types in the Kotlin flow design are marked with the suspend modifier. Such functions have the super-power to suspend the execution of their caller without blocking a thread. So, when the collector of a flow is overwhelmed, it can simply suspend the emitter and resume it later, when it is ready to accept more elements.
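This mechanism can be demonstrated on a single thread with nothing but the standard library. In the sketch below, a hypothetical SlowCollector suspends the emitter inside emit with suspendCoroutine and lets it continue only when a simulated consumer calls resumeNext(). The interfaces and builder are stand-ins, and the manual resumption loop is purely illustrative. A real application would rely on kotlinx.coroutines rather than resuming continuations by hand.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}

// Hypothetical collector: takes one element, then keeps the emitter suspended
// (no thread is blocked) until the consumer calls resumeNext().
class SlowCollector(private val log: MutableList<String>) : FlowCollector<Int> {
    private var pending: Continuation<Unit>? = null
    val hasPending: Boolean get() = pending != null

    override suspend fun emit(value: Int) {
        log += "received $value"
        suspendCoroutine<Unit> { cont -> pending = cont }
    }

    fun resumeNext() {
        val cont = pending ?: return
        pending = null // cleared first, because resuming may store the next continuation
        cont.resume(Unit)
    }
}

fun backPressureTrace(): List<String> {
    val log = mutableListOf<String>()
    val ints: Flow<Int> = flow {
        for (i in 1..3) {
            log += "produce $i"
            emit(i) // the producer stays suspended here until the consumer is ready
        }
        log += "producer done"
    }
    val collector = SlowCollector(log)
    val collecting: suspend () -> Unit = { ints.collect(collector) }
    // Control returns here as soon as the producer suspends inside emit(1).
    collecting.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { it.getOrThrow() })
    log += "consumer busy"
    while (collector.hasPending) {
        log += "consumer ready"
        collector.resumeNext()
    }
    return log
}
```

After the first element, every "produce" entry in the returned log follows a "consumer ready" entry: the producer is held inside emit until the consumer lets it go, and no thread ever blocks while it waits.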

This is quite similar to back-pressure management in traditional thread-based synchronous data pipelines, where a slow consumer automatically applies back-pressure to the producer by virtue of blocking the producer’s thread. Suspending functions take this beyond a single thread and into the realm of asynchronous programming, transparently managing back-pressure across threads without blocking them. But that is a topic for another story⁹.
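For comparison, the thread-based analogue can be sketched on the JVM with java.util.concurrent.ArrayBlockingQueue. With a capacity of one, the producer thread's put blocks whenever the slow consumer has not yet taken the previous element, so the consumer throttles the producer by blocking its thread. The function name blockingPipeline, the END sentinel and the sleep that makes the consumer slow are illustrative assumptions.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Hypothetical sentinel marking the end of the stream.
private const val END = -1

fun blockingPipeline(): List<Int> {
    // Capacity 1: once an element is waiting, put() blocks the producer thread.
    val queue = ArrayBlockingQueue<Int>(1)
    val producer = thread {
        for (i in 1..5) queue.put(i) // blocks while the consumer is behind
        queue.put(END)
    }
    val received = mutableListOf<Int>()
    while (true) {
        val value = queue.take() // blocks while the queue is empty
        if (value == END) break
        Thread.sleep(10) // a deliberately slow consumer
        received += value
    }
    producer.join()
    return received
}
```

The pipeline delivers [1, 2, 3, 4, 5] in order, but a whole thread sits blocked whenever the producer gets ahead. The suspending version avoids that cost.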


Further reading and footnotes

1. Cold flows, hot channels
2. Flow and related types and functions are still in preview as of version 1.2.1 of the kotlinx.coroutines library. Read more here.
3. Function types in Kotlin
4. This is a slight simplification. It does not take into account additional checks to ensure preservation of context, but that topic is out of the scope of this story.
5. You can run this code via Kotlin Playground here.
6. Extension-oriented design
7. Reactive streams
8. Implementing operators for [RxJava] 2.0
9. Blocking threads, suspending coroutines