From Zero To Hero Series ( Kotlin Sequences, Channels, Flows ) — Part 3 — Flows

ömer iyiöz
DigiGeek
Published in
23 min readJul 5, 2021

--

We will learn Kotlin flows with 29 examples in this lesson. This will be a long journey. 🏄‍♂️

I hear, you say to me “Talk is cheap, show me the code”. Then let me start flowing 🌊🌊🌊 !

Flow Example 1

To get all the values in the stream as they're emitted, use collect().

Look at the following example.

From this example, we understand the followings :

  • When we called flow{ } ,
flow {
// the code inside the flow{} is not called.
}
  • When we called flow.collect{} ,
flow {
// the code inside the flow{} is called.
}

In other words, we understand that “flows are cold streams”.

Why we need runBlocking{} in this example ?

Since collect() is a suspending function, it can only be called from a coroutine context. runBlocking{ } provides us a coroutine context, thus we used it in this example.

In this example, we used collect() . collect() is the most basic terminal operator. There are also other terminal operators available.

Read the timeline which is shown in the above screenshot from top to below.

  • Collector invokes the emitter.
  • Emitter emits value.
  • Collector’s lambda method runs.
  • Emitter emits value.
  • Collector’s lambda method runs.
  • Since emission ends in emitter, emitter says to collector “i’m finished”.

Note : Emitter does not start until collect(or any other terminal operator) of collector runs.

As you understand, collector functions calling back and forth. In other words,

  • one emission occurs and collector’s lambda is called ,
  • then one emission occurs and collector’s lambda is called,
  • then one emission occurs and collector’s lambda is called,
  • and so on..

BackPressure

Backpressure is a term in reactive streams which describes the behavior when consumer is not consuming events that fast as producer produces them.

Collector and emitter can be asynchronous.

While collector is suspended waiting for something, e.g. delays 100 miliseconds as in the above example, emitter waits and does not emit anything.

In above picture, each person waiting in the queue represents an emission. If our collector is slow, it slows down the emitter. Thus, each emission wait its turn. There is no special magic to implement this. It just happens automatically because of the nature of suspending function and the fact that our function calls back and forth.

Flow Example 2

In this example we launched a coroutine and flow. launch() and collect() not block eachother. Since collect is a suspending function, println(“end”) is not called until collect() function ends.

Flow Example 3 — collect() and withTimeoutOrNull()

In this example, we collect the stream inside withTimeoutOrNull(). Only two values is emitted and timeout occurs. Thus 3rd value is not emitted in this sample.

Flow example 4 — asFlow()

Various collections and sequences can be converted to flows using asFlow() extension functions.

Flow example 5 — Intermediate Flow Operators

Flows can be transformed with operators, just as you would with collections and sequences. Intermediate operators are applied to an upstream flow and return a downstream flow. These operators are cold, just like flows are. A call to such an operator is not a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The important difference to sequences is that blocks of code inside these operators can call suspending functions.

Flow Example 6 — Transform operator

transform() operator is used to imitate simple transformations like map and filter, as well as implement more complex transformations. Using the transform operator, we can emit arbitrary values an arbitrary number of times.

Flow example 7 — Size limiting operators, such as take() function

Cancellation in coroutines is always performed by throwing an exception.

When the corresponding limit is reached, take() function cancels the execution of the flow . Thus coroutine throws an exception. If you want to handle exception, you can write try catch finally.

Flow example 8 — Terminal flow operator reduce

Remember that you must use a terminal operator to start a flow. Until now, we only used one terminal operator :

collect

Some of other terminal operators are the following :

first, single, reduce, fold

Look at the following example which uses reduce terminal operator. Let’s analyze how sum1 is found.

flow emits 1,2,3,4,5. reduce makes addition of 1 and 2 and gives the result of 3 as operand to the next addition operation. reduce makes addition of 3 and 3 and gives the result of 6 as operand to the next addition operation. reduce makes addition of 6 and 4 and gives the result of 10 as operand to the next addition operation. reduce makes addition of 10 and 5 and returns the result as 15.

Flow Example 9 — Flows are sequential 🦖

