Kotlin Flows Part1 | Android
Kotlin Flow is a reactive programming library for Kotlin
It provides a way to asynchronously process streams of data.
Its concise and streamlined syntax, based on Coroutines, makes it easy to create and manipulate data streams.
What is Flow →
Flows are built on the top of Coroutine and can provide multiple values. A flow is conceptually a stream of data that can be computed asynchronously.
A flow uses suspended function to produce and consume the data and the emitted data should be same type.
Flows allow you to implement the observable design pattern and it involves three part/entities
- Producer → Producer produces the data that is added in stream.flows can also produce data asynchronously.
- Intermidiaries(Optional) → it can modify the data emitted in to streams without consuming it.
- Consumer → Consumer consumes the value from streams emitted by producers
Explainantion
A Repository is typically a producer tha produces data for UI, and User interface consumes the data and act as a consumer. and other times UI is a producer of user inputs event and other layer in hierarchy consumes the data. Layers in between the producer and consumer is intermediaries that can modify the stream to fullfil the requirements of consumer layer.
Creating A Flow →
Flow Builder Apis is used for creating a flow. The flow
builder function creates a new flow where you can manually emit new values into the stream of data using the emit
function.
Flow builder can be executed within the coroutine scope only
- Flows are sequential. the producer is in a coroutine, when calling a suspend function, the producer suspends until the suspend function returns.
- With the flow
builder, the producer cannot emit
values from a different CoroutineContext
Therefore, don't call emit
in a different CoroutineContext
by creating new coroutines or by using withContext
blocks of code. You can use other flow builders such as callbackFlow
in these cases.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
Modifying the stream
Intermediaries can use intermediate operators to modify the stream of data without consuming the values. These operators are functions that, when applied to a stream of data, set up a chain of operations that aren’t executed until the values are consumed in the future
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Collecting from a flow
Use a terminal operator to trigger the flow to start listening for values. To get all the values in the stream as they’re emitted, use collect
As collect
is a suspend function, it needs to be executed within a coroutine. It takes a lambda as a parameter that is called on every new value. Since it's a suspend function, the coroutine that calls collect
may suspend until the flow is closed.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Collecting the flow triggers the producer that refreshes the latest news and emits the result of the network request on a fixed interval. As the producer remains always active with the while(true)
loop, the stream of data will be closed when the ViewModel is cleared and viewModelScope
is cancelled.
Flow collection can stop for the following reasons:
— The coroutine that collects is cancelled, as shown in the previous example. This also stops the underlying producer.
— The producer finishes emitting items. In this case, the stream of data is closed and the coroutine that called collect
resumes execution.
Flows are cold and lazy unless specified with other intermediate operators. This means that the producer code is executed each time a terminal operator is called on the flow.
Catching unexpected exceptions
The implementation of the producer can come from a third party library. This means that it can throw unexpected exceptions. To handle these exceptions, use the catch
intermediate operator.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
in this Example, the collect
lambda isn't called, as a new item hasn't been received.
can also
catchemit
items to the flow. The example repository layer could emit
the cached values instead:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
Executing in a different CoroutineContext
By default, the producer of a flow
builder executes in the CoroutineContext
of the coroutine that collects from it.
it cannot emit
values from a different CoroutineContext
. This behavior might be undesirable in some cases.
To change the CoroutineContext
of a flow, use the intermediate operator flowOn. flowOn
changes the CoroutineContext
of the upstream flow, meaning the producer and any intermediate operators applied before (or above) flowOn
. The downstream flow (the intermediate operators after flowOn
along with the consumer) is not affected and executes on the CoroutineContext
used to collect
from the flow. If there are multiple flowOn
operators, each one changes the upstream from its current location.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
With this code, the onEach
and map
operators use the defaultDispatcher
, whereas the catch
operator and the consumer are executed on Dispatchers.Main
used by viewModelScope
.
Please Comment and follow for more updates on flows
GitHub