Kotlin Flows Part1 | Android

Awadhesh Singh
5 min readMar 17, 2024

--

Kotlin Flow is a reactive programming library for Kotlin
It provides a way to asynchronously process streams of data.
Its concise and streamlined syntax, based on Coroutines, makes it easy to create and manipulate data streams.

What is Flow →
Flows are built on the top of Coroutine and can provide multiple values. A flow is conceptually a stream of data that can be computed asynchronously.
A flow uses suspended function to produce and consume the data and the emitted data should be same type.

Flows allow you to implement the observable design pattern and it involves three part/entities

  1. Producer → Producer produces the data that is added in stream.flows can also produce data asynchronously.
  2. Intermidiaries(Optional) → it can modify the data emitted in to streams without consuming it.
  3. Consumer → Consumer consumes the value from streams emitted by producers
Flow Layers

Explainantion
A Repository is typically a producer tha produces data for UI, and User interface consumes the data and act as a consumer. and other times UI is a producer of user inputs event and other layer in hierarchy consumes the data. Layers in between the producer and consumer is intermediaries that can modify the stream to fullfil the requirements of consumer layer.

Creating A Flow →
Flow Builder Apis
is used for creating a flow. The flow builder function creates a new flow where you can manually emit new values into the stream of data using the emit function.
Flow builder can be executed within the coroutine scope only
- Flows are sequential. the producer is in a coroutine, when calling a suspend function, the producer suspends until the suspend function returns.
- With the flow builder, the producer cannot emit values from a different CoroutineContextTherefore, don't call emit in a different CoroutineContext by creating new coroutines or by using withContext blocks of code. You can use other flow builders such as callbackFlow in these cases.

class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Modifying the stream

Intermediaries can use intermediate operators to modify the stream of data without consuming the values. These operators are functions that, when applied to a stream of data, set up a chain of operations that aren’t executed until the values are consumed in the future

class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}

Collecting from a flow

Use a terminal operator to trigger the flow to start listening for values. To get all the values in the stream as they’re emitted, use collect
As collect is a suspend function, it needs to be executed within a coroutine. It takes a lambda as a parameter that is called on every new value. Since it's a suspend function, the coroutine that calls collect may suspend until the flow is closed.

class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {

init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}

Collecting the flow triggers the producer that refreshes the latest news and emits the result of the network request on a fixed interval. As the producer remains always active with the while(true) loop, the stream of data will be closed when the ViewModel is cleared and viewModelScope is cancelled.

Flow collection can stop for the following reasons:
— The coroutine that collects is cancelled, as shown in the previous example. This also stops the underlying producer.
— The producer finishes emitting items. In this case, the stream of data is closed and the coroutine that called collect resumes execution.

Flows are cold and lazy unless specified with other intermediate operators. This means that the producer code is executed each time a terminal operator is called on the flow.

Catching unexpected exceptions

The implementation of the producer can come from a third party library. This means that it can throw unexpected exceptions. To handle these exceptions, use the catch intermediate operator.

class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {

init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}

in this Example, the collect lambda isn't called, as a new item hasn't been received.
catch
can also emit items to the flow. The example repository layer could emit the cached values instead:

class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}

Executing in a different CoroutineContext

By default, the producer of a flow builder executes in the CoroutineContext of the coroutine that collects from it.
it cannot emit values from a different CoroutineContext. This behavior might be undesirable in some cases.
To change the CoroutineContext of a flow, use the intermediate operator flowOn. flowOn changes the CoroutineContext of the upstream flow, meaning the producer and any intermediate operators applied before (or above) flowOn. The downstream flow (the intermediate operators after flowOn along with the consumer) is not affected and executes on the CoroutineContext used to collect from the flow. If there are multiple flowOn operators, each one changes the upstream from its current location.

class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}

With this code, the onEach and map operators use the defaultDispatcher, whereas the catch operator and the consumer are executed on Dispatchers.Main used by viewModelScope.

Please Comment and follow for more updates on flows

GitHub

--

--