Each emission of a flow is performed sequentially unless special operators that operate on multiple flows are used. For example, in the below example, the order of emissions is 1,2,3,4,5.

Each emitted value is processed by all the intermediate operators from upstream to downstream and is then delivered to the terminal operator after.

Flow Example 10 — Flow context and context preservation 🚓

flow and collector function runs in the context which collect() function is called. This property of a flow is called context preservation.

In the below example, collect() function is called inside the main thread, so flow and collector function runs in the main thread too.

If you call collect() function inside IO dispatcher context, then flow and collector runs in IO dispatcher context.

If collect() is called in main thread context, then updateUI() is called in main thread too.

What if my collector is on the main thread, but my computation contains an intensive work thus flow should not be run on main thread. If so, i can call flowOn(Dispatchers.Default) operator so that it changes the execution context as background thread for the preceding code. However my collector’s lambda function still runs in the main thread.

Flow Example 11 — Wrong emission withContext

withContext() is called to change coroutine context.

In the following example, collect() is called in main thread. We called the withContext() to change the coroutine context to default dispatcher thread and try to emit a value inside dispatcher thread. Exception occurs in this case because the code in the flow { ... } builder has to honor the context preservation property and is not allowed to emit from a different context.

If you want to safely change the context of the flow emission, you should use flowOn function.

Flow Example 12 — Safely change the context of flow emission using flowOn() function

flowOn() function should be used to change the context of the flow emission.

In the following example, we explicitly specify the flow context by calling

flowOn(Dispatchers.Default)

Thus, flow runs in the default dispatcher context, although collection happens in the main thread.

The equivalent of flowOn() function is subscribeOn() function in RxJava.

Flow Example 13 — Buffer

In order to understand buffering, firstly we analyze the following flow exam which does not use buffering.

Flow example without buffering

In this example, Three integer value will be emitted inside flow. However, 1 second delay occurs before emitting. Furthmore, 5 second delay occurs in the collector. Lets analyze step by step:

  • When we called collect, flow starts to run. and 1 second delay occurs. After 1 second delay finishes, value 1 is emitted.
  • Collect is invoked with value 1, and 5 second delay occurs in collector. Since we do not use buffering, flow does not continue to work and waits for collector completes for value 1. After 5 second delay finishes, flow can continue to work for value 2. 1 second delay occurs.
  • After 1 second delay finishes, collect is invoked with value 2. 5 minute delay occurs and flow cannot continue and wait for collector completes for value 2 since we do not use buffering. After 5 second delay finishes, flow can continue to work for value 3. 1 second delay occurs.
  • After 1 second delay finishes, collect is invoked with value 3. 5 minute delay occurs and flow cannot continue and wait for collector completes for value 3 since we do not use buffering. After 5 second delay finishes, collector completes its work.

Now, let’s do an example using buffering.

Flow example with buffering

The only difference in this example from the previous one is that we called buffer() method.

What does buffer() method provide us? It provides us that flow and collector runs concurrently. For example if a delay occurs in collector, flow continue its work and does not wait collector.

In the first example which does not use buffering, flow and collector runs sequentially.

Lets analyze the example,

Value 1 is emitted inside flow. So Collector is invoked and collector is delayed for 5 second. Flow does not wait the finish of collector’s delay, thus value 2 is emitted. Collector is still in delay for value 1. Value 3 is emitted in flow, and flow completes. Collector is still in delay for value 1. As you see flow emits 3 value and does not wait completion of collector for each emission. After some time, collector finishes its work for value 1, then delays 5 second, finishes it work for value 2, then delays 5 second, finishes it work for value 3.

As you see, the emission of flows are processed by collector in the first in first out order.

Notice that, each emission is processed when you use buffer() method.

Flow Example 14 — Conflation

Conflation is one way to speed up processing when both the emitter and collector are slow. It does it by dropping emitted values if emisson does not start to be processed by collector, it collects last unprocessed emission. The emissions in progress does not affect from this, and continue until completion.

Flow example with using conflate()

In this example, flow 1 is emitted and collector starts to process this emission. Collector delays 5 second. Before the delay of the collector doesn’t finish, flow makes 2 emissions, and done its work. When collector process the emission for value 1, there is 2 emission waiting in the queue. Since we use conflation, collector drop the emission for value 2, and starts to process the emission for value 3.

