Using Kotlin Flows

Kayvan Kaseb
Software Development
10 min read · Dec 22, 2020
The picture is provided by Unsplash

Kotlin coroutines offer an efficient approach to concurrency that can be used on Android to simplify asynchronous code. A suspending function asynchronously returns a single value, so the critical question is: how can we return multiple asynchronously computed values? In Kotlin coroutines, a flow is a type that can emit multiple values sequentially, for example live updates from a database, as opposed to a suspend function, which returns only a single value. This article discusses the main concepts, entities, and async possibilities of Kotlin flows.

Introduction and Overview

As you know, multiple values can be represented in Kotlin by using collections. For instance, we can have a sample function that returns a List of five numbers, and then print them all using forEach as follows:

fun sample(): List<Int> = listOf(1, 2, 3, 4, 5)
fun main() = sample().forEach { value -> println(value) }

If the numbers are computed with CPU-consuming blocking code, we can represent them with a Sequence instead. However, that computation blocks the thread that runs the code, for example the main thread. If the values are instead computed by asynchronous code, we can mark the sample function with the suspend modifier, so that it can do its work without blocking and return the result as a list:

import kotlinx.coroutines.*

suspend fun sample(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3, 4, 5)
}

fun main() = runBlocking<Unit> {
    sample().forEach { value -> println(value) }
}
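For comparison, the Sequence variant mentioned above could look like the following minimal sketch. Thread.sleep stands in for the CPU-consuming blocking work, and the name sampleSequence is only illustrative. Each value is produced lazily, but the calling thread stays blocked while it is computed.

```kotlin
// Sketch: values computed one by one with blocking code, exposed as a Sequence.
fun sampleSequence(): Sequence<Int> = sequence {
    for (i in 1..5) {
        Thread.sleep(100) // stand-in for blocking, CPU-consuming work
        yield(i)          // hand the next value to the consumer
    }
}

fun main() {
    // The calling thread is blocked while each value is being computed.
    sampleSequence().forEach { value -> println(value) }
}
```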

The question now is how to create a stream of data, that is, how to return multiple results over time. With the List<Int> return type, all the values are returned at once. To represent a stream of values that are computed asynchronously, we can use the Flow<Int> type, just as we would use the Sequence<Int> type for synchronously computed values:

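import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun sample(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(200) // pretend we are doing something useful here
        emit(i)    // emit the next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check that the main thread is not blocked
    launch {
        for (k in 1..5) {
            println("...")
            delay(200)
        }
    }
    // Collect the flow
    sample().collect { value -> println(value) }
}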

This code prints the numbers without blocking the main thread, which the concurrently launched coroutine confirms by printing "..." alongside them. Flows are built on top of coroutines. A flow is conceptually a stream of data that can be computed asynchronously, and all emitted values must be of the same type. For example, a Flow<Int> is a flow that emits integer values.

The following sections consider the main concepts, entities, and async possibilities of Kotlin flows, based on Google's documentation.

Flows and suspend functions

To see the difference between the two in practice, start with a suspend function. Suppose we have the suspend function loadData, which returns an object of type Data:

suspend fun loadData(): Data

In the UI layer, we can call that function from a coroutine, for example one launched in a UI scope:

uiScope.launch {
    val data = loadData()
    updateUi(data)
}

When loadData returns, we update the UI with its result. This is a one-shot call: you call a suspend function and you get a single result back. A flow works differently. Flow is built on top of Kotlin coroutines, and because a flow can emit multiple values sequentially, it is conceptually a stream of data whose values can be computed asynchronously. The emitted values have to be of the same type. In the example below, we have a function that returns a Flow of type Data, which means the flow emits Data objects. Note that this function is no longer a suspend function:

fun dataStream(): Flow<Data>

To trigger the flow, we call collect. Because collect is a suspend function, it needs to be called from a coroutine. In the lambda, we specify what to do when we receive an element from the flow. Each time a new element is emitted into the flow, updateUi is called with the new value. New values are processed until there are no more elements to emit, or until the UI goes away and its scope is canceled.

uiScope.launch {
    dataStream().collect {
        updateUi(it)
    }
}
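The contrast between the one-shot call and the stream can be tried outside Android with a small sketch like the one here. The Data class, the delays, and the emitted values are assumptions made only for illustration, and runBlocking stands in for the UI scope.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical payload type, only for illustration.
data class Data(val id: Int)

// One-shot: suspends once and returns a single value.
suspend fun loadData(): Data {
    delay(100)
    return Data(1)
}

// Stream: returns immediately; values are produced while the flow is collected.
fun dataStream(): Flow<Data> = flow {
    for (id in 1..3) {
        delay(100)
        emit(Data(id))
    }
}

fun main() = runBlocking<Unit> {
    println(loadData())                  // a single result
    dataStream().collect { println(it) } // the lambda runs once per emitted value
}
```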

The main entities in flows

What we have seen so far is a stream of data, and three entities are involved in it. The producer produces data that is added to the stream; thanks to coroutines, flows can also produce data asynchronously. The consumer consumes the values from the stream. In the previous example, updateUi was the consumer. There can also be intermediaries, which can modify each value emitted into the stream or the stream itself. For instance, an intermediary can change the type of the emitted elements, in which case the consumer receives a different type. In Android, a data source or repository is typically a producer of UI data, with the ViewModel or View as the consumer. Other times, the view layer is a producer of user input events, and other layers of the hierarchy consume them. Finally, layers between the producer and consumer usually act as intermediaries, which modify the stream of data to adjust it to the requirements of the following layer.

The main entities in flow data streaming. The picture is provided by Google documents.
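The three roles can be sketched in a few lines. The function names producer and intermediary and the "item #" formatting are assumptions chosen for illustration; the point is only that the intermediary changes the element type before the consumer sees it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Producer: emits integers into the stream.
fun producer(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
}

// Intermediary: changes the element type from Int to String.
fun intermediary(upstream: Flow<Int>): Flow<String> =
    upstream.map { value -> "item #$value" }

fun main() = runBlocking<Unit> {
    // Consumer: receives Strings, not the original Ints.
    intermediary(producer()).collect { text -> println(text) }
}
```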

Creating a flow

The flow builder function creates a new flow in which you can manually emit new values into the stream of data using the emit function. As an example, consider an Android app that fetches the latest news periodically. Here, NewsRemoteDataSource has a property latestNews that returns a flow of lists of ArticleHeadline. Because a single suspend function cannot return multiple consecutive values, the data source needs to create and return a flow to fulfill this requirement. In other words, the flow is used to deliver the latest news at regular intervals. In this example, the data source acts as the producer. NewsRemoteDataSource takes NewsApi as a dependency, which is the class that ultimately makes the network request and exposes a suspend function.

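import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while (true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // emits the result of the request to the flow
            delay(refreshIntervalMs) // suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}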

So, we use the flow builder function to create a new flow. Because the block of code is executed in the context of a coroutine, it can call suspend functions. To make network requests repeatedly, we create a while loop. Inside the loop, we call the API to get the latest news, and then we call emit with the result to add that object to the flow.
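To try the polling loop without a network, one option is to plug in a fake API and limit the collection with take. In this sketch, FakeNewsApi, pollLatestNews, the headline text, and the simplified ArticleHeadline class are all hypothetical stand-ins, not part of the original sample.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Simplified stand-in for the article's ArticleHeadline type.
data class ArticleHeadline(val title: String)

interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

// Hypothetical fake that counts how many requests were made.
class FakeNewsApi : NewsApi {
    var calls = 0
        private set

    override suspend fun fetchLatestNews(): List<ArticleHeadline> {
        calls++
        return listOf(ArticleHeadline("Headline #$calls"))
    }
}

// Same loop as in the data source: fetch, emit, wait, repeat.
fun pollLatestNews(api: NewsApi, refreshIntervalMs: Long): Flow<List<ArticleHeadline>> = flow {
    while (true) {
        emit(api.fetchLatestNews())
        delay(refreshIntervalMs)
    }
}

fun main() = runBlocking<Unit> {
    val api = FakeNewsApi()
    // take(3) cancels the otherwise endless producer after three emissions.
    pollLatestNews(api, refreshIntervalMs = 100).take(3).collect { println(it) }
    println("requests made: ${api.calls}")
}
```

Because the producer only runs while it is being collected, the fake API is not called again once take has received its third value.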

Flows are sequential. Since the producer runs in a coroutine, when it calls a suspend function, the producer suspends until that function returns. Here, the coroutine suspends until fetchLatestNews returns the response, and only then is emit called. To make requests at a fixed interval, we call delay with the refresh interval that was passed as a parameter. delay is a suspend function that suspends the coroutine for the given time. After that period, the coroutine resumes and another iteration of the while loop runs. An important point is that with the flow builder, the producer cannot emit values from a different coroutine context. As a result, you should not call emit from new coroutines or inside withContext blocks. For those cases, you can use other flow builders such as callbackFlow.
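The context restriction can be observed directly. The following sketch, with the illustrative names wrongContext and fromAnotherCoroutine, first calls emit inside withContext in a flow builder, which fails when the flow is collected, and then sends the same kind of values from another coroutine through callbackFlow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.awaitClose

// Not allowed: emit is called from a different context inside the flow builder.
fun wrongContext(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        emit(1) // fails when the flow is collected
    }
}

// Allowed: callbackFlow is channel-based, so another coroutine can send values.
fun fromAnotherCoroutine(): Flow<Int> = callbackFlow {
    launch(Dispatchers.Default) {
        for (i in 1..3) send(i)
        close() // signal that no more values will be sent
    }
    awaitClose() // keep the flow open until the channel is closed
}

fun main() = runBlocking<Unit> {
    val failure = runCatching { wrongContext().collect { } }.exceptionOrNull()
    println("flow builder failed with: $failure")
    println("callbackFlow produced: ${fromAnotherCoroutine().toList()}")
}
```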

Modifying the stream and intermediate operators

Intermediaries can use intermediate operators to modify the stream of data without consuming the values. When these operators are applied to a stream of data, they set up a chain of operations that are not executed until the values are consumed in the future. Continuing the previous example, we have the NewsRepository. In its constructor, it takes the NewsRemoteDataSource class, as well as UserData to know more about the logged-in user. It exposes the favoriteLatestNews property of type Flow<List<ArticleHeadline>>. We want to expose only the news articles that are relevant to the user. The property reads latestNews from newsRemoteDataSource and then applies the map operator to modify the stream of data. Inside map, we use filter on each emitted list to select the articles whose topic is one of the user’s favorites. The transformation is applied to the flow, so the consumer observes the filtered list instead of the original one. The onEach operator then saves each filtered list in the cache as a side effect.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Intermediate operators can be applied one after another, forming a chain of operations that are executed lazily when an item is emitted into the flow. Note that simply applying an intermediate operator to a stream does not start the flow collection.
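The laziness of the chain can be checked with a small sketch. The buildChain function and the log list are hypothetical helpers used only to record when the operators actually run.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Builds a chain of intermediate operators; nothing is executed here.
fun buildChain(log: MutableList<String>): Flow<Int> =
    flowOf(1, 2, 3, 4)
        .filter { it % 2 == 0 }
        .onEach { log += "saw $it" } // side effect, recorded only during collection
        .map { it * 10 }

fun main() = runBlocking<Unit> {
    val log = mutableListOf<String>()
    val chain = buildChain(log)
    println("after building the chain: $log") // still empty

    // A terminal operator triggers the whole chain, one element at a time.
    val result = chain.toList()
    println("after collecting: $log, result = $result")
}
```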

Collecting from a flow

To trigger the flow and start listening for values, you use a terminal operator. With collect, you get all the values in the stream as they are emitted. In this sample, the ViewModel, LatestNewsViewModel, wants to collect the flow to get notified of the news and update the UI accordingly. So, we call collect, which triggers the flow and starts listening for values. The lambda is executed on every new value received. However, collect is a suspend function, so it needs to be executed within a coroutine, which we can create with the built-in viewModelScope.

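import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.launch

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update the View with the latest favorite news
            }
        }
    }
}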

When the ViewModel is created, we create a new coroutine to collect the results from favoriteLatestNews. This triggers the flow in the data source, which starts fetching the latest news from the network. All emissions are modified by the map operator in the repository layer to keep only the user’s favorite topics. After this step, the repository saves that information in the cache, and the ViewModel receives the latest filtered information. Since the producer always remains active because of the while loop, the stream of data is closed only when the ViewModel is cleared and viewModelScope is canceled.
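How cancellation ends an otherwise endless producer can be sketched without Android. Here a plain launch inside runBlocking stands in for viewModelScope.launch, and the ticks function and its timings are assumptions made for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Producer that never finishes on its own, like the while (true) loop above.
fun ticks(intervalMs: Long): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(intervalMs)
    }
}

fun main() = runBlocking<Unit> {
    val received = mutableListOf<Int>()
    // Stand-in for viewModelScope.launch { ... }
    val job = launch { ticks(50).collect { received += it } }

    delay(220)
    // Stand-in for the scope being canceled when the ViewModel is cleared.
    job.cancelAndJoin()
    println("received ${received.size} values before cancellation")
}
```

Canceling the collecting coroutine also stops the producer, because the producer's code runs inside that same coroutine.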

Handling exceptions efficiently: Exception transparency

The implementation of the producer can come from a third-party library, so it can throw unexpected exceptions. To handle these exceptions, you can use the catch intermediate operator. For instance, in the ViewModel layer, we can use catch to handle unexpected exceptions and show an appropriate message to the user. Because catch is an intermediate operator, it needs to be called before collect, and it only catches exceptions thrown by the operators upstream of it. In addition, catch can emit items to the flow. If we want to handle those unexpected exceptions in the repository layer of our Android app, we can use the catch operator and call emit with the latest cached news. For instance:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}
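A runnable variant of the same idea is sketched here. The flakyNews producer, the headline strings, and this version of lastCachedNews are hypothetical; the sketch only shows that catch handles the upstream failure and that the collector receives the fallback value instead of an exception.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical producer that fails after its first emission.
fun flakyNews(): Flow<List<String>> = flow {
    emit(listOf("fresh headline"))
    throw IOException("network unavailable")
}

// Hypothetical cache lookup.
fun lastCachedNews(): List<String> = listOf("cached headline")

// catch sees the upstream exception and emits a fallback value instead.
fun newsWithFallback(): Flow<List<String>> =
    flakyNews().catch { exception ->
        println("upstream failed: ${exception.message}")
        emit(lastCachedNews())
    }

fun main() = runBlocking<Unit> {
    newsWithFallback().collect { println(it) }
}
```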

Using flows in Android development

Flow is integrated into a number of Android Jetpack libraries, and it is also popular among Android third-party libraries. Flow is a great fit for data updates. For example, you can use Flow with Room to be notified of changes in your database. As shown here, a Dao method returns a Flow type to provide live updates. Every time there is a change in the Sample table, a new List is emitted with the items in the database.

import androidx.room.Dao
import androidx.room.Query
import kotlinx.coroutines.flow.Flow

@Dao
abstract class SampleDao {
    @Query("SELECT * FROM Sample")
    abstract fun getSamples(): Flow<List<Sample>>
}

Converting callback-based APIs with callbackFlow

Finally, there is another important flow builder that is needed from time to time: callbackFlow, which helps you convert callback-based APIs into flows. As an example, the Firebase Firestore Android APIs use callbacks. So the question is how to convert those callbacks into a flow and listen for Firestore database updates. In this sample, we have a FirestoreUserEventsDataSource whose getUserEvents method returns a flow of UserEvents. As you can see, it takes an instance of FirebaseFirestore as a dependency. To create the flow, we use the callbackFlow API. As with the flow builder, inside callbackFlow we are in the context of a coroutine. In contrast to the flow builder, however, callbackFlow is backed by a channel, which allows values to be emitted from a different coroutine context, or from outside a coroutine with the offer method.

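class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Returns a flow that emits the user events from Firestore
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore.
        // Note: document() returns a DocumentReference, not a CollectionReference.
        var eventsCollection: DocumentReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            close(e)
        }

        // Register a callback that is invoked on every change
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }

            try {
                // getEvents() is assumed to be a helper defined elsewhere in the app
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // ...
            }
        }

        // Runs when the flow is closed or canceled: remove the listener
        awaitClose { subscription?.remove() }
    }
}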

The first step is to initialize Firebase and get the eventsCollection from it. However, this can throw an exception if getting the eventsCollection fails. If that happens (or Firebase cannot be initialized), we need to close the flow. If Firebase can be initialized and the eventsCollection can be obtained, we add a callback using addSnapshotListener. The callback lambda is executed every time there is a change to eventsCollection. In it, we check whether the snapshot that we receive is null. If it is not, we call offer to emit an item to the flow. offer is not a suspend function, which is why we can call it from inside a callback. (In newer versions of kotlinx.coroutines, offer is deprecated in favor of trySend.) At this point, we have initialized Firestore and added the subscription. Now we want to keep the communication open with the consumer of the flow so that it can receive all events sent by Firestore. For that purpose, we use the awaitClose() method, which waits until the flow is closed or canceled. When that happens, the lambda passed to awaitClose removes the listener from Firestore.
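The same pattern can be tried without Firebase by wrapping any listener-based API. In this sketch, EventSource is a hypothetical callback API standing in for Firestore, and trySend is used in place of offer on the assumption that a recent kotlinx.coroutines version (1.5 or later) is available.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback-based API standing in for Firestore.
class EventSource {
    private val listeners = mutableListOf<(String) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addListener(listener: (String) -> Unit) { listeners.add(listener) }
    fun removeListener(listener: (String) -> Unit) { listeners.remove(listener) }
    fun publish(event: String) = listeners.toList().forEach { it(event) }
}

// Converts the callback API into a flow.
fun EventSource.events(): Flow<String> = callbackFlow {
    // trySend is not a suspend function, so it can be called from the callback.
    val listener: (String) -> Unit = { event -> trySend(event) }
    addListener(listener)
    // Suspends until the flow is closed or canceled, then unregisters the listener.
    awaitClose { removeListener(listener) }
}

fun main() = runBlocking<Unit> {
    val source = EventSource()
    val received = mutableListOf<String>()
    val job = launch { source.events().take(2).collect { received += it } }

    while (source.listenerCount == 0) yield() // wait until the listener is registered
    source.publish("first")
    source.publish("second")
    job.join()

    println("received = $received, listeners left = ${source.listenerCount}")
}
```

Once take(2) has received its two values, the collection is canceled, which runs the awaitClose lambda and unregisters the listener.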

In conclusion

Flows are built on top of coroutines and can provide multiple values. A flow is conceptually a stream of data that can be computed asynchronously, and the emitted values have to be of the same type. This article considered the main concepts, entities, and async possibilities of Kotlin flows, based on Google resources.

Kayvan Kaseb
Software Development

Senior Android Developer, Technical Writer, Researcher, Artist, Founder of PURE SOFTWARE YAZILIM LİMİTED ŞİRKETİ https://www.linkedin.com/in/kayvan-kaseb