Coroutines Flow

Maciej Nowak
Published in Fandom Engineering
Dec 3, 2020
Photo by Martin Jernberg on Unsplash

This blogpost is a continuation of the previous post, Coroutines Basics, and covers more advanced mechanisms than basic coroutines have to offer. In this article, I'm going to present the idea of coroutine streams called Flow. Why and when do we need it? What can be achieved with it? How do we create and manage it? And how does it affect and cooperate with coroutines? I hope you will find all the answers below.

Multiple values problem

As we already know from the previous article, a suspending function can return only a single value. What if we need to return multiple asynchronously calculated values? Let's make it clear what multiple values actually are: they can be represented by any collection, like List, and also by Kotlin's Sequence. So what is the problem? Why not just use a suspend function that returns a collection? The answer is simple: all the values are returned at once, as a single value wrapped in a collection, and only after every one of them has been calculated.

import kotlinx.coroutines.runBlocking

fun handleCollection() {
    val values = listOf(1, 2, 3)
    values.forEach {
        // do something
    }
}
fun handleSequence() {
    val values = sequenceOf(1, 2, 3)
    values.forEach {
        // do something
    }
}
fun handleSuspendFun() = runBlocking {
    val values = multipleValues()
    // wait here until all of them are calculated
    values.forEach {
        // do something
    }
}
suspend fun multipleValues(): List<Int> {
    // do some time-consuming calculations
    return listOf(1, 2, 3) // all of them at once
}
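To make the problem concrete, here is a minimal sketch that records when each value is computed and when it is handled. It assumes kotlinx.coroutines is on the classpath. The helpers computeAll and listOrder are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: computes three values, logging each one as soon as it is ready.
suspend fun computeAll(log: MutableList<String>): List<Int> {
    val result = mutableListOf<Int>()
    for (value in 1..3) {
        delay(10) // mock a time-consuming calculation
        log += "computed $value"
        result += value
    }
    return result // the caller receives nothing until all three values are ready
}

// Hypothetical helper: returns the order of "computed" and "handled" events.
fun listOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    computeAll(log).forEach { log += "handled $it" }
    log
}
```

The log contains all three "computed" entries before the first "handled" entry. That is exactly the waiting described above.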

Flow to the rescue

This is where Kotlin Flow comes in. A Flow lets us emit values one by one (just like a stream) inside a coroutine. To declare a Flow, create it with the flow builder, emit values inside it, and collect them in a coroutine. Note that Flows are cold streams, which means that the code inside a Flow doesn't run until the Flow is collected, i.e. until collect is called.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun useFlow() = runBlocking<Unit> {
    val notBlocked = launch {
        repeat(3) {
            delay(10)
        }
    }

    val flow = flowValues()
    // just get the flow, nothing has happened yet

    flow.collect { value -> // now the flow runs
        // handle each value
    }
    // collect is a suspend fun, so the coroutine launched before it
    // is not blocked and works concurrently with the flow
    val blocked = launch {
        repeat(3) {
            delay(10)
        }
    }
    // but the coroutine launched after collect doesn't even start
    // until collect completes
}
fun flowValues(): Flow<Int> = flow { // not suspend
    for (value in 1..3) {
        delay(10) // mock some computations
        emit(value)
    }
}
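The same experiment with a Flow shows both properties from the paragraph above. Values are handled one by one as they are emitted, and nothing runs before collection starts. This is a minimal sketch assuming kotlinx.coroutines. loggedFlow is a hypothetical variant of flowValues that writes to a log.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical variant of flowValues that logs each emission.
fun loggedFlow(log: MutableList<String>): Flow<Int> = flow {
    for (value in 1..3) {
        delay(10) // mock some computations
        log += "emitted $value"
        emit(value)
    }
}

fun flowOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val values = loggedFlow(log)
    log += "flow created" // nothing has been emitted yet: the flow is cold
    values.collect { log += "handled $it" } // each value is handled right after its emission
    log
}
```

Because emit suspends until the collector has finished with the value, the "emitted" and "handled" entries alternate.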

Builders

A Flow can also be created with other builders, like flowOf, or by converting a collection into a Flow with extension functions like asFlow.

fun flowBuilder() = flow {
    emit(1)
    emit(2)
    emit(3)
}
fun flowOfBuilder() = flowOf(1, 2, 3)
fun flowFromCollection() = listOf(1, 2, 3).asFlow()
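As a quick sanity check, the builders above produce the same values. This sketch assumes kotlinx.coroutines and collects each Flow with the toList terminal operator. builderResults is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects each builder's output into a List for comparison.
fun builderResults(): List<List<Int>> = runBlocking {
    listOf(
        flow<Int> { emit(1); emit(2); emit(3) }.toList(), // flow builder
        flowOf(1, 2, 3).toList(),                         // fixed set of values
        listOf(1, 2, 3).asFlow().toList(),                // Iterable.asFlow()
        (1..3).asFlow().toList(),                         // IntRange.asFlow()
        sequenceOf(1, 2, 3).asFlow().toList()             // Sequence.asFlow()
    )
}
```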

Launching

Flows may not seem like a perfect solution if collecting values suspends the coroutine. There must be some way to use a Flow without suspending the coroutine, something like an event listener. And of course, there is! First, emitted values can be handled in onEach (an intermediate operator that returns a Flow performing the given action on each value) instead of in collect. Second, the suspending collect can be replaced with launchIn, which launches the collection in a new coroutine in the given CoroutineScope and returns its Job.

fun invokeByOnEach() = runBlocking<Unit> {
    flowValues()
        .onEach { value ->
            // do something here
        }
        .collect()
    // still have to wait for collect completion
    val blocked = launch {
        repeat(3) {
            delay(10)
        }
    }
}
fun launchFlow() = runBlocking<Unit> {
    // create a new coroutine by launching it
    val job = flowValues()
        .onEach {
            // do something here
        }
        .launchIn(this) // launch in CoroutineScope
    // don't have to wait for collect completion
    val notBlocked = launch {
        repeat(3) {
            delay(10)
        }
    }
}
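The following sketch shows that code after launchIn keeps running before any value is handled. It assumes kotlinx.coroutines and relies on runBlocking executing its coroutines on a single thread. launchInOrder is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun launchInOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = flowOf(1, 2, 3)
        .onEach { log += "value $it" }
        .launchIn(this) // shorthand for launch { collect() }; returns the Job
    // runBlocking uses one thread, so the new coroutine can't start
    // until this one suspends at join()
    log += "after launchIn"
    job.join()
    log
}
```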

Cancellation

Flow follows the general cooperative cancellation rules of coroutines, so it can be cancelled when it's suspended in a cancellable suspending function, such as delay. It's also worth noting that the flow builder performs additional ensureActive cancellation checks on each emitted value. Most other builders, such as asFlow, don't perform these checks for performance reasons, but you can add them explicitly with the cancellable operator.

fun cancelFlow() = runBlocking<Unit> {
    val job = flowValues()
        .onEach {
            // do something here
        }
        .launchIn(this)
    delay(10)
    job.cancel()
    // it's a Job, so it can be cancelled with the cancel function
    withTimeoutOrNull(10) {
        flowValues().collect {
            // do something here
        }
    }
    // or set a timeout, just like for any other coroutine
}
// cancel() cancels the whole runBlocking scope, so each case gets its own
// function; each one ends by throwing a CancellationException
fun cancelFlowBuilder() = runBlocking<Unit> {
    flowValues().collect { value ->
        if (value == 2) {
            cancel()
        }
        // do something
    }
    // flow was cancelled because of ensureActive embedded
    // in flow builder, collected elements: 1, 2
}
fun cancelAsFlow() = runBlocking<Unit> {
    listOf(1, 2, 3).asFlow().collect { value ->
        if (value == 2) {
            cancel()
        }
        // do something
    }
    // flow wasn't cancelled, collected elements: 1, 2, 3
}
fun cancelWithCancellable() = runBlocking<Unit> {
    listOf(1, 2, 3).asFlow()
        // a shortcut for
        // .onEach { currentCoroutineContext().ensureActive() }
        .cancellable()
        .collect { value ->
            if (value == 2) {
                cancel()
            }
            // do something
        }
    // flow was cancelled because of cancellable
    // collected elements: 1, 2
}
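To observe which values get through without cancelling runBlocking itself, the collection can run in a child coroutine that cancels only itself. This is a minimal sketch assuming kotlinx.coroutines. collectUntilTwo is a hypothetical helper.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects source in a child coroutine that cancels itself after 2.
// Only the child is cancelled, so runBlocking itself completes normally.
fun collectUntilTwo(source: Flow<Int>): List<Int> = runBlocking {
    val collected = mutableListOf<Int>()
    val job = launch {
        source.collect { value ->
            collected += value
            if (value == 2) cancel() // cancels the launched coroutine only
        }
    }
    job.join() // returns normally even though the job was cancelled
    collected
}
```

With flow { emit(1); emit(2); emit(3) } or listOf(1, 2, 3).asFlow().cancellable(), the result is [1, 2]. With a plain asFlow() and no cancellable(), the check may be missing, as described above.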

When a Flow completes, whether because of cancellation, a thrown exception, or simply finishing its work, it may need to run some action. This can be done imperatively with a try/finally block or declaratively with the onCompletion operator.

fun completeFlowImperative() = runBlocking<Unit> {
    try {
        flowValues().collect {
            // some work
        }
    } finally {
        // complete the work
    }
}
fun completeFlowDeclarative() = runBlocking<Unit> {
    flowValues()
        .onCompletion {
            // complete the work
        }
        .collect {
            // some work
        }
}
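One advantage of onCompletion is that its lambda receives a nullable cause. The cause is null on normal completion and holds the exception otherwise. The sketch below assumes kotlinx.coroutines. completionCause is a hypothetical helper.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the cause passed to onCompletion.
fun completionCause(source: Flow<Int>): Throwable? = runBlocking {
    var observed: Throwable? = null
    source
        .onCompletion { cause -> observed = cause } // null means normal completion
        .catch { } // swallow the upstream exception so runBlocking doesn't fail
        .collect()
    observed
}
```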

Context

Flow has a property called context preservation: a Flow is always collected in the context of the calling coroutine, so the code inside the flow builder runs in the collector's context. It's easy to imagine a need to switch the context inside a Flow. However, because of context preservation, you cannot emit values from a different context: wrapping emit in withContext makes collection fail with an IllegalStateException. The correct way to change the context of a Flow is the flowOn operator. It changes the context of the upstream Flow only, and creates a new coroutine for it when the CoroutineDispatcher changes. This breaks the sequential nature of the Flow: collection takes place in one coroutine while emission happens in another.

fun flowWithContext(): Flow<Int> = flow {
    withContext(Dispatchers.IO) {
        repeat(3) { value ->
            emit(value) // emission from a different context: not allowed
        }
    }
}
fun withContextThrowsException() = runBlocking<Unit> {
    flowWithContext().collect {
        // never reached: collection fails with IllegalStateException
        // ("Flow invariant is violated")
    }
}
fun changeContextByFlowOn() = runBlocking<Unit> {
    flowValues()
        .flowOn(Dispatchers.IO) // the upstream emits on an IO thread
        .collect {
            // some work in the collecting coroutine's context
        }
}
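Both behaviors can be checked directly. The sketch below records the thread names seen by the emitter and the collector when flowOn is used, and catches the exception raised by emitting from withContext. It assumes kotlinx.coroutines on the JVM. Both function names are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the thread names used by the emitter and the collector.
fun threadsWithFlowOn(): Pair<String, String> = runBlocking {
    var emitterThread = ""
    var collectorThread = ""
    flow<Int> {
        emitterThread = Thread.currentThread().name
        emit(1)
    }
        .flowOn(Dispatchers.IO) // affects only the upstream flow above it
        .collect { collectorThread = Thread.currentThread().name }
    emitterThread to collectorThread
}

// Hypothetical helper: returns true if emitting from withContext is rejected.
fun emitFromWithContextFails(): Boolean = runBlocking {
    try {
        flow<Int> { withContext(Dispatchers.IO) { emit(1) } }.collect()
        false
    } catch (e: IllegalStateException) {
        true // "Flow invariant is violated"
    }
}
```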

Operators

Collection of a Flow is performed sequentially by default: each emitted value is processed by every intermediate operator from upstream to downstream and finally delivered to the terminal operator. Like collections, Flows can be transformed with many different operators. Intermediate operators are cold: they apply to an upstream Flow and return a downstream Flow without starting any collection. They are not suspending functions themselves, but the blocks passed to them can call suspending functions. The most popular ones are map, filter, transform, and the size-limiting take.

fun filterAndMapFlow() = runBlocking<Unit> {
    flowValues() // 1, 2, 3
        .filter {
            it % 2 == 1
        }
        .map {
            mapValue(it) // can run suspend fun inside
        }
        .collect {
            // collect following values:
            // value = 1
            // value = 3
        }
}
fun takeAndTransformFlow() = runBlocking<Unit> {
    flowValues() // 1, 2, 3
        .take(2) // take two values, ignore the rest of them
        .transform { value ->
            emit("Start emitting $value")
            emit(mapValue(value))
        }
        .collect {
            // collect following values:
            // Start emitting 1
            // value = 1
            // Start emitting 2
            // value = 2
        }
}
suspend fun mapValue(value: Int): String {
    delay(10) // mock time consuming work
    return "value = $value"
}
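The claim that intermediate operators are cold and process values sequentially can be checked with a log. Building the chain records nothing, and the terminal operator then pushes each value through the whole chain before the next one. This sketch assumes kotlinx.coroutines. ChainRun and runChain are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class ChainRun(val logBefore: List<String>, val logAfter: List<String>, val result: List<Int>)

fun runChain(): ChainRun = runBlocking {
    val log = mutableListOf<String>()
    val chain = flowOf(1, 2, 3)
        .filter { log += "filter $it"; it % 2 == 1 }
        .map { log += "map $it"; it * 10 }
    val before = log.toList()   // building the chain ran nothing
    val result = chain.toList() // the terminal operator runs the whole pipeline
    ChainRun(before, log.toList(), result)
}
```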

Another type of operator is terminal. These are suspending functions that start collecting the Flow. As you may have noticed, we have already used one of them (collect) many times, but there are others, e.g. toList, first, single, reduce, etc.

fun flowTerminalOperators() = runBlocking<Unit> {
    val values = flowValues() // 1, 2, 3
    // each terminal operator below collects the cold flow from scratch
    val list = values.toList() // opposite to List.asFlow()
    // result [1, 2, 3]
    val first = values.first() // get the first value and cancel the flow's collection
    // result 1
    val reduce = values.reduce { a, b -> a + b }
    // result 6
}
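A few more terminal operators, plus a counter showing that every terminal call starts the cold flow again, are sketched below. It assumes kotlinx.coroutines, whose fold, count, single and firstOrNull operators are used. TerminalResults and terminalResults are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class TerminalResults(
    val folded: Int,
    val counted: Int,
    val single: Int,
    val firstAboveFive: Int?,
    val runs: Int
)

fun terminalResults(): TerminalResults = runBlocking {
    var runs = 0 // how many times the cold flow body has started
    val values = flow<Int> {
        runs++
        for (value in 1..3) emit(value)
    }
    TerminalResults(
        folded = values.fold(10) { acc, value -> acc + value }, // 10 + 1 + 2 + 3
        counted = values.count(),
        single = flowOf(42).single(), // throws if the flow has zero or several values
        firstAboveFive = values.firstOrNull { it > 5 }, // null: no such value
        runs = runs // fold, count and firstOrNull each collected values again
    )
}
```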

Buffering

Sometimes running different parts of a Flow in different coroutines can be helpful, especially when time-consuming asynchronous operations are involved. Since collection is sequential by default, the emitter has to wait for the collector to process each value before producing the next one. When both sides are slow, the total time is the sum of both and a bottleneck arises. But what if emission and collection don't have to run sequentially? Flows have a built-in buffering mechanism that allows emission to run concurrently with collection. There are several operators for that, like buffer, conflate, and collectLatest. They behave differently, so which one to use depends on the expected behavior.

// the work must be done sequentially
fun synchronousCollecting() = runBlocking {
    // flowValues waits 10ms before emitting each value
    flowValues() // wait for collect to complete before emitting the next one
        .collect { // wait for the emission before collecting the next one
            delay(25)
            // consume emitted value
        }

    // the whole work takes about 105ms
    // 10ms - emit(1), 35ms - collect(1)
    // 45ms - emit(2), 70ms - collect(2)
    // 80ms - emit(3), 105ms - collect(3)
}
// the work can be done concurrently
fun bufferOperator() = runBlocking {
    flowValues()
        .buffer() // emit values ASAP, don't wait for collecting
        .collect { // wait for the first emission to start
            delay(25)
            // consume emitted value
        }

    // the whole work takes about 85ms
    // 10ms - emit(1), 20ms - emit(2), 30ms - emit(3)
    // 35ms - collect(1), 60ms - collect(2), 85ms - collect(3)
}
// each value represents a part or state of the work
// so only the newest one should be processed
fun conflateOperator() = runBlocking {
    flowValues()
        .conflate() // conflate emission
        .collect { // process only the most recent emission
            delay(25)
            // consume emitted value
        }

    // the whole work takes about 60ms
    // 10ms - emit(1), start collect(1)
    // 20ms - emit(2)
    // 30ms - emit(3)
    // 35ms - complete collect(1), start collect(3)
    // 60ms - complete collect(3)
}
// similar to conflate
// but cancels collecting instead of dropping emitted values
fun collectLatestOperator() = runBlocking {
    flowValues()
        .collectLatest { // cancel current consuming on new emission
            delay(25)
            // consume emitted value
        }

    // the whole work takes about 55ms
    // 10ms - emit(1), start collect(1)
    // 20ms - emit(2), cancel collect(1), start collect(2)
    // 30ms - emit(3), cancel collect(2), start collect(3)
    // 55ms - complete collect(3)
}
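The conflate behavior can be checked without relying on exact timings. With a fast producer and a slow consumer, some intermediate values are skipped, but the most recent value is always delivered. This is a minimal sketch assuming kotlinx.coroutines. The 10ms and 50ms delays and the name conflatedValues are illustrative choices.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: a fast producer and a slow consumer connected by conflate().
fun conflatedValues(count: Int): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    (1..count).asFlow()
        .onEach { delay(10) } // producer: roughly one value every 10ms
        .conflate()           // keep only the most recent unprocessed value
        .collect { value ->
            delay(50)         // consumer: roughly 50ms per value
            received += value
        }
    received
}
```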

Composing and flattening

Some problems require more advanced solutions and can't be solved with a single Flow: the final result may depend on composing or flattening multiple Flows. But don't worry, Flows provide operators for that case too, similar to collection operators, e.g. zip, combine, and the flatMap family. Let's look at them more closely.

// combine each value from different flows one to one
fun zipFlows() = runBlocking {
    val dictionary = flowOf("st", "nd", "rd", "th")
    flowValues() // 1, 2, 3
        .zip(dictionary) { number, word ->
            "$number$word"
        }
        .collect {
            // collected values:
            // 1st
            // 2nd
            // 3rd
        }
    // the fourth value from dictionary was ignored
    // because flowValues emitted only 3
}
// combine the latest values from each flow
fun combineFlows() = runBlocking {
    val dictionary =
        flowOf("player1", "player2", "player3").onEach {
            delay(25)
        }
    flowValues() // delay 10ms before emit
        .combine(dictionary) { number, word ->
            "$number is $word"
        }
        .collect {
            // collected values:
            // 2 is player1 (25ms)
            // 3 is player1 (30ms)
            // 3 is player2 (50ms)
            // 3 is player3 (75ms)
        }
}
// concat one to one
// the outer flow waits for each inner flow to complete
fun flatMapConcatFlows() = runBlocking {
    flowValues()
        .flatMapConcat { value ->
            subworkFlow(value)
        }
        .collect {
            // collected values
            // First part of 1 task (10ms)
            // Second part of 1 task (35ms)
            // First part of 2 task (45ms)
            // Second part of 2 task (70ms)
            // First part of 3 task (80ms)
            // Second part of 3 task (105ms)
        }
}
// collect and merge inner flows concurrently, ASAP
fun flatMapMergeFlows() = runBlocking {
    flowValues()
        .flatMapMerge { value ->
            // inner flows are collected concurrently,
            // nothing is cancelled
            subworkFlow(value)
        }
        .collect {
            // collected values
            // First part of 1 task (10ms)
            // First part of 2 task (20ms)
            // First part of 3 task (30ms)
            // Second part of 1 task (35ms)
            // Second part of 2 task (45ms)
            // Second part of 3 task (55ms)
        }
}
// switch to the inner flow created from the latest outer value
fun flatMapLatestFlows() = runBlocking {
    flowValues()
        .flatMapLatest { value ->
            // cancel the current inner flow
            // on a new outer emission
            subworkFlow(value)
        }
        .collect {
            // collected values
            // First part of 1 task (10ms)
            // First part of 2 task (20ms)
            // First part of 3 task (30ms)
            // Second part of 3 task (55ms)
        }
}
fun subworkFlow(value: Int) = flow {
    emit("First part of $value task")
    delay(25)
    emit("Second part of $value task")
}
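The timing-independent parts of zip and flatMapConcat can be shown with plain flowOf sources. zip stops at the shorter Flow, and flatMapConcat collects inner Flows strictly one after another. This sketch assumes kotlinx.coroutines. Some versions mark flatMapConcat as a preview or experimental API, hence the opt-in annotation. composeResults is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Opt-in markers: flatMapConcat is preview/experimental in some kotlinx.coroutines versions.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun composeResults(): Pair<List<String>, List<String>> = runBlocking {
    val zipped = flowOf(1, 2, 3)
        .zip(flowOf("a", "b")) { number, letter -> "$number$letter" }
        .toList() // completes when the shorter flow completes
    val concatenated = flowOf(1, 2)
        .flatMapConcat { value -> flowOf("$value-first", "$value-second") }
        .toList() // inner flows are collected one after another
    zipped to concatenated
}
```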

Exception

Flows should be transparent to exceptions, which means that emitting values from inside a try/catch block breaks this rule. Exception transparency guarantees that an exception thrown in the collector can be caught by the collector itself with try/catch. That doesn't mean catching exceptions thrown by emitters is impossible: just use the catch operator and specify the behavior depending on which exception was caught. Note that this operator is intermediate: it applies to the upstream Flow, so an exception thrown downstream isn't caught by it. Also, even when an exception is caught, the upstream Flow is terminated and emits no further values.

fun catchExceptionFromEmitter() = runBlocking<Unit> {
    flow {
        emit(1)
        emit(2)
        throw NullPointerException()
        emit(3) // won't be emitted
    }
        .catch { exception ->
            // do something like throw, emit or just ignore
        }
        .collect {
            // some work
        }
    // NPE was caught by the catch operator
}
fun tryCatchExceptionFromEmitter() = runBlocking<Unit> {
    flow {
        try {
            emit(1)
            emit(2)
            throw NullPointerException()
            emit(3) // won't be emitted
        } catch (e: Exception) {
            // NPE is caught here but it breaks the transparency
        }
    }
        .catch { exception ->
            // it knows nothing about the exception thrown
        }
        .collect {
            // some work
        }
    // NPE was caught by the try/catch block
}
fun notCatchExceptionFromCollector() = runBlocking<Unit> {
    flowValues()
        .catch { exception ->
            // do something like throw, emit or just ignore
        }
        .collect { value ->
            check(value <= 2) { "value $value is too big" }
            // some work
        }
    // the exception wasn't caught by the catch operator
    // because it happened downstream
}
fun catchExceptionFromCollector() = runBlocking<Unit> {
    flowValues()
        .onEach { value ->
            check(value <= 2) { "value $value is too big" }
            // some work
        }
        .catch { exception ->
            // do something like throw, emit or just ignore
        }
        .collect()
    // the exception was caught by the catch operator
    // because it happened upstream
}
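Two common patterns follow from these rules. catch can emit a fallback value after the upstream has failed. A collector can wrap its own collect call in try/catch to handle failures it raises itself. This is a minimal sketch assuming kotlinx.coroutines. Both function names are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: catch replaces an upstream failure with a fallback value.
fun valuesWithFallback(): List<Int> = runBlocking {
    flow<Int> {
        emit(1)
        emit(2)
        throw IllegalStateException("upstream failure")
    }
        .catch { emit(-1) } // the upstream is finished, but catch can still emit
        .toList()
}

// Hypothetical helper: thanks to exception transparency, the collector's
// own try/catch sees exceptions thrown in its own code.
fun collectorFailureMessage(): String? = runBlocking {
    try {
        flowOf(1, 2, 3).collect { value ->
            check(value <= 2) { "value $value is too big" }
        }
        null
    } catch (e: IllegalStateException) {
        e.message
    }
}
```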

Conclusion

In this blogpost, I have presented the concept of coroutine-native streams called Flow. I have demonstrated what a stream definition can hide and why it's so important to have it in coroutines. There's no doubt that Flow makes code more interactive and live. Streams can be divided into two types: cold and hot. As mentioned before, Flows are cold streams. So what about hot streams? Well, it looks like there is still more to explore. Stay tuned!

Originally published at https://dev.fandom.com.