To sum up, conflation is a type of buffering. However, collector does not process all emissions.

  • flow and collector runs concurrently.
  • If a collection collected an emission and started proccessing, it continues until completion.
  • When collector is available to work, if there are more than one emission which does not started to process in collector, only the latest emission processed. The old emissions which has not collected are dropped.

Flow Example 15 — collectLatest — Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does it by dropping emitted values. The other way is to cancel a slow collector and restart it every time when a new value is emitted.

Flow example with using collectLatest

In this example,

  • when collectLatest is called, flow starts to run, delays 1 second, and emit value 1, then delays 1 second too.
  • Collector collects the emission for value 1 and delays 5 second.
  • Then flow emits value 2. At this point, collector does not complete processing for value 1, but since a new emission comes and we used collectLatest, collector cancels processing for value 1, collects value 2, starts to process it, delay 5 second.
  • Then flow emits again a new value 3. At this point, collector does not complete processing for value 2, but since a new emission comes and we used collectLatest, collector cancels processing for value 2, collects value 3, starts to process it, delay 5 second. Collector completes its work only for the last emission for value 3. Collector cancel its processing for the other two emissions for value 1 and 2.

To sum up, when flow emits a new value, if the collector is busy with processing an old emission it cancels processing of the old emission, collects the new emisson and starts to process it.

Summary of difference between buffer(), conflate(), collectLatest().

They all provides us that flow and collector runs concurrently.

buffer() :

  • All emissions made by flow is collected and processed until completion.

conflate() :

  • If a collection started processing an emission it continues until completion.
  • When collector is available to work, if there are more than one emission which does not started to process in collector, only the latest emission processed. The old emissions which has not collected yet are dropped.

collectLatest :

  • Assume that a collection started processing an emission. Before completion of the processing of this emission, if a new emission occurs, collector cancels the processing of current work, collects the latest value, and starts to process it.

Flow Example 16 — onEach

fun foo20(): Flow<Int> = flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
fun main() = runBlocking<Unit> { foo20()
.collect {
println(it)
}
}

We can write foo20() function shown in above code example, as the following using the onEach().

fun main() = runBlocking<Unit> {    val numbersFlow = (1..3).asFlow().onEach { delay(1000) }    numbersFlow
.collect {
println(it)
}
}

Output :

1
2
3

onEach function is called for each emission. There is no need to call emit() inside onEach. numbersFlow will emit 3 values, for each emission delays 1 second, then emit the value.

Composing multiple flows

There are lots of ways to compose multiple flows.

Flow Example 17 — zip

Flows have zip and combine operator that combines the corresponding values of two flows. There is an important difference between these two operator. Let me explain, firstly with zip() operator.

Just like the Sequence.zip extension function in the Kotlin standard library, flows have a zip operator that combines the corresponding values of two flows:

Another zip example

Zip operator waits for 2 flow to emit value to continue execution. In this example, 2nd flow is 40 times faster than 1st flow.

  • 1st flow delays execution 100 ms, then emit value 1, then suspended until second flow emits value. 2nd flow delays 4000 ms, in this time, zip operator and 1st flow do nothing, wait for emission of 2nd flow. 2nd flow emits value. At this point, since 2 flow has emitted value, zip can continue execution from now on. Thus, map function works.
  • 2nd flow delays execution 100 ms with value 2, then emit, then waits without doing nothing. 3rd from delays 4000ms, then emit value, At this point, since 2 flow has emitted value, zip can continue execution from now on. Thus, map function works.
  • The same steps occur for the 3rd iteration as i explained earlier.
The same zip() example with extra logs

Flow Example 18 — combine

Let’s explain combine() operator and we hopefully understand the difference from zip() operator.

Let’s explain the below example. The only difference from the previous zip() function example is the use of combine() function.

combine() example

In this example, 1st flow emits value, then 2nd flow delays 4 second. 1st flow does not need to wait the finish of 2nd flow’s delay. 1st flow can freely can continue execution. So 1st flow makes 2 more emissions. As you see, while 2nd flow is in delay, 1st flow can continue execution and makes 3 emissions. When 2nd flow’s delay finished, combine operator care about the last value emitted by two flow, 3 and one is printed. When 2nd flow emits again, 3 and two is printed.When 2nd flow emits again, 3 and three is printed.

combine() example with more logs

Another combine example

As you see in this example, combine works as the following:

  • When one flow makes an emission, if the other flow made an emission before, the last value emitted by each flow is combined and collected for each emission.

Flattening flows

In this example, we have a flow of three integers and called requestFlow23 function for emission of three integers.

Then we end up with a flow of flows (Flow<Flow<String>>) that needs to be flattened into a single flow for further processing. Collections and sequences have flatten and flatMap operators for this. However, due to the asynchronous nature of flows they call for different modes of flattening, as such, there is a family of flattening operators on flows.

Flow Example 19 — Flattening flows using flatMapConcat

Outer flow waits for the inner flow to complete before starting to collect the next one.

  • In outer flow, 100ms delay occurs, then value 1 is emitted. Inside flatMapConcat, we called requestFlow23() method, and 5 second delay occurs inside inner flow. Outer flow waits for the inner flow to complete for 5 second, then inner flow emits “1: Second” and completes.
  • In outer flow, 100ms delay occurs, then value 2 is emitted. In inner flow, “2: First” is emitted thus collect is invoked, then 5 second delay occurs, then emits “2: Second” thus collect is invoked, and completes.
  • In outer flow, 100ms delay occurs, then value 3 is emitted. In inner flow, “3: First” is emitted thus collect is invoked, then 5 second delay occurs, then emits “3: Second” thus collect is invoked, and completes.

As you see if you use flatMapConcat, Outer flow waits for the inner flow to complete before starting to emitting the next value.

Flow Example 20 — Flattening flows using flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into a single flow so that values are emitted as soon as possible.

flatMapMerge() example
  • value 1 is emitted, and inner flow for value 1 delays 5 second.
  • Outer flow does not wait inner flow complete its work for value 1. Thus outer flow emits value 2 and 3.
  • When the inner flow completes after 5 second delay, inner flow works for value 2 and 3(first in first out order).

As you see if you use flatMapMerge(), outer flow does not wait for the inner flow to complete before starting to emitting the next value. In other words, outer flow and inner flows run concurrently. Collect catches the emissions.

Note :

flatMapMerge() accepts an optional concurrency parameter that limits the number of concurrent flows that are collected at the same time (it is equal to DEFAULT_CONCURRENCY(16) by default).

Note that the flatMapMerge calls its block of code ({ requestFlow(it) } in this example) sequentially, but collects the resulting flows concurrently, it is the equivalent of performing a sequential map { requestFlow(it) } first and then calling flattenMerge on the result.

Flow Example 21 — Flattening flows using flatMapLatest

flatMapLatest() has a similar behaviour with collectLatest() which we saw earlier. When you see xxxLatest operator, remember that previous flows are cancelled when new flow is emitted. By using flatMapLatest() operator, when outer flow emits a new value, if there is a flow in progress they all are cancelled. Only the latest emitted value is processed.

flatMapLatest() example

In this example, since outer flow does not wait the completion of inner flow. Value 1 is emitted and is in progress in inner flow.

  • Then value 2 is emitted, thus process of previous emission of value 1 is cancelled.
  • Then value 3 is emitted, thus process of previous emission of value 2 is cancelled. Only the emission with value 3 is completed inside inner flow.

To sum up, difference of the 3 Flattening flow operators is as the following :

flatMapConcat : Outer flow waits for the inner flow to complete before starting to collect the next one. Each emission of outer flow processed.

flatMapMerge : outer flow does not wait for the inner flow to complete before starting to emit the next value. Each emission of outer flow processed.

flatMapLatest : outer flow does not wait for the inner flow to complete before starting to emit the next value. At each emission, the processing of old emissions are cancelled. Only the last emission continues execution.

Flow Exceptions

Flow emitter, collector, or code inside the flow operators can throw exception. There are several ways to catch and handle the exceptions.

Flow Example 22 — Handling Exceptions — Strategy 1 — wrap your entire code inside try catch block.

We can write flow builder and collector directly inside try catch, as in the following. If any exception is thrown inside collector, flow builder, or flow operators, we can catch all of them using this strategy.

In the above example,

  • Flow builder emits value 1. Collector collected the value 1 and printed it. Then the value is checked. Since value≤1 is true, everything is OK, no exception is thrown, flow continue its work.
  • Flow emits value 2. Collector collected the value 2 and printed it. Then the value is checked. Since value≤1 is false, exception is thrown. Catch block catches the exception. Flow stops its execution, so no more value is emitted. The string value inside check block is printed when we print throwable inside catch method.

Using the same exception handling strategy, we can catch exceptions thrown inside flow emitter, as we said earlier.

Flow Example 23 — Handling Exceptions — Strategy 2 — catch() operator

Rather than, wrapping our flow builder and collector in try catch block, we can use catch operator.

The emitter can use a catch operator that preserves this exception transparency and allows encapsulation of its exception handling. The body of the catch operator can analyze an exception and react to it in different ways depending on which exception was caught:

  • Exceptions can be rethrown using throw.
  • Exceptions can be turned into emission of values using emit from the body of catch.
  • Exceptions can be ignored, logged, or processed by some other code.

In this example,

  • Flow builder emits value “1”, then the value is checked inside map operator. The value 1 mapped to the value “string 1”. This value is collected by collector and printed.
  • Flow builder emits value “2”, then the value is checked inside map operator. check operator throws exception, catch operator catches the exception, and emits a new value “Caught …”. Collector collects the value “Caught …” and printed.
  • Since catch operator catches an exception, flow builder stops execution and don’t emit anything from now on.

catch operator preserves exception transparency and allows encapsulation of its exception handling.

Another catch() operator example with wrong usage — catch operator catches only upstream exceptions, catch operator escapes downstream exceptions

🔥🔥🔥The catch intermediate operator, honoring exception transparency, catches only upstream exceptions (that is an exception from all the operators above catch, but not below it). If the block in collect { ... } (placed below catch) throws an exception then it escapes, as in the following example.

🦖 In this example, when flow emits value 2, collector collects it, then check operator throws exception. Catch operator cannot catch the exception because catch operator can catch only upstream exceptions and exception occurred in this example in the downstream of catch operator. In other words, since catch is called before collect(), catch operator cannot catch exceptions occurred in collector.

Another catch operator example which solves the previous non-catchable exception problem — Catch declaratively

We can combine the declarative nature of the catch operator with a desire to handle all the exceptions, by moving the body of the collect operator into onEach and putting it before the catch operator. Collection of this flow must be triggered by a call to collect() without parameters:

As you see in this example, we moved the collector body to inside of onEach operator. We write onEach operator before catch operator. By using this way, catch operator can catch all exceptions because exception cannot be occur inside collector, and we write all collector codes inside onEach operator.

When value 2 is emitted, exception is thrown from onEach operator. Catch operator catches the exception. Flow builder stops execution.️ Collector cannot collect the value 2. 👈👈🤹‍♂️️️️️️🤹‍ 🤹‍♂️️️️️️🤹‍

Flow completion

When flow collection completes (normally or exceptionally) it may need to execute an action. As you may have already noticed, it can be done in two ways: imperative or declarative.

Flow Example 24.1 — Imperative finally block example, try catch finally

In addition to try/ catch, a collector can also use a finally block to execute an action upon collect completion.

🦖As you guess, after flow completes its job, or after catch operator catches an exception, finally block is invoked.

In the following example, after flow completes its job, finally block is invoked.

imperative finally block example 1

In the following example, after catch block catches an exception, finally block is invoked.

imperative finally block example 2

Flow example 24.2 — Declarative completion example — onCompletion operator

In the following example, after flow completes its job, onCompletion operator is invoked. In other words, flow builder emits all its emissions, collector collected all emissions. No exception occurred. Then onCompletion() is invoked.

In the following example, flow builder emits value 2, then check operator throws an exception. Then catch block catches that exception. Then onCompletion operator is invoked with nonnull parameter.

No need to say that you can call onEach operator just after the flow builder as in the below example. The below and above example is the same. Keep that in mind.

