Reactive Streams and Kotlin Flows

Photo by Darron Birgenheier

Reactive Extensions¹ (ReactiveX, or Rx for short) were initially created by Erik Meijer for .NET and revealed to the public in 2010. They offered a new approach to APIs for asynchronous data streams. Rx generalized the observer pattern with callbacks for emitted elements (onNext), stream completion (onCompleted), and errors (onError), and introduced stream-processing operators like map and filter that made working with streams of data as easy as working with collections.

Visitor-based Rx offered better performance than traditional iterator-based data processing APIs. Moreover, Rx embraced the idea of “cold streams”. That was quite a novelty in the world of mainstream imperative programming languages of the time, where most data processing APIs were “hot”. Hot streams come with all the pains of resource management (once you open one, you must not forget to close it), while cold streams provide elegant relief from this chore².
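A minimal sketch of what “cold” means, written with Kotlin Flow rather than Rx and assuming a recent kotlinx-coroutines-core on the classpath (coldDemo is a hypothetical name). Defining a cold stream runs nothing, and every collection re-runs the producer from scratch, so there is no long-lived open stream whose lifetime someone has to manage.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical demo: logs when the producer runs relative to each collection.
fun coldDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // Defining the flow does not execute its body.
    val numbers: Flow<Int> = flow {
        log += "producer started" // runs once per collector
        emit(1)
        emit(2)
    }
    log += "flow defined"
    numbers.collect { log += "A got $it" } // first independent run
    numbers.collect { log += "B got $it" } // second independent run
    log
}

fun main() = coldDemo().forEach(::println)
```

The log contains "producer started" twice, once right before each collector's values, and nothing is produced before the first collection.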

Rx adoption grew, and it was ported to many programming languages, including Java, thus reaching the biggest programming ecosystem: the JVM. The Java port of Rx, called RxJava, appeared in 2013, and version 1.0 was released in 2014.

Reactive streams

Meanwhile, Akka and Project Reactor, two other big projects on the JVM, were working on asynchronous event-based systems, and they all shared one problem: flow control, also known as back-pressure. This led to a collaboration aimed at providing a standard set of interfaces for reactive data streams with back-pressure support on the JVM. It is called the Reactive Streams initiative, and Viktor Klang has published a great interview describing its history³. The Reactive Streams specification 1.0 was released in 2015.

Reactive Streams is an impressive piece of engineering. It brings asynchronous event streams with back-pressure to the JVM world, which otherwise lacks platform support for asynchrony. It is a purely library-based feat, introducing a number of contracts that must be meticulously followed. But herein lies its problem. It works flawlessly when you use well-known operators built by experts, but writing your own reactive stream operators that honor all those contracts is a non-trivial challenge.
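To make those contracts concrete, here is a minimal consumer-side sketch against the org.reactivestreams interfaces, assuming the reactive-streams and kotlinx-coroutines-reactive artifacts are on the classpath (asPublisher is used only as a ready-made source; OneAtATimeSubscriber and subscribeOneAtATime are hypothetical names). Even a plain subscriber must keep its Subscription and explicitly signal demand with request(n), because a Publisher may not deliver more onNext signals than were requested. An operator is both a Subscriber and a Publisher, so it has to do this bookkeeping on both sides.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical Subscriber that signals demand for exactly one element at a time.
class OneAtATimeSubscriber<T> : Subscriber<T> {
    val received: MutableList<T> = Collections.synchronizedList(mutableListOf<T>())
    val done = CountDownLatch(1)
    private lateinit var subscription: Subscription

    override fun onSubscribe(s: Subscription) {
        subscription = s
        s.request(1) // nothing is delivered until demand is signalled
    }

    override fun onNext(item: T) {
        received += item
        subscription.request(1) // ask for the next element only after handling this one
    }

    override fun onError(t: Throwable) = done.countDown()

    override fun onComplete() = done.countDown()
}

// Hypothetical driver: subscribes and waits (with a timeout) for the terminal signal.
fun subscribeOneAtATime(): List<Int> {
    val subscriber = OneAtATimeSubscriber<Int>()
    flowOf(1, 2, 3).asPublisher().subscribe(subscriber)
    check(subscriber.done.await(5, TimeUnit.SECONDS)) { "stream did not complete" }
    return subscriber.received.toList()
}

fun main() = println(subscribeOneAtATime()) // [1, 2, 3]
```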

Kotlin Flows

The Kotlin programming language released coroutines in 2018 as a general language feature specifically aimed at asynchronous programming. The concept of suspension in Kotlin provides a natural solution for flow control. Combine it with visitor-based, Rx-inspired cold streams and you get the idea behind Kotlin Flows.

From the outset of our work on Kotlin Flows, the goal was to have a simple design. We explicitly wanted a design that made writing your own operators trivial, given just a few basic building blocks. Want to delay every value for one second? No problem: using the basic flow builder and the collect function, you can write:

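import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun <T> Flow<T>.delayASecond() = flow {
    collect { value -> // collect from the original flow
        delay(1000)    // delay 1 second
        emit(value)    // emit value to the resulting flow
    }
}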

You won't see any code that explicitly handles back-pressure, because it happens automatically behind the scenes, thanks to the support for suspension provided by the Kotlin compiler: emit does not return until the downstream collector has finished processing the value.
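A minimal sketch, assuming a recent kotlinx-coroutines-core on the classpath (backPressureDemo is a hypothetical name), that records how a producer interleaves with a deliberately slow collector. Because emit only returns after the collector has handled the value, the producer never runs ahead of the consumer, and no demand counter or request call appears anywhere.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical demo: logs the order of emissions and processing steps.
fun backPressureDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // suspends until the collector below is done with i
        }
    }.collect { value ->
        delay(100) // a deliberately slow consumer
        log += "processed $value"
    }
    log
}

fun main() = backPressureDemo().forEach(::println)
```

The log strictly alternates "emit n" and "processed n": the slow consumer throttles the producer without any explicit flow-control code.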

Structured concurrency

Designing Kotlin Flows from scratch, we also used the opportunity to reduce some of the boilerplate typically associated with reactive streams programming. For one, when subscribing to a reactive stream you end up holding a reference to a Subscription object that must be carefully managed if you want to be able to cancel the subscription; otherwise, you risk leaking it. This is very similar to the problem that structured concurrency solves, so it was logical to design Flow so that you cannot accidentally leak a subscription.

Kotlin Flow does not have the concept of a subscription at all. Suspension and lightweight coroutines come to the rescue. The collect operation of a flow is the closest analogy to a subscription, but it is just a suspending function call that is hard to leak or otherwise misuse, thanks to structured concurrency.
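A sketch of what takes the place of the subscription handle, assuming a recent kotlinx-coroutines-core on the classpath (ticks and cancellationDemo are hypothetical names). The Job of the collecting coroutine is the only handle; cancelling it stops the collection and runs the producer's finally block, so the producer's resources are released.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical endless cold flow; the finally block shows when the producer is torn down.
fun ticks(log: MutableList<String>): Flow<Int> = flow {
    try {
        var i = 0
        while (true) {
            emit(i++)
            delay(10)
        }
    } finally {
        log += "producer cleaned up"
    }
}

fun cancellationDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // No Subscription object: the collecting coroutine's Job is the handle.
    val job = launch {
        ticks(log).collect { value -> log += "got $value" }
    }
    delay(35)
    job.cancelAndJoin() // cancelling the coroutine cancels the collection
    log
}

fun main() = cancellationDemo().forEach(::println)
```

The number of "got" lines depends on timing, but the log always starts with "got 0" and ends with "producer cleaned up". Had the demo forgotten to cancel the job, runBlocking would wait for its child forever instead of silently leaking the collection.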

The suspension-based design of the collect operation also removes the need for separate onError and onCompleted callbacks. Want to perform some operation on the normal completion of the flow? Just do it after collect completes normally:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    // re-emit all values from the original flow
    collect { value -> emit(value) }
    // this code runs only after the normal completion
    action()
}
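A runnable sketch of the onCompleted operator in use, assuming a recent kotlinx-coroutines-core on the classpath (trace is a hypothetical helper). It shows both halves of the claim: the completion action runs only after normal completion, and the error path is simply an exception propagating out of collect, where an ordinary try/catch takes the place of an onError callback.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Same operator as in the text: runs `action` only after the upstream completes normally.
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    collect { value -> emit(value) }
    action()
}

// Hypothetical helper: collects a flow and returns a log of what happened.
fun trace(source: Flow<Int>): List<String> = runBlocking {
    val log = mutableListOf<String>()
    try {
        source.onCompleted { log += "completed" }
            .collect { value -> log += "value $value" }
    } catch (e: IllegalStateException) {
        // No onError callback: a failure is an ordinary exception thrown by collect.
        log += "error: ${e.message}"
    }
    log
}

fun main() {
    println(trace(flowOf(1, 2)))                    // [value 1, value 2, completed]
    println(trace(flow { emit(1); error("boom") })) // [value 1, error: boom]
}
```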

Hindsight

Being able to explore existing code that uses reactive streams gave us tremendous hindsight during our design process. For example, we saw common code patterns related to the handling of execution context (the ubiquitous subscribeOn/observeOn pair) and designed a consistent mechanism around a single flowOn operator instead.
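A small sketch of flowOn, assuming a recent kotlinx-coroutines-core on the classpath (whereThingsRun is a hypothetical name). flowOn changes the context only of the operators upstream of it, so the producer below runs on Dispatchers.Default while the collector stays in the caller's context, roughly the job a subscribeOn/observeOn pair would do in Rx. Thread names are an implementation detail and are used here only for illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Hypothetical demo: pairs of (producer thread, collector thread) for each element.
fun whereThingsRun(): List<Pair<String, String>> = runBlocking {
    val result = mutableListOf<Pair<String, String>>()
    flow {
        // Everything upstream of flowOn runs on Dispatchers.Default.
        repeat(3) { emit(Thread.currentThread().name) }
    }
        .flowOn(Dispatchers.Default)
        .collect { producerThread ->
            // The collector keeps running on runBlocking's thread.
            result += producerThread to Thread.currentThread().name
        }
    result
}

fun main() = whereThingsRun().forEach(::println)
```

Only one operator is involved: the collector's context is simply wherever collect is called, so there is no downstream counterpart to configure.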

We also have the luxury of not having to implement every conceivable operator in the core library. We can pick only the most popular and basic ones, while Kotlin’s support for extension functions, combined with the simplicity of the Flow design, makes user-defined operators easy to write and just as easy to use as built-in ones.
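A sketch of a user-defined operator chained between built-in ones, assuming a recent kotlinx-coroutines-core on the classpath. runningTotal and oddRunningTotals are hypothetical names for illustration (the library's built-in scan covers similar ground); the point is that an extension built from flow and collect reads exactly like filter and map at the call site.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical user-defined operator: emits the running sum of the upstream values.
fun Flow<Int>.runningTotal(): Flow<Int> = flow {
    var total = 0
    collect { value ->
        total += value
        emit(total)
    }
}

// Built-in and user-defined operators chain the same way.
fun oddRunningTotals(values: Flow<Int>): Flow<String> =
    values
        .filter { it % 2 == 1 } // built-in
        .runningTotal()         // user-defined
        .map { "total=$it" }    // built-in

fun main() = runBlocking {
    println(oddRunningTotals(flowOf(1, 2, 3, 4, 5)).toList()) // [total=1, total=4, total=9]
}
```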

Integration

Kotlin Flows are still conceptually reactive streams. Even though they are suspension-based and do not implement the corresponding interfaces directly, they are designed to make integration with systems based on reactive streams straightforward. We provide an out-of-the-box flow.asPublisher() extension function to convert a Flow to the Reactive Streams Publisher interface, and a publisher.asFlow() extension for the reverse conversion.
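A minimal round-trip sketch, assuming the kotlinx-coroutines-reactive artifact (which provides asPublisher and asFlow in the kotlinx.coroutines.reactive package) and its reactive-streams dependency are on the classpath; numbersPublisher and doubled are hypothetical names.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Flow -> Publisher: the result can be handed to any Reactive Streams-based library.
fun numbersPublisher(): Publisher<Int> = flowOf(1, 2, 3).asPublisher()

// Publisher -> Flow: any Publisher can then be processed with Flow operators.
fun doubled(source: Publisher<Int>): Flow<Int> = source.asFlow().map { it * 2 }

fun main() = runBlocking {
    println(doubled(numbersPublisher()).toList()) // [2, 4, 6]
}
```

Demand is translated at the boundary: on the Flow side, back-pressure is still expressed purely through suspension.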

Status

Kotlin Flow was released as an experimental API in kotlinx.coroutines version 1.3.0-M1, the first milestone toward the upcoming 1.3.0 major release that will stabilize the core Flow APIs and basic operators. Your feedback is especially valuable at this time, as it helps us nail down the shape and function of these APIs.


Further reading and footnotes

  1. ReactiveX web site, to learn more about Reactive Extensions and how they differ from other data-processing approaches.
  2. Cold flows, hot channels, to dive into the difference between hot and cold streams.
  3. Reactive Streams 1.0.0 interview by Viktor Klang, on the history and origins of the Reactive Streams specification.
  4. Reactive Streams web site, to explore the challenge of asynchrony with back-pressure.
  5. Kotlin coroutines specification, to see what use cases Kotlin coroutines were designed to cover and how they are implemented.
  6. Simple design of Kotlin Flow, to understand the details of the Flow design.
  7. Structured Concurrency, to review the challenges of concurrency and how a structured approach provides a solution.
  8. Execution context of Kotlin Flows, to master the details of the flowOn operator and a simple mental model for flow execution context.
  9. Extension-oriented design, to appreciate the API design implications of Kotlin extension functions.