Introduction To Kotlin Flows

Irem Dogan
Delivery Hero Tech Hub
7 min read · Feb 11, 2022

Android development, which I have been working on since the start of my college years, has always seemed like a huge world that develops and changes very fast. I wanted to write an introductory article about Kotlin Flows, which have been used frequently lately and which I think will come up even more often. Enjoy reading :)

Table of Contents

Flow Builders
Flow Operators
Using Flows in UI

To give a more detailed definition, a flow is a type from the Kotlin coroutines library (kotlinx.coroutines) that can emit multiple values sequentially over time. For example, you can use a flow to receive the latest updates from an API and modify them with flow operators. Before diving deeper into what you can do with flows, a brief look at hot and cold streams and at flow builders will help you understand how flows work.

Hot streams emit data whether or not there is a subscriber when the data arrives. Cold streams produce data only when there is a subscriber. For example, let’s say you are fetching data from an API. You can do this by using channels, which are an example of a hot stream. There is a point to note here, however: since data keeps flowing in hot streams, you have to keep the subscribers under control. Otherwise you risk data loss (if a subscriber is forgotten) or even memory leaks (an open network connection, etc.). Cold streams, on the other hand, start pushing values only when you start collecting. Kotlin flow is an implementation of a cold stream, powered by Kotlin coroutines. When you cancel a coroutine scope, every coroutine running in it is cancelled, and the same rule applies to flows: cancelling the scope in which a flow is collected also stops the flow. You don’t have to free up resources manually.
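To make the cold behaviour concrete, here is a minimal sketch. The coldNumbers() function is a hypothetical producer invented for illustration; the point is only that nothing runs until collect() is called, and that each collector restarts the producer.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical producer used only for illustration.
fun coldNumbers(): Flow<Int> = flow {
    println("Producer started")
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val numbers = coldNumbers() // nothing runs yet: the flow is cold
    println("Flow created")

    // Each collect() call runs the producer block from the beginning.
    numbers.collect { println("First collector got $it") }
    numbers.collect { println("Second collector got $it") }
}
```

Running this should print "Flow created" before the first "Producer started", and "Producer started" appears once per collector.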

Flow Builders

There are several flow builders for creating flows. The flow {} builder is the most basic one. It creates a flow from a suspending lambda block, and inside the block you can call other suspending functions and emit values with emit().
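As a rough sketch of the flow {} builder, the snippet below emits three numbers with a short pause between them. The function name countToThree() and the 100 ms delay are assumptions chosen for the example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical producer: emits 1, 2, 3 with a short pause between values.
fun countToThree(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // suspending calls are allowed inside the builder
        emit(i)    // hand the next value to the collector
    }
}

fun main() = runBlocking {
    countToThree().collect { value -> println(value) } // prints 1, 2, 3
}
```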

You can use flowOf() to create a Flow from a fixed set of values.

Or you can convert various collections and sequences to a Flow using asFlow() builder.
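The flowOf() and asFlow() builders mentioned above can be sketched as follows. The values are arbitrary, and toList() is used here only as a convenient terminal operator for printing.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val fixed = flowOf("a", "b", "c")              // fixed set of values
    val fromList = listOf(1, 2, 3).asFlow()        // from a collection
    val fromRange = (1..3).asFlow()                // from a range
    val fromSequence = sequenceOf(10, 20).asFlow() // from a sequence

    println(fixed.toList())        // [a, b, c]
    println(fromList.toList())     // [1, 2, 3]
    println(fromRange.toList())    // [1, 2, 3]
    println(fromSequence.toList()) // [10, 20]
}
```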

Flow Operators

Flows can be transformed with operators, just like collections or sequences. There are two types of flow operators: intermediate and terminal.

Intermediate operators are used to modify the data flow between the producer and the consumer in order to meet the requirements of the following layer.
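A minimal sketch of an intermediate-operator chain, using filter(), map(), and catch(). The NumberUiModel class and the failing numbers() producer are hypothetical names made up for this example; they are not from any library.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical UI model for this sketch.
data class NumberUiModel(val label: String)

// Hypothetical producer that fails after emitting a few values.
fun numbers(): Flow<Int> = flow {
    for (i in 1..4) emit(i)
    throw IllegalStateException("source failed")
}

fun uiModels(): Flow<NumberUiModel> = numbers()
    .filter { it % 2 == 0 }                                    // keep even numbers only
    .map { NumberUiModel("Number $it") }                       // convert to the UI model
    .catch { e -> emit(NumberUiModel("Error: ${e.message}")) } // handle upstream errors

fun main() = runBlocking {
    // Prints "Number 2", "Number 4", "Error: source failed"
    uiModels().collect { println(it.label) }
}
```

Because catch() is placed last, it sees exceptions from the producer as well as from filter() and map(), which are all upstream of it.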

In a chain that uses filter(), map(), and catch(), each operator creates a new flow according to its functionality. We use the filter() operator to keep only the numbers we need and the map() operator to transform the data into a uiModel, which is a better abstraction for this layer of the app. The catch() operator catches exceptions that could happen while processing items in the upstream flow. The upstream flow refers to the flow produced by the producer block and by the operators called before the current one. Similarly, everything that happens after the current operator is the downstream flow. catch() can also re-throw the exception if needed or emit new values.

Flows are usually collected from the UI layer, since that is where we display the data on the screen. We need a terminal operator (collect, reduce, launchIn, etc.) to start listening for values. To get all the values in the stream as they are emitted, we use the collect() operator. It takes a function that is called on every new value, and because it is a suspend function it must be called from within a coroutine.

When you apply a terminal operator to a flow, the flow is created on demand and starts emitting values. Intermediate operators, in contrast, only set up a chain of operations that run lazily when an item is emitted into the flow. For example, every time collect() is called on a flow of user messages, a new flow (or pipe) is created and its producer block starts refreshing the messages from the API at its own interval. We call flows of this type cold flows because they are created on demand and emit data only while they are being observed.
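To see a terminal operator driving a lazily built chain, here is a small sketch with transform() and take(). The firstTwo() function and the "Number n" text are assumptions for illustration; nothing in the chain runs until collect() is called.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical chain: five numbers, reshaped into text, limited to two items.
fun firstTwo(): Flow<String> = (1..5).asFlow()
    .transform { n -> emit("Number $n") } // emit a reshaped value for each input
    .take(2)                              // stop the flow after two items

fun main() = runBlocking {
    firstTwo().collect { println(it) } // prints "Number 1" and "Number 2"
}
```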

The transform() and take() operators are also worth knowing. transform() lets us bring the flow into the desired form, since its block can emit any number of values for each input. With take(2) on a flow of five numbers, only the first two are passed on and printed on the screen.
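Two flows can also be merged into one. The sketch below does this with zip() and combine(), assuming a numbers flow that emits every 300 ms and a numberTexts flow that emits every 400 ms; the function names and the "n -> text" format are chosen only for the example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical sources with different emission intervals.
fun numbers(): Flow<Int> = (1..3).asFlow().onEach { delay(300) }
fun numberTexts(): Flow<String> = flowOf("one", "two", "three").onEach { delay(400) }

// zip() pairs the first values, then the second values, and so on.
fun zipped(): Flow<String> =
    numbers().zip(numberTexts()) { n, text -> "$n -> $text" }

// combine() re-emits whenever either flow emits, using the latest value of each.
fun combined(): Flow<String> =
    numbers().combine(numberTexts()) { n, text -> "$n -> $text" }

fun main() = runBlocking {
    zipped().collect { println("zip: $it") }   // 1 -> one, 2 -> two, 3 -> three
    combined().collect { println("combine: $it") } // more lines; pairs depend on timing
}
```

With zip() the output always has three lines. With combine() intermediate pairs such as "2 -> one" are expected to appear as well, because each new value is combined with whatever the other flow emitted last.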

The zip() and combine() operators both merge the values of two flows, but in different ways. Consider a numbers flow that emits every 300 ms and a numberTexts flow that emits every 400 ms. zip() pairs the corresponding values of both flows, so it waits for the slower flow and produces a result every 400 ms, whereas combine() emits again whenever either flow produces a value, using the most recently emitted value of each flow.
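Beyond zip() and combine(), a few other operators come up often. The sketch below chains onStart(), onEach(), distinctUntilChanged(), takeWhile(), and onCompletion(); the input values and the `it < 4` condition are assumptions picked to keep the output short.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.runBlocking

// Hypothetical input: values from 1 to 10 with some consecutive repeats.
fun repeatingNumbers(): Flow<Int> = flowOf(1, 1, 2, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

fun processed(): Flow<Int> = repeatingNumbers()
    .onStart { println("Started") }        // runs before the first value
    .onEach { println("Received $it") }    // runs for every upstream value
    .distinctUntilChanged()                // drops consecutive duplicates
    .takeWhile { it < 4 }                  // ends the flow at the first value >= 4
    .onCompletion { println("Completed") } // runs when the flow finishes

fun main() = runBlocking {
    processed().collect { println("Collected $it") } // collects 1, 2, 3
}
```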

Other operators that we encounter frequently include onStart(), onEach(), distinctUntilChanged(), takeWhile(), and onCompletion(). Suppose we return a list of numbers from 1 to 10 with repeated values as a flow. onStart() is invoked before the flow starts to be collected, and onCompletion() is invoked when the flow completes or is cancelled. onEach() is invoked each time a new value arrives. With distinctUntilChanged() we filter out consecutive repetitions of the same value, and with takeWhile() we pass values to the next operator for as long as the given condition holds; the flow ends at the first value that fails it.

Using Flows In UI

There are two major points to consider when using flows in the UI. The first is to avoid wasting resources while the app is in the background, and the second is handling configuration changes.

When the view isn’t visible on the screen, the UI should stop collecting from the flow. There are several lifecycle-aware options for doing so: LiveData, or the lifecycle-specific coroutine APIs repeatOnLifecycle() and flowWithLifecycle(). The asLiveData() operator turns the flow into LiveData that only observes items while the UI is displayed on the screen. We can perform this conversion in the view model class.
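A sketch of the asLiveData() conversion in a view model might look like this. MessagesRepository and MessagesViewModel are hypothetical names, and the snippet assumes the AndroidX lifecycle-livedata-ktx and lifecycle-viewmodel artifacts are on the classpath.

```kotlin
import androidx.lifecycle.LiveData
import androidx.lifecycle.ViewModel
import androidx.lifecycle.asLiveData
import kotlinx.coroutines.flow.Flow

// Hypothetical repository, assumed for this sketch only.
interface MessagesRepository {
    fun userMessages(): Flow<List<String>>
}

class MessagesViewModel(repository: MessagesRepository) : ViewModel() {
    // The upstream flow is collected only while the LiveData has active observers.
    val userMessages: LiveData<List<String>> =
        repository.userMessages().asLiveData()
}
```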

However, converting to LiveData is not required. repeatOnLifecycle() is the recommended way to collect flows from the UI layer. It is a lifecycle-aware suspend function that takes a Lifecycle.State as a parameter: it launches a new coroutine with the block passed to it each time the lifecycle reaches that state and cancels the ongoing coroutine when the lifecycle falls below it. Because the block runs in a coroutine, we can call collect() inside it. Since repeatOnLifecycle() is itself a suspending function, it must be called from a coroutine. The best practice is to call it when the lifecycle is initialized, for example in onCreate() of an activity or onViewCreated() of a fragment.

Something important to note is that the coroutine that calls repeatOnLifecycle() won’t resume executing until the lifecycle is destroyed. If you need to collect several flows, use launch inside the repeatOnLifecycle() block to start a separate coroutine for each one.
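Collecting several flows with repeatOnLifecycle() could be sketched as below. MessagesActivity and its two flows are hypothetical stand-ins for real view model state, and the snippet assumes the AndroidX lifecycle-runtime-ktx and appcompat artifacts.

```kotlin
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch

class MessagesActivity : AppCompatActivity() {
    // Hypothetical flows standing in for state exposed by a view model.
    private val messages: Flow<String> = flowOf("Hello")
    private val unreadCount: Flow<Int> = flowOf(1)

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        lifecycleScope.launch {
            // The block starts when the lifecycle is at least STARTED
            // and is cancelled when it drops below STARTED.
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // One child coroutine per flow, so neither collector blocks the other.
                launch { messages.collect { println("Message: $it") } }
                launch { unreadCount.collect { println("Unread: $it") } }
            }
            // Execution reaches this point only after the lifecycle is destroyed.
        }
    }
}
```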

When there is only one flow to collect, you can use the flowWithLifecycle() operator instead of repeatOnLifecycle(). This API emits items while the lifecycle is at least in the target state and cancels the underlying producer when the lifecycle falls below it.
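For the single-flow case, a sketch with flowWithLifecycle() could look like this. SingleFlowActivity and its messages flow are hypothetical, and the same AndroidX lifecycle-runtime-ktx dependency is assumed.

```kotlin
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.flowWithLifecycle
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch

class SingleFlowActivity : AppCompatActivity() {
    // Hypothetical flow standing in for state exposed by a view model.
    private val messages: Flow<String> = flowOf("Hello")

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        lifecycleScope.launch {
            messages
                // Upstream collection is active only while the lifecycle is at least STARTED.
                .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
                .collect { println("Message: $it") }
        }
    }
}
```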

An important point: if you collect directly from lifecycleScope.launch, the activity keeps receiving flow updates while in the background. To solve this, we could manually start collecting in onStart() and stop collecting in onStop(), but repeatOnLifecycle() removes all that boilerplate code for us. launchWhenStarted is also better than launch, since it suspends flow collection while the app is in the background.

That technique, however, keeps the flow producer running, so it may keep emitting items in the background and fill memory with data that will never be displayed on the screen. Because the UI has no idea how the flow producer is implemented, it’s always wiser to play it safe and use repeatOnLifecycle() or flowWithLifecycle() to avoid collecting items and keeping the flow producer active while the UI is in the background.

As mentioned, the second important point when using flows in the UI is configuration changes. For example, screen rotation or an update may cause a configuration change. In such cases, we need to hold on to the current data and share it with collectors regardless of the lifecycle state they are in. We can achieve this with StateFlow. It stores the latest value even when there are no collectors, and because you can collect from it multiple times, it is safe to use with activities and fragments. You could use the mutable version, MutableStateFlow, and update its value at any time, but that isn’t very reactive. Instead, any flow can be converted to a StateFlow. The StateFlow receives all updates from the upstream flows and stores the most recent value, and it can have zero or more collectors. As a result, it is ideal for view models.

You can use the stateIn() operator to convert a flow to a StateFlow.
It has three parameters: initialValue, since a StateFlow must always have a value; scope, the coroutine scope in which the sharing runs; and started, a SharingStarted strategy that controls when sharing starts and stops, including how long to wait before the upstream flows are cancelled.

Let’s take a few examples to make this clearer. Consider two scenarios: rotation and navigation to the home screen. When the screen is rotated, the activity is briefly destroyed and then recreated. We don’t want to restart any flows in this scenario because we want to complete the transition as quickly as possible. When the user navigates to the home screen, however, we want to stop all flows in order to save battery and other resources. We use a timeout, set through the started parameter, to tell the two cases apart. When a StateFlow stops being collected, we don’t immediately stop all upstream flows. Instead, we wait a set amount of time, such as 5 seconds. If the flow is collected again before the timeout, no upstream flows are cancelled. That is precisely what SharingStarted.WhileSubscribed(5000) achieves: once the view stops collecting, the upstream flows stay active for five more seconds and are cancelled only if the timeout expires.
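Putting the three stateIn() parameters together, a view model might expose a StateFlow roughly as follows. MessagesRepository and MessagesViewModel are hypothetical names, the empty list is an assumed initial value, and the snippet relies on the AndroidX lifecycle-viewmodel-ktx artifact for viewModelScope.

```kotlin
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.stateIn

// Hypothetical repository, assumed for this sketch only.
interface MessagesRepository {
    fun userMessages(): Flow<List<String>>
}

class MessagesViewModel(repository: MessagesRepository) : ViewModel() {
    val userMessages: StateFlow<List<String>> = repository.userMessages().stateIn(
        scope = viewModelScope, // sharing lives as long as the view model
        // Keep the upstream flow alive for 5 seconds after the last collector leaves,
        // so a rotation does not restart it but going to the background stops it.
        started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
        initialValue = emptyList() // a StateFlow always needs a value
    )
}
```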

We have come to the end of my article. I tried to give some detailed basic information about Kotlin Flows. I hope you had a good time reading it :)
