Advanced Kotlin Flow Cheat sheet (for Android Engineer)
You’ve been working with Kotlin Flows for a while and you’re already familiar with basic concepts. But you might have never used Channel
don’t know the difference between merge
, combine
and zip
or don’t fully understand SharedFlow
and StateFlow
and how to use them.
This cheat sheet consolidates the key concepts of all those elements and I’m sharing the insights I’ve picked up along the way. It’s designed to be a handy reference for tackling more intricate flow scenarios.
Note: For advanced Kotlin Coroutines tips, consider checking out my previous cheat sheet:
Cold streams VS Hot streams
Hot streams
- Example:
channel
, Collections (List
,Set
…). - Start immediately: begins emitting values regardless of whether there are subscribers.
- Store the elements: they don’t need to be recalculated and all subscribers receive the same sequence of values.
Cold streams
- Examples:
Sequence
,Flow
- Start on demand: A cold stream begins emitting values only when a subscriber actively subscribes to it. The data source are lazy.
- Independent emissions: Each subscriber receives its own independent sequence of values. No elements are stored.
Channel
Main principles
- Are a hot stream of values.
- Guarantee no conflicts (no problem with shared state) and fairness so they are useful when different coroutines need to communicate with each other.
- Support any numbers of senders and receivers.
- Every value send to the channel is received only once.
- If there are several receivers subscribed at the same time, the elements will be fairly distributed between the receivers. (FIFO queue of the receivers).
The channel has 3 receivers, by order of subscription:
Receiver1, Receiver2, Receiver3.
All the receivers have already subscribed to the channel.
The channel emit 4 values: "A", "B", "C" then "D".
Receiver1 receives "A" and "D"
Receiver2 receives "B"
Receiver3 receives "C"
- They have 2 suspending function
send
andreceive
. receive
is suspended if there are no elements in the channel and will wait for an element to be available to resume.send
is suspended if the channel reaches its capacity (see below for the channel capacity).- We can also use the non suspending version
trySend
andtryReceive
which return aChannelResult
(tells us if the operation was successful or not). - They need to be closed manually once we are done sending data or when an exception occurs:
myChannel.close()
. Otherwise the receivers will wait for elements forever.
Channel capacity types
val myChannel = Channel<Int>(capacity = 3)
// OR
val myChannel = produce(capacity = 3) {
// emit values here
}
Channel.UNLIMITED
: unlimited buffer and send never suspends.Channel.BUFFERED
: Buffer capacity of 64. This default value can be overridden with the system propertykotlinx.coroutines.channels.defaultBuffer
in JVM.Channel.RENDEZVOUS
: (default behavior) buffer capacity of 0. The receiver will only receive the data if it is subscribed to the sender when the data is being emitted.Channel.CONFLATED
: Buffer capacity of 1. Each new element replaces the previous one.- Any
int
value: Buffer will have the capacity set by the int.
Handling buffer overflow
Channels have a parameter onBufferOverflow
that controls what happens when the buffer is full. There are 3 options:
BufferOverflow.SUSPEND
: (default behavior) suspend thesend
method when the buffer is full.BufferOverflow.DROP_OLDEST
: drop the oldest element when the buffer is full.BufferOverflow.DROP_LATEST
: drop the latest element when the buffer is full.
Creating a Channel that will close automatically
The coroutine builder, produce
closes the channel whenever the builder coroutine ends (it finishes, stopes or is cancelled).
suspend fun myFunction() = coroutineScope {
val channel = produce {
// emit values here and don't need to call close() at the end
}
}
Automatically clean up if an element could not be handled
If the channel was closed or cancelled or when send
, receive
or hastNext
throw an error
val myChannel = Channel(
capacity,
onUnderliveredElement = { /* clean up operations here */ }
)
Use case: triggering a refresh
In Android a common use case for channels would be when triggering a refresh of a screen (pull to refresh or retry button). The snippet below demonstrate how to fetch data from an API when we first subscribe to the flow or when a refresh is triggered.
A lot of people use SharedFlow
to trigger a refresh and it works but it is not the best solution as SharedFlow
are designed to have several receivers (see below for more details about SharedFlow
).
// This is a simplify version to demonstrate how we can use channels.
// In a real use case, we would require some extra logic to avoid
// refreshing if the data is already loading for example.
interface ApiService {
suspend fun fetchData(): List<String>
}
class FetchDataUseCase @Inject constructor (
private val apiService: ApiService
) {
// create a channel with a buffer of 1 and will drop the newest data
// so if we trigger refresh several times in a row we will only
// keep the first element.
private val refreshChannel = Channel<Unit>(
capacity = 1,
onBufferOverflow = BufferOverflow.DROP_LATEST
)
// this flow can be received by the viewModel to build the UI state
val dataState: Flow<FetchDataState> =
refreshChannel
// convert channel into a flow
.consumeAsFlow()
// emit an element on start to fetch data as soon as we subscribe
// to the flow
.onStart { emit(Unit) }
.map { fetchData() }
fun refresh() {
// We use the trySend function here to not have to create a
// suspend function and so we don't need a scope to call it.
// this method can be called from the viewModel to trigger a refresh
refreshChannel.trySend(Unit)
}
private suspend fun fetchData(): FetchDataState =
try {
val data = apiService.fetchData()
FetchDataState.Success(data)
} catch (e: Exception) {
FetchDataState.Error(e.message ?: "An error occurred")
}
sealed interface FetchDataState {
data object Loading : FetchDataState
data class Success(val data: List<String>) : FetchDataState
data class Error(val message: String) : FetchDataState
}
}
Flow
Main principles
- Are a cold stream of values.
- Structured concurrency is supported out of the box.
- The last operation of a flow is called terminal operation (
collect
,first
, etc…). - A flow can have intermediate operations that modify the flow (
map
,onEach
,flatMapLastest
, etc…). - Terminal operations are suspending and require a scope.
- Uncaught exceptions immediately cancel a flow and
collect
will re-throws the exception. - By default a flow will take its context from the context where
collect
is called.
Combining flows together
merge
, combine
and zip
are all intermediate function that allow us to combine 2 (or more) flows into 1. So what are the key differences between those 3 functions?
merge
- Do not modify any elements.
- Elements are emitted as soon as they are produced and we don’t wait for the other flow to produce an element as well before producing the value.
- Use it when you have multiple source of events that should produce the same action.
flowA emits: 1
flowB emits: 2
flowA emits: 3
merge(flowA, flowB) produces 1, 2, 3
zip
- Combine elements from the different flows to create a new value.
- We need a function that will specify how the elements are combined together.
- We need to wait for each flow to emit a value to be able to create pair.
- Elements can only be part of one pair.
- Elements left without a pair are lost.
flowA emits: 1
flowB emits: 2
flowA emits: 3
flowA.zip(flowB) {fA, fB -> fA + fB } produces 3 (1+2 and 3 is dropped)
combine
- Combine elements from the different flows to create a new value.
- We need a function that will specify how the elements are combined together.
- We need to wait for the slower flow to emit a value for the first time before producing the new element.
- When a flow produces a new element, it replaces its predecessor and a new value is emitted right away (we don’t wait for each flow to emit a new element).
flowA emits: 1
flowB emits: 2
flowA emits: 3
flowA.combines(flowB) { fA, fB -> fA + fB } produces 3 (1+2) then 5 (3+2)
Differences between fold and scan
Both fold
and scan
combine all the values emitted by a flow into one element by applying an operation that combines the values together.
fold
is a terminal operation. It suspends until the flow complete and produces the final valuescan
is an intermediate operation and produces all the intermediate values
val myflow = flowOf(1, 2, 3, 4)
myFlow.fold(0) { acc, newElement -> acc + newElement } // produces 10
myFlow.scan(0) { acc, newElement -> acc + newElement }
// produces 1, 3 (1+2), 6 (3+3), 10 (6+4)
flatMapConcat, flatMapMerge and flatMapLatest
- They are all intermediate operators
- They transforms elements emitted by the original flow by applying another flow on the element and returns another flow
myFlowA.flatMapConcat { fA -> myFlowB(fA) } // return value produced by flow B
flatMapConcat
- Transforms each emitted value into a Flow and concatenates the resulting Flows sequentially.
- Emits values from the first inner Flow completely before starting the next.
- Use Case: When you need to process inner Flows in order, without overlapping.
flatMapMerge
- Transforms each emitted value into a flow and merges the resulting Flows concurrently.
- Emits values from all inner flows as they become available, potentially out of order.
- Use Case: When you want to process inner flows concurrently and don’t care about the order of emitted values.
flatMapLatest
- Transforms each emitted value into a flow, cancels previous flows when a new value is emitted, and emits values from the latest flow.
- Only the latest flow is active, and its values are emitted. Previous flows are canceled.
- Use Case: When you only care about the latest value and want to cancel previous operations.
data class User(val id: Int, val name: String)
data class UserDetails(val userId: Int, val address: String)
fun fetchUserData(): Flow<User> = flow {
emit(User(1, "Alice"))
delay(500)
emit(User(2, "Bob"))
delay(500)
emit(User(3, "Charlie"))
}
fun fetchUserDetails(userId: Int): Flow<UserDetails> = flow {
delay(1000) // Simulate network delay
emit(UserDetails(userId, "$userId's address"))
}
// flatMapConcat
fetchUserData()
.flatMapConcat { user ->
fetchUserDetails(user.id)
}
.collect { userDetails ->
println("flatMapConcat: ${userDetails}")
}
// Each user's details are fetched sequentially.
// flatMapConcat: UserDetails(userId=1, address=1's address)
// flatMapConcat: UserDetails(userId=2, address=2's address)
// flatMapConcat: UserDetails(userId=3, address=3's address)
// flatMapMerge
fetchUserData()
.flatMapMerge { user ->
fetchUserDetails(user.id)
}
.collect { userDetails ->
println("flatMapMerge: ${userDetails}")
}
// User details might be interleaved due to concurrent fetching.
// flatMapMerge: UserDetails(userId=1, address=1's address)
// flatMapMerge: UserDetails(userId=2, address=2's address)
// flatMapMerge: UserDetails(userId=3, address=3's address)
// flatMapLatest
fetchUserData()
.flatMapLatest { user ->
fetchUserDetails(user.id)
}
.collect { userDetails ->
println("flatMapLatest: ${userDetails}")
}
// Only the details for the last user are fetched and printed because
// new users cancel previous fetches.
// flatMapLatest: UserDetails(userId=3, address=3's address)
Converting a function into a Flow
val function = suspend {
// this is suspending lambda expression
// define function here
}
function.asFlow()
OR
suspend fun myFunction(): Flow<T> {
// define function here
}
::myFunction.asFlow()
Creating a Flow that will produce elements before we subscribe to it
The channelFlow
function creates an hybrid between a flow and a channel. It produces a hot stream of data but also implement the Flow
interface.
val myChannelFlow = channelFlow {
val myData = // fetch data here
send(myData)
}
suspend fun fetchData() {
myData.first()
}
Modifying the context of a Flow
myFlow
.flowOn(Dispatchers.IO)
// OR
myFlow
.flowOn(CoroutineName("NewName"))
Avoiding extra nesting level when launching a Flow
// instead of this
viewModelScope.launch {
myFlow
.collect()
}
// do this
myFlow
.launchIn(viewModelScope)
SharedFlow
Main principles
- Is a hot stream of values.
- Can have multiple receivers and they will all receive the same values.
- Useful when you need to broadcast values to multiple consumers or want to share a state or event among different parts of your application.
- Never completes until we close the whole scope.
- Has a mutable version
MutableSharedFlow
which allow us to update the state by emitting new values with the suspending functionemit
. - We can also use the non suspending version
tryEmit
. - Supports configurable replay and buffer overflow strategy.
- All methods of shared flow are thread-safe and can be safely invoked from concurrent coroutines without external synchronization.
Configuration parameters
Kotlin is providing us with a useful method to create a MutableSharedFlow
and define how we want the buffer to behave:
public fun <T> MutableSharedFlow(
// the number of values replayed to new subscribers
replay: Int = 0,
// the number of values buffered in addition to `replay`
extraBufferCapacity: Int = 0,
// action on buffer overflow
// Possible values: SUSPEND, DROP_OLDEST, DROP_LATEST
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
shareIn function
- Transforms a
Flow
into aSharedFlow
. - Useful when we want to turn one flow into multiple flows
- Expect a coroutine scope as the first argument (
scope
) to start a coroutine and collect the element of the flow. - The second argument
started
determines when the sharedFlow should starts listening to the value emitted by the flow. It takes aSharingStarted
object (see below for options). - The third argument,
replay
, (0 by default) which defines the numbers of values replayed to new subscribers.
SharingStarted
options:
SharingStarted.Eagerly
: starts listening for elements immediately and never stops until the scope is cancelled.SharingStarted.Lazily
: starts listening when the first subscriber appears and never stops until the scope is cancelled.SharingStarted.WhileSubscribed()
: starts listening when the first subscriber appears and immediately stops when the last subscriber disappears. We configures a delay (in milliseconds) between the disappearance of the last subscriber and the stopping of the sharing coroutine in thestopTimeoutMillis
parameter.
Note about WhileSubscribed
: if you open a new intent from your screen like the camera app for example, your screen will be paused and so your SharedFlow will have no subscribers anymore and will stop emitting. When going back to your original screen you will re-subscribe to your screen and might run the operation inside your flow again. This could cause issue or re-trigger an unnecessary operation.
Note about SharingStarted.Eagerly
and SharingStarted.Lazily
: if you are using ViewModelScope or a LifecycleScope the SharedFlow
will stop sending elements when the screen is destroyed.
For a more detailed explanation on how those options works, I recommand this great blog post from P-Y:
Turning a flow into a SharedFlow
// from a viewModel or a class with a lifeCycleScope
myFlow.shareIn(
scope = viewModelScope
started = SharingStarted.Lazily
)
// from a class without a lifeCycleScope (repository or use case)
suspend fun myFunction() = coroutineScope {
myFlow.shareIn(
scope = this
started = SharingStarted.Lazily
)
}
Use case: Observing database changes from several locations
If you use Room for your database you might already know that it supports Flow out of the box. So you can observe changes in your database and receive the new data as soon as it is available. But reading data from the disk can be heavy. If you need to receive the data in several screens, you can use SharedFlow
to avoid having to fetch it for every screen.
In this example we will demonstrate how we can fetch the UserSettings
once but receive the updates on several screen
// simple DAO to fetch the data from Room
@Dao
interface UserSettingsDao {
// fetch all the user settings from the database and emit a flow
@Query("SELECT * FROM user_settings")
fun getAll(): Flow<List<UserSettings>>
}
class UserSettingsRepository @Inject constructor(
private val dao: UserSettingsDao
) {
// We only read from the DB once and all the receiver will receive the
// data that is computed here.
suspend fun getAll(): SharedFlow<List<UserSettings>> = coroutineScope {
dao.getAll.shareIn(
// pass down the scope
scope = this,
// only start emitting when we have a receiver
started = SharingStarted.Lazily,
// replay the latest element when a new receiver subscribe to it
replay = 1
)
}
}
StateFlow
Main principles
- Works similarly to a
SharedFlow
with thereplay
parameter set to 1. - Always only stores one value.
- The value stored can be accessed with the
value
property. - We need to set an initial value in the constructor.
- Modern alternative to
LiveData
. - Won’t emit the new element if it’s equal to the previous one.
Setting and reading a value
val state = MutableStateFlow("A") // initial value is A
state.value = "B" // set value to B
state.value = "B" // this won't emit a new element because the value is already B
val myValue = state.value // read value from the state, here "B"
stateIn function
- Transforms a flow into a
StateFlow
. - Need to specify a scope.
- Has 2 variants, one suspending and one not suspending
stateIn suspending
- suspend until the first element of the flow is emitted and the new value is calculated
suspend fun myFunction() = coroutineScope {
myFlow.stateIn(this)
}
stateIn not suspending
- Requires an initial value in its
initialValue
argument. - Its second argument is
started
and expect aSharingStarted
element. (see SharedFlow shareIn above for more detail about this argument).
myFlow.stateIn(
scope = viewModelScope,
started = SharingStarted.Lazily,
initialValue = "A"
)
Use case: Emitting data from the viewModel to a view
Snippet on how to convert a flow into a StateFlow
in order to emit a state from the view model that can be observed from your view:
class MyViewModel @Inject constructor(
private val fetchDataUseCase: FetchDataUseCase
) : ViewModel() {
val myState: StateFlow<MyState> =
fetchDataUseCase.dataState
.map {
when (it) {
is FetchDataUseCase.FetchDataState.Loading -> MyState.Loading
is FetchDataUseCase.FetchDataState.Success -> MyState.Success(it.data)
is FetchDataUseCase.FetchDataState.Error -> MyState.Error(it.message)
}
}
// transform flow into a state flow
.stateIn(
// set the scope to the viewModel so we will stop
// listening when the viewModel is destroyed
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = MyState.Loading
)
sealed interface MyState {
data object Loading : MyState
data class Success(val data: List<String>) : MyState
data class Error(val message: String) : MyState
}
}
@Composable
fun MyScreen(viewModel = MyViewModel()) {
val state = viewModel.myState.collectAsStateWithLifecycle()
when (state) {
is MyState.Loading -> // show loading view
is MyState.Success -> // show success view
is MyState.Error -> // show error view
}
}
To take a deep dive on flows and coroutines and better understand how they work, I highly recommend this book from Marcin Moskała: