Kotlin Flows — Fundamentals

Anitaa Murthy
6 min read · Jun 23, 2024

I spent last week looking further into Kotlin coroutines. The previous article covered some of the fundamentals of coroutines, such as CoroutineContext, CoroutineScope, and coroutine builders. As promised, this is a follow-up to that article, this time on Flows.

What are Flows?

A Flow is a stream of values that is computed asynchronously. Like LiveData and RxJava streams, Flow lets you implement the observer pattern: a software design pattern consisting of an object (source) that keeps a list of its dependents, called observers (collectors), and automatically notifies them of any state changes. A Flow uses suspend functions to produce and consume values asynchronously.

To create a Flow, first you need to create a producer.

val randomFlow: Flow<Int> = flow {
    repeat(10) { index ->
        emit(index + 1) // Emits the next value (1 to 10) to the flow
        delay(1000)     // Suspends the coroutine for 1 second
    }
}

To collect a flow, you first launch a coroutine, because collect is a suspend function and Flow is built on coroutines under the hood. The collect operator is a terminal operator: it starts the flow and receives each value the flow emits.

lifecycleScope.launch {
    viewModel.uiStateFlow.collect { it ->
        binding.uiText.text = it.toString()
    }
}
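To try the producer and collector outside Android, here is a minimal sketch that uses runBlocking in place of lifecycleScope. The countingFlow function and its pauseMillis parameter are made up for illustration, and the snippet assumes the kotlinx-coroutines-core dependency is on the classpath.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical producer: emits 1..count with a short pause after each value.
fun countingFlow(count: Int, pauseMillis: Long = 100): Flow<Int> = flow {
    repeat(count) { index ->
        emit(index + 1)
        delay(pauseMillis)
    }
}

fun main() = runBlocking {
    // Building the flow runs nothing; the flow body starts only when collect is called.
    countingFlow(3).collect { value ->
        println("received: $value")
    }
}
```

Because collect suspends until the flow completes, any code placed after it inside runBlocking runs only once all three values have been received.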

There are two different types of Flows:

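  • Cold Flow: It does not start producing values until something starts collecting it. It can be collected any number of times, but every collector triggers its own independent run of the producer, so nothing is shared between collectors and no data is stored.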
// Regular Flow example
val coldFlow = flow {
    emit(0)
    emit(1)
    emit(2)
}

launch { // launch must be called inside a CoroutineScope
    // Calling collect the first time
    coldFlow.collect { value ->
        println("cold flow collector 1 received: $value")
    }

    delay(2500)

    // Calling collect a second time
    coldFlow.collect { value ->
        println("cold flow collector 2 received: $value")
    }
}


// RESULT
// Both the collectors will get all the values from the beginning.
// For both collectors, the corresponding Flow starts from the beginning.
// (Each value is printed on its own line; summarised here as a list.)
cold flow collector 1 received: [0, 1, 2]
cold flow collector 2 received: [0, 1, 2]
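  • Hot Flow: It produces values whether or not anyone is collecting them. It can have multiple subscribers at the same time, all sharing the same emissions, and it can store data (for example, the latest value or a replay cache).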
// SharedFlow example
val sharedFlow = MutableSharedFlow<Int>()

// collect on a SharedFlow never completes, so each collector
// needs its own coroutine.
launch {
    // Collector 1 subscribes before anything is emitted
    sharedFlow.collect { value ->
        println("SharedFlow collector 1 received: $value")
    }
}

launch {
    delay(2500)

    // Collector 2 subscribes 2500 milliseconds later
    sharedFlow.collect { value ->
        println("SharedFlow collector 2 received: $value")
    }
}

launch {
    // Producer: emits 0..4, one value per second
    repeat(5) { value ->
        delay(1000)
        sharedFlow.emit(value)
    }
}

// RESULT
// The collectors will get the values from where they have started collecting.
// Here the 1st collector gets all the values. But the 2nd collector gets
// only those values that got emitted after 2500 milliseconds as it started
// collecting after 2500 milliseconds.
// (Each value is printed on its own line; summarised here as a list.)
SharedFlow collector 1 received: [0,1,2,3,4]
SharedFlow collector 2 received: [2,3,4]
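The cold-flow behaviour described above can be checked without any timing by counting how often the producer block runs. This is only a sketch: RunCounter and coldNumbers are hypothetical names introduced for the demonstration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper that records how many times the producer block has started.
class RunCounter {
    var runs = 0
}

// A cold flow: the block below is executed again for every collector.
fun coldNumbers(counter: RunCounter): Flow<Int> = flow {
    counter.runs++
    emit(0)
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val counter = RunCounter()
    val numbers = coldNumbers(counter)

    println(counter.runs)      // 0: nothing runs before the first collector
    println(numbers.toList())  // [0, 1, 2]
    println(numbers.toList())  // [0, 1, 2] again, from the beginning
    println(counter.runs)      // 2: one producer run per collector
}
```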

We can convert any cold flow to a hot one using the stateIn() and shareIn() operators, which return a StateFlow and a SharedFlow respectively.
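As a rough sketch of what that conversion looks like, the snippet below turns a made-up cold tickerFlow into both kinds of hot flow. The scope, the SharingStarted policies, the initial value and the replay size are arbitrary choices for the example, not recommendations.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical cold source: counts up from 0, one value per period.
fun tickerFlow(periodMillis: Long): Flow<Int> = flow {
    var tick = 0
    while (true) {
        emit(tick++)
        delay(periodMillis)
    }
}

fun main() = runBlocking {
    // stateIn needs a scope to run in, a start policy and an initial value.
    val latestTick: StateFlow<Int> =
        tickerFlow(100).stateIn(this, SharingStarted.Eagerly, initialValue = -1)

    // shareIn needs a scope and a start policy; replay is optional.
    val sharedTicks: SharedFlow<Int> =
        tickerFlow(100).shareIn(this, SharingStarted.WhileSubscribed(), replay = 1)

    delay(250)
    println("latest tick: ${latestTick.value}")
    println("first shared tick: ${sharedTicks.first()}")

    // The sharing coroutines never finish on their own, so cancel them
    // to let runBlocking return.
    coroutineContext.cancelChildren()
}
```

In an Android ViewModel the scope would typically be viewModelScope, which is cancelled for you when the ViewModel is cleared.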

SharedFlow & StateFlow

  • StateFlow: A StateFlow is a hot flow that holds a single state value at a time. It is also a conflated flow: only the most recent value is retained, it is immediately delivered to new collectors, and a slow collector may skip intermediate values. It is useful when you need a single source of truth for a piece of state and want all collectors to be updated with the latest state automatically. It always has an initial value and only stores the latest emitted value.
class HomeViewModel : ViewModel() {

    private val _textStateFlow = MutableStateFlow("Hello World")
    val stateFlow = _textStateFlow.asStateFlow()

    fun triggerStateFlow() {
        _textStateFlow.value = "State flow"
    }
}

// Collecting StateFlow in Activity/Fragment
class HomeFragment : Fragment() {
    private val viewModel: HomeViewModel by viewModels()

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        // launchWhenStarted is deprecated; repeatOnLifecycle restarts the
        // collection whenever the view reaches STARTED and cancels it
        // when the view drops below STARTED.
        viewLifecycleOwner.lifecycleScope.launch {
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {

                // Triggers the flow and starts listening for values

                // collectLatest() is a terminal operator in Kotlin's Flow API.
                // It is similar to collect(), which processes every emitted
                // value, but when a new value arrives while the previous one
                // is still being processed, collectLatest cancels that
                // unfinished work and restarts the block with the new value.
                viewModel.stateFlow.collectLatest {
                    binding.stateFlowButton.text = it
                }
            }
        }
    }
}

// Collecting StateFlow in Compose
@Composable
fun HomeScreen(viewModel: HomeViewModel = viewModel()) {
    // Compose provides the collectAsStateWithLifecycle function, which
    // collects values from a flow and gives the latest value to be used
    // wherever needed. When a new flow value is emitted, we get the updated
    // value, and recomposition takes place to update the UI.
    // It uses Lifecycle.State.STARTED by default: it starts collecting values
    // when the lifecycle reaches that state and stops when it falls
    // below it.
    val text by viewModel.stateFlow.collectAsStateWithLifecycle()

    Text(text = text)
}
  • SharedFlow: A SharedFlow is a hot flow that can have multiple collectors. It emits values independently of its collectors, and every active collector receives the same values. It’s useful when you need to broadcast a value to multiple collectors or when you want to have multiple subscribers to the same stream of data. It does not have an initial value, and you can configure its replay cache to store a certain number of previously emitted values for new collectors.
class HomeViewModel : ViewModel() {
    private val _events = MutableSharedFlow<Event>() // private mutable shared flow
    val events = _events.asSharedFlow() // publicly exposed as read-only shared flow

    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}

// Collecting SharedFlow in Activity/Fragment
class HomeFragment : Fragment() {
    private val viewModel: HomeViewModel by viewModels()

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        // launchWhenStarted is deprecated; repeatOnLifecycle collects only
        // while the view is at least STARTED.
        viewLifecycleOwner.lifecycleScope.launch {
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {

                // Starts listening for events. collectLatest() cancels the
                // handling of the previous event if a new one arrives before
                // it has finished; use collect() if every event must be
                // handled to completion.
                viewModel.events.collectLatest {
                    binding.eventFlowButton.text = it.toString()
                }
            }
        }
    }
}

// Collecting SharedFlow in Compose
@Composable
fun HomeScreen(viewModel: HomeViewModel = viewModel()) {
    // collectAsStateWithLifecycle collects values from the flow while the
    // lifecycle is at least Lifecycle.State.STARTED and exposes the latest
    // one as Compose state, triggering recomposition on every new value.
    // A SharedFlow has no initial value, so one has to be supplied here.
    val latestEvent by viewModel.events.collectAsStateWithLifecycle(initialValue = null)

}
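The difference between the replay cache of a SharedFlow and the always-present value of a StateFlow can be seen in a small stand-alone sketch. The two helper functions are hypothetical and exist only to show what a collector that subscribes late receives.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// What does a collector see if it subscribes after five values were emitted?
// replay must be at least 1 here, because take(0) is not allowed.
suspend fun lateSharedCollector(replay: Int): List<Int> {
    val shared = MutableSharedFlow<Int>(replay = replay)
    // With no subscribers, emit returns immediately and only the
    // replay cache keeps the most recent values.
    (1..5).forEach { shared.emit(it) }
    // A SharedFlow never completes, so take(replay) ends the collection
    // once the replayed values have arrived.
    return shared.take(replay).toList()
}

// A StateFlow always holds exactly one value: the latest one.
suspend fun lateStateCollector(): String {
    val state = MutableStateFlow("Hello World")
    state.value = "first update"
    state.value = "second update"
    return state.first()
}

fun main() = runBlocking {
    println(lateSharedCollector(replay = 2)) // [4, 5]
    println(lateStateCollector())            // second update
}
```

With the default replay of 0, a late collector of the SharedFlow would receive none of the five values, which is why one-off events emitted before the UI subscribes can be missed.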

Exception Handling in Flows

Kotlin Flow offers several mechanisms for handling exceptions and errors.

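  • try-catch Blocks: One of the fundamental ways to handle exceptions is to use a try-catch block inside the flow builder. Keep emit outside the try block so that exceptions thrown by the collector are not caught by mistake.
flow {
    val products = try {
        productsService.fetchProducts()
    } catch (e: CancellationException) {
        throw e // never swallow coroutine cancellation
    } catch (e: Exception) {
        emitError(e) // emitError is a placeholder for your own error handling
        return@flow
    }
    // emit sits outside the try block so collector failures are not caught here
    emit(products)
}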
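  • catch Operator: The catch operator lets you keep the error-handling logic in one place. It catches only upstream exceptions, that is, those thrown by the flow and the operators above it in the chain, and never those thrown by the collector.
flow {
    emit(productsService.fetchProducts())
}.catch { e ->
    // emitError is a placeholder; emit(...) is also available inside catch
    emitError(e)
}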
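  • onCompletion Operator: It is used for executing code after the Flow completes, whether it completes successfully or with an exception. It only observes the failure: the exception still propagates downstream, so combine it with catch if the error has to be handled.
flow {
    emit(productsService.fetchProducts())
}.onCompletion { cause ->
    // cause is null on success, otherwise the exception that ended the flow
    if (cause != null) {
        // Report only: emitting values here after a failure would throw
        emitError(cause)
    }
}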
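  • Custom Error Handling: In more complex scenarios, we can create custom operators or extension functions to handle errors in a way that suits our application.
// Result here is an app-defined sealed class with Success and Error
// subclasses, not kotlin.Result.
fun <T> Flow<T>.sampleErrorHandler(): Flow<Result<T>> =
    map<T, Result<T>> { value -> Result.Success(value) }
        // try-catch around emit would only see collector failures;
        // catch sees the upstream ones we actually want to wrap.
        .catch { e -> emit(Result.Error(e)) }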
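For a version of this idea that can be run as-is, here is a minimal sketch with its own wrapper type. LoadResult, asLoadResult and flakyNumbers are hypothetical names chosen for the example; they are not part of the Flow API or of the code above.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical app-level wrapper, deliberately not named Result
// to avoid confusion with kotlin.Result.
sealed class LoadResult<out T> {
    data class Success<T>(val value: T) : LoadResult<T>()
    data class Error(val cause: Throwable) : LoadResult<Nothing>()
}

// Wraps every value in Success and turns an upstream failure
// into one final Error element instead of a thrown exception.
fun <T> Flow<T>.asLoadResult(): Flow<LoadResult<T>> =
    map<T, LoadResult<T>> { value -> LoadResult.Success(value) }
        .catch { e -> emit(LoadResult.Error(e)) }

// Hypothetical source that fails after two values.
fun flakyNumbers(): Flow<Int> = flow {
    emit(1)
    emit(2)
    throw IllegalStateException("backend unavailable")
}

fun main() = runBlocking {
    // Prints two Success elements followed by one Error element.
    flakyNumbers().asLoadResult().collect { result -> println(result) }
}
```

The collector no longer needs its own try-catch for upstream failures; it can branch on the element type with a when expression instead.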

Flows vs LiveData

  • LiveData is lifecycle-aware, which means it automatically manages the lifecycle of the observers, ensuring that updates are only delivered when the observer is in an active state. Flow, on the other hand, is not lifecycle-aware by default. To collect it safely from the UI, we can use repeatOnLifecycle() or flowWithLifecycle() in Activities and Fragments, or collectAsStateWithLifecycle() in Compose.
  • Flow provides more flexibility and is suitable for more complex asynchronous data operations, while LiveData is typically used for simpler UI updates.
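  • Flow handles backpressure through suspension: emit() suspends until the collector is ready for the next value, and operators such as buffer(), conflate(), and collectLatest() control what happens when the producer is faster than the collector. LiveData offers no such control; it only holds the latest value.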
  • Flow offers a rich set of operators for sequential and structured processing, while LiveData focuses on delivering the latest data to observers.
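To make the backpressure point concrete, here is a small sketch that pairs a fast producer with a slow collector, first with the default behaviour and then with conflate(). The function names and the delays are arbitrary values picked for the demonstration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical producer that is faster than its collector.
fun fastProducer(): Flow<Int> = flow {
    for (i in 1..10) {
        emit(i)
        delay(10)
    }
}

// Hypothetical slow collector: takes 50 ms per value and records what it saw.
suspend fun collectSlowly(source: Flow<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    source.collect { value ->
        delay(50)
        seen += value
    }
    return seen
}

fun main() = runBlocking {
    // Default: emit suspends until the collector is done, so no value is lost.
    println(collectSlowly(fastProducer()))

    // conflate(): the producer runs ahead and the collector skips values
    // that were replaced before it was ready for them.
    println(collectSlowly(fastProducer().conflate()))
}
```

The first list contains all ten values. The second one starts with 1 and ends with 10, but which intermediate values survive depends on timing.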

Thanks for reading, guys! I hope you enjoyed this article and found it useful. Let me know your thoughts in the comments section.

Happy coding!