Note that, in these examples, we see only check operator throw exception. However, anything can throw exception, for example an exception can be thrown inside collector, flow builder, or intermediate flow operators, as the following,

throw RuntimeException()

🔥 Question :

When onCompletion operator is invoked, how do you understand if the flow completed normally, or an exception occurred ?

🔥 Answer :

By checking the parameter of onCompletion operator. If it is null, you understand that flow completed normally. If it is not null, an exception occurred, then catch operator is invoked, then onCompletion block is invoked.

Flow Example 25 — An important difference between onCompletion and onCatch operator

onCompletion sees all exceptions, whether the exception occurs in the upstream, or downstream. It does not matter. However, as we said earlier, catch operator catches only upstream operator. You can see that from the below example.

onCompletion() catches all exceptions. catch() catches only upstream exceptions.

Flow Example 26 — collect vs launchIn

You are going very well. We come to the last of the flow article. We learned that collect() is a terminal operator and used many times in our previous examples. However, i have not mentioned about one important thing about collect terminal operator yet. Guess that ?

Ok ok i’m saying that :

Collect terminal operator is a suspending function so if your collector or emitter is slow, the next lines of codes are waiting for your collector to finish as in the following and all of our previous examples:

Collect terminal operator is a suspending function

In the above example, in order to run the following

println("Done")

we must wait for 30 seconds. Whaaat ! If you have such a situation, and do not want to collector to block the execution of the next line, you can use the launchIn terminal operator. Below is a screenshot from its definition.

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html

As you see, launchIn works as fire and forget rather than suspending. launchIn() takes CoroutineScope as a parameter.

The following two examples are the same.

In the below example which use launchIn(), this scope comes from the runBlocking coroutine builder. Firstly runBlocking completes, then the coroutine launched inside runBlocking() starts to run.

Both examples print “Done” firstly because launchIn() is a shorthand for

scope.launch{
flow.collect()
}

Note that launchIn also returns a Job, which can be used to cancel the corresponding flow collection coroutine only without cancelling the whole scope or to join it.

Flow Example 27 — Cancelling flow emission

For convenience, the flow builder

flow {
...

}

performs additional ensureActive checks for cancellation on each emitted value.

In the following example, when value 3 is emitted, we cancel the flow emission. As you see, collector for value 3 continue to work and completed normally. Also flow builder continue normally until it comes to emission. When the flow builder emits, JobCancellationException occurs.

Flow Example 28 — Many flow operators don’t do cancellation check for performance reasons

Checking whether the flow is cancelled comes with a cost . Thus, for performance reasons, many flow operators don’t do cancellation check.

For example, if you use IntRange.asFlow extension to write the same busy loop and don’t suspend anywhere, then there are no checks for cancellation. All numbers from 1 to 5 are collected and cancellation gets detected only before return from runBlocking, in the following example.

asFlow operator is not cancellable by default

Flow Example 29 — How to do cancellation check when using flow operators which don’t do cancellation check

Say that you want to use a flow operator(for example, asFlow operator) which doesn’t do cancellation check but you want to do cancellation check. There are 2 different ways to do it.

1st way: asFlow() doesn’t do cancellation check. Before each emission, checking ensureActive() makes asFlow() cancellable.

Make asFlow operator cancellable manually — way 1

2nd way : For example, asFlow() doesn’t do cancellation check. asFlow().cancellable() makes asFlow() cancellable.

Make asFlow operator cancellable manually — way 2

In this article, i used the example and quotations mainly from the following links. I added extra explanatory comments .

https://kotlinlang.org/docs/flow.html#flow-and-reactive-streams

https://github.com/Kotlin/kotlinx.coroutines/tree/master/kotlinx-coroutines-core/jvm/test/guide

Flow is conceptually a reactive stream. Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow main goal is to have as simple design as possible, be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in Reactive Streams and Kotlin Flows article.

In my article, i got help from the following links. Check these links for more information.

KotlinConf 2019: Asynchronous Data Streams with Kotlin Flow by Roman Elizarov

That’s all for now.

If you liked, please clap and share.

Happy flow coding!

--

--