Kotlin flows API (Part 2)

Arun Aditya
16 min readDec 8, 2023

--

Part 1 : https://medium.com/@aruncse2k20/kotlin-flows-api-16469c412588

Kotlin Flow Context Preservation(flowOn) + Exception Handling(catch)

fun main(){
GlobalScope.launch(Dispatchers.Main){
producer()
.collect{
Log.d("Arun","Collector Thread - ${Thread.currentThread().name}")
}
}
}

private fun producer():Flow<Int>{
return flow<Int>{
val list = listof(1,2,3,4,5)
list.forEach{
delay(1000)
Log.d("Arun","Emitter Thread - ${Thread.currentThread().name}")
emit(it)
}
}
}

Output:

2023-12-08 23:42:31.407 12814-12814 Arun                     D  Emitter Thread - main
2023-12-08 23:42:31.408 12814-12814 Arun D Collector Thread - main
2023-12-08 23:42:31.474 12814-12833 EGL_emulation D app_time_stats: avg=17.21ms min=4.57ms max=63.33ms count=62
2023-12-08 23:42:32.422 12814-12814 Arun D Emitter Thread - main
2023-12-08 23:42:32.422 12814-12814 Arun D Collector Thread - main
2023-12-08 23:42:32.491 12814-12833 EGL_emulation D app_time_stats: avg=3.62ms min=0.82ms max=27.34ms count=59
2023-12-08 23:42:33.439 12814-12814 Arun D Emitter Thread - main
2023-12-08 23:42:33.439 12814-12814 Arun D Collector Thread - main
2023-12-08 23:42:33.508 12814-12833 EGL_emulation D app_time_stats: avg=1.41ms min=0.82ms max=2.67ms count=61
2023-12-08 23:42:34.455 12814-12814 Arun D Emitter Thread - main
2023-12-08 23:42:34.455 12814-12814 Arun D Collector Thread - main
2023-12-08 23:42:34.524 12814-12833 EGL_emulation D app_time_stats: avg=1.18ms min=0.89ms max=1.82ms count=60
2023-12-08 23:42:35.474 12814-12814 Arun D Emitter Thread - main
2023-12-08 23:42:35.475 12814-12814 Arun D Collector Thread - main

If You analyze the above, you will find that both emitter and collector are running on the main thread. Since we executed collect on the main thread so, it was expected that it would run on the main thread, but the producer also executed on the main thread. This shows that the Thread of execution of the producer depends on the thread of the collector.

But, in real-world scenarios producer is meant to fetch data from remote so ideally it should execute on IO Thread.

To change the context inside the coroutine library we have a method called withcontext Let's see if we can use it here

fun main(){
GlobalScope.launch(Dispatchers.Main){
producer()
.collect{
Log.d("Arun","Collector Thread - ${Thread.currentThread().name}")
}
}
}

private fun producer():Flow<Int>{
return flow<Int>{
withContext(Dispatchers.IO){
val list = listof(1,2,3,4,5)
list.forEach{
delay(1000)
Log.d("Arun","Emitter Thread - ${Thread.currentThread().name}")
emit(it)
}
}
}
}

output:-

FATAL EXCEPTION: main
Process: com.mercedesbenz.calendar, PID: 12942
java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [StandaloneCoroutine{Active}@ceae82d, Dispatchers.Main],
but emission happened in [DispatchedCoroutine{Active}@290c162, Dispatchers.IO].
Please refer to 'flow' documentation or use 'flowOn' instead

So, the above error suggests that we can not use withCotext() to switch threads. Instead, it suggests we should use flowOn

The reason behind the error flow preserves the coroutine context. It assumes that you would emit on the same thread that you used to collect. If you want to switch the context then you have to tell the flow explicitly that you want to execute this portion of the flow in a particular context and a particular portion of the flow in a different context. and to achieve it you can use flowOn operator

fun main(){
GlobalScope.launch(Dispatchers.Main){
producer()
.flowOn(Dispatchers.IO)
.collect{
Log.d("Arun","Collector Thread - ${Thread.currentThread().name}")
}
}
}

private fun producer():Flow<Int>{
return flow<Int>{
val list = listof(1,2,3,4,5)
list.forEach{
delay(1000)
Log.d("Arun","Emitter Thread - ${Thread.currentThread().name}")
emit(it)
}
}
}

output:-

2023-12-09 09:17:17.315 14249-14276 Arun                   D  Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:17:17.339 14249-14249 Arun D Collector Thread - main
2023-12-09 09:17:17.407 14249-14268 EGL_emulation D app_time_stats: avg=3.38ms min=0.87ms max=64.45ms count=45
2023-12-09 09:17:18.321 14249-14276 Arun D Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:17:18.358 14249-14249 Arun D Collector Thread - main
2023-12-09 09:17:18.423 14249-14268 EGL_emulation D app_time_stats: avg=1.50ms min=0.75ms max=15.98ms count=42
2023-12-09 09:17:19.322 14249-14276 Arun D Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:17:19.338 14249-14249 Arun D Collector Thread - main
2023-12-09 09:17:19.438 14249-14268 EGL_emulation D app_time_stats: avg=1.57ms min=1.14ms max=5.82ms count=42
2023-12-09 09:17:20.323 14249-14276 Arun D Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:17:20.354 14249-14249 Arun D Collector Thread - main
2023-12-09 09:17:20.455 14249-14268 EGL_emulation D app_time_stats: avg=1.36ms min=1.04ms max=1.90ms count=42
2023-12-09 09:17:21.324 14249-14276 Arun D Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:17:21.355 14249-14249 Arun D Collector Thread - main
2023-12-09 09:17:21.488 14249-14268 EGL_emulation D app_time_stats: avg=1.29ms min=1.03ms max=1.80ms count=46
2023-12-09 09:17:22.059 14249-14284 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-12-09 09:17:22.525 14249-14268 EGL_emulation D app_time_stats: avg=1.22ms min=1.01ms max=1.45ms count=45

flowOn executes all the code above it on on there context.

In summary whenever you want to switch context in flow then you need to inform it to flow by using the flowOn operator. All the code above flowOn will execute on the context that you passed on flowOn.

fun main(){
GlobalScope.launch(Dispatchers.Main) {
producer()
.map {
delay(1000)
it * 2
Log.d("Arun", "Map thread:- ${Thread.currentThread().name} ")
}
.flowOn(Dispatchers.IO)
.filter {
delay(500)
Log.d("Arun", "Filter Thread:- ${Thread.currentThread().name}")
it < 8
}
.flowOn(Dispatchers.Main)
.collect {
Log.d("Arun", "Collector Thread - ${Thread.currentThread().name} ")
}
}
}

private fun producer():Flow<Int>{
return flow<Int>{
val list = listof(1,2,3,4,5)
list.forEach{
delay(1000)
Log.d("Arun","Emitter Thread - ${Thread.currentThread().name}")
emit(it)
}
}
}

output:-

2023-12-09 09:32:07.859 14436-14463 Arun                  D  Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:32:07.972 14436-14455 EGL_emulation D app_time_stats: avg=3.53ms min=0.82ms max=107.67ms count=62
2023-12-09 09:32:08.861 14436-14463 Arun D Map thread:- DefaultDispatcher-worker-1
2023-12-09 09:32:08.988 14436-14455 EGL_emulation D app_time_stats: avg=1.81ms min=0.89ms max=16.01ms count=60
2023-12-09 09:32:09.362 14436-14436 Arun D Filter Thread:- main
2023-12-09 09:32:09.362 14436-14436 Arun D Collector Thread - main
2023-12-09 09:32:09.862 14436-14463 Arun D Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:32:09.988 14436-14455 EGL_emulation D app_time_stats: avg=1.68ms min=0.94ms max=2.39ms count=60
2023-12-09 09:32:10.863 14436-14463 Arun D Map thread:- DefaultDispatcher-worker-1
2023-12-09 09:32:10.995 14436-14455 EGL_emulation D app_time_stats: avg=1.53ms min=0.90ms max=2.46ms count=60
2023-12-09 09:32:11.366 14436-14436 Arun D Filter Thread:- main
2023-12-09 09:32:11.366 14436-14436 Arun D Collector Thread - main
2023-12-09 09:32:11.866 14436-14463 Arun D Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:32:12.004 14436-14455 EGL_emulation D app_time_stats: avg=1.40ms min=0.96ms max=2.60ms count=61
2023-12-09 09:32:12.054 14436-14470 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-12-09 09:32:12.866 14436-14463 Arun D Map thread:- DefaultDispatcher-worker-1
2023-12-09 09:32:13.025 14436-14455 EGL_emulation D app_time_stats: avg=1.44ms min=0.94ms max=4.95ms count=60
2023-12-09 09:32:13.371 14436-14436 Arun D Filter Thread:- main
2023-12-09 09:32:13.371 14436-14436 Arun D Collector Thread - main
2023-12-09 09:32:13.868 14436-14463 Arun D Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:32:14.871 14436-14463 Arun D Map thread:- DefaultDispatcher-worker-1
2023-12-09 09:32:15.373 14436-14436 Arun D Filter Thread:- main
2023-12-09 09:32:15.373 14436-14436 Arun D Collector Thread - main
2023-12-09 09:32:15.878 14436-14463 Arun D Emitter Thread - DefaultDispatcher-worker-1
2023-12-09 09:32:16.884 14436-14463 Arun D Map thread:- DefaultDispatcher-worker-1
2023-12-09 09:32:17.389 14436-14436 Arun D Filter Thread:- main
2023-12-09 09:32:17.389 14436-14436 Arun D Collector Thread - main

In the above example, the Emitter and map execute on the worker thread, and the filter and collector execute on the main thread.

flowOn work upstream.

Exception Handling(catch)

In flow, there are two scenarios when exceptions can occur.

  1. During Emission
  2. During collection

Example:-

fun main(){
GlobalScope.launch(Dispatchers.Main) {
try {
producer()
.collect {
Log.d("Arun", "Collector Thread - ${Thread.currentThread().name} ")
}
} catch (e: Exception) {
Log.d("Arun", e.message.toString())
}
}
}

private fun producer(): Flow<Int> {
return flow<Int> {
val list = listOf(1, 2, 3, 4, 5)
list.forEach {
delay(1000)
Log.d("Arun", "Emitter Thread - ${Thread.currentThread().name}")
emit(it)
throw Exception("Error in Emitter")
}
}
}

Output:-

2023-12-09 09:47:25.596 14574-14574 Arun         D  Emitter Thread - main
2023-12-09 09:47:25.596 14574-14574 Arun D Collector Thread - main
2023-12-09 09:47:25.597 14574-14574 Arun D Error in Emitter

Example 2:-

fun main(){
GlobalScope.launch(Dispatchers.Main) {
try {
producer()
.collect {
Log.d("Arun", "Collector Thread - ${Thread.currentThread().name} ")
throw Exception("Error in Collector")
}
} catch (e: Exception) {
Log.d("Arun", e.message.toString())
}
}
}

private fun producer(): Flow<Int> {
return flow<Int> {
val list = listOf(1, 2, 3, 4, 5)
list.forEach {
delay(1000)
Log.d("Arun", "Emitter Thread - ${Thread.currentThread().name}")
emit(it)
}
}
}

Output:-

2023-12-09 09:50:18.908 14668-14668 Arun             D  Emitter Thread - main
2023-12-09 09:50:18.908 14668-14668 Arun D Collector Thread - main
2023-12-09 09:50:18.908 14668-14668 Arun D Error in Collector

If you see the previous 2 examples exception of both emitter and collector can be handled by collector try-catch

But we want to handle the exception separately then we have to use catch operator of flowOn

Example:-

fun main(){
GlobalScope.launch(Dispatchers.Main) {
try {
producer()
.collect {
Log.d("Arun", "Collector Thread - ${Thread.currentThread().name} ")
}
} catch (e: Exception) {
Log.d("Arun", e.message.toString())
}
}
}

private fun producer(): Flow<Int> {
return flow<Int> {
val list = listOf(1, 2, 3, 4, 5)
list.forEach {
delay(1000)
Log.d("Arun", "Emitter Thread - ${Thread.currentThread().name}")
emit(it)
throw Exception("Error in Emitter")
}.catch {
Log.d("Arun", "Emitter Catch - ${it.message} ")
}
}
}

Output:-

2023-12-09 11:40:24.988 15105-15105 Arun        D  Emitter Thread - main
2023-12-09 11:40:24.988 15105-15105 Arun D Collector Thread - main
2023-12-09 11:40:24.988 15105-15105 Arun D Emitter Catch - Error in Emitter

If you see the above program exceptions of producers are caught in the producer section only. Although we have wrapped the collector to try and catch

One other important benefit of the catch keyword is we can emit something in case of an exception as a fallback

Example:-

fun main(){
GlobalScope.launch(Dispatchers.Main) {
try {
producer()
.collect {
Log.d("Arun", "Collector Thread - $it ${Thread.currentThread().name} ")
}
} catch (e: Exception) {
Log.d("Arun", e.message.toString())
}
}
}

private fun producer(): Flow<Int> {
return flow<Int> {
val list = listOf(1, 2, 3, 4, 5)
list.forEach {
delay(1000)
Log.d("Arun", "Emitter Thread - ${Thread.currentThread().name}")
emit(it)
throw Exception("Error in Emitter")
}
}.catch {
Log.d("Arun", "Emitter Catch - ${it.message} ")
emit(-999)
}
}

Output:-

2023-12-09 12:29:59.887 15575-15575 Arun          D  Emitter Thread - main
2023-12-09 12:29:59.888 15575-15575 Arun D Collector Thread - 1 main
2023-12-09 12:29:59.888 15575-15575 Arun D Emitter Catch - Error in Emitter
2023-12-09 12:29:59.888 15575-15575 Arun D Collector Thread - -999 main

Note :- Like flowOn catch also works upstream and we can use multiple catch

Shared Flow

Shared flow is a hot flow. Hot Flow means the producer will not wait for the consumer, The Consumer will receive data that is produced then it joined not from the start like Cold flow.

Just for analogy Hot flow is like Cinema Hall and Cold flow is like OTT.

Normal Flow:-(Recape)

fun main(){
GlobalScope.launch(Dispatchers.Main) {
GlobalScope.launch(Dispatchers.Main) {
val result = producer()
result.collect {
Log.d("Arun", "Collector1 - $it")
}
}

GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(1500)
result.collect {
Log.d("Arun", "Collector2 - $it")
}
}

GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(2500)
result.collect {
Log.d("Arun", "Collector3 - $it")
}
}
}

private fun producer(): Flow<Int> {
return flow<Int> {
val list = listOf(1, 2, 3, 4, 5)
list.forEach {
delay(1000)
emit(it)
}
}
}

Output:-

2023-12-09 12:55:18.339 15849-15849 Arun                   D  Collector1 - 1
2023-12-09 12:55:18.472 15849-15868 EGL_emulation D app_time_stats: avg=4.05ms min=0.89ms max=128.52ms count=63
2023-12-09 12:55:19.340 15849-15849 Arun D Collector1 - 2
2023-12-09 12:55:19.488 15849-15868 EGL_emulation D app_time_stats: avg=2.10ms min=1.01ms max=27.31ms count=61
2023-12-09 12:55:19.839 15849-15849 Arun D Collector2 - 1
2023-12-09 12:55:20.341 15849-15849 Arun D Collector1 - 3
2023-12-09 12:55:20.504 15849-15868 EGL_emulation D app_time_stats: avg=1.68ms min=0.93ms max=2.78ms count=61
2023-12-09 12:55:20.838 15849-15849 Arun D Collector3 - 1
2023-12-09 12:55:20.839 15849-15849 Arun D Collector2 - 2
2023-12-09 12:55:21.342 15849-15849 Arun D Collector1 - 4
2023-12-09 12:55:21.520 15849-15868 EGL_emulation D app_time_stats: avg=1.52ms min=0.94ms max=3.65ms count=61
2023-12-09 12:55:21.840 15849-15849 Arun D Collector3 - 2
2023-12-09 12:55:21.841 15849-15849 Arun D Collector2 - 3
2023-12-09 12:55:22.343 15849-15849 Arun D Collector1 - 5
2023-12-09 12:55:22.355 15849-15884 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-12-09 12:55:22.537 15849-15868 EGL_emulation D app_time_stats: avg=1.30ms min=0.83ms max=2.63ms count=61
2023-12-09 12:55:22.843 15849-15849 Arun D Collector3 - 3
2023-12-09 12:55:22.843 15849-15849 Arun D Collector2 - 4
2023-12-09 12:55:23.537 15849-15868 EGL_emulation D app_time_stats: avg=1.56ms min=0.85ms max=5.34ms count=60
2023-12-09 12:55:23.847 15849-15849 Arun D Collector3 - 4
2023-12-09 12:55:23.847 15849-15849 Arun D Collector2 - 5
2023-12-09 12:55:24.855 15849-15849 Arun D Collector3 - 5

If you see the above example although all the consumers joined at different points of time. They received the same data. Because normal flow is cold flow.

To implement shared flow we have two class

  1. MutableSharedFlow(can be mutated)
  2. SharedFlow (Read Only)

Example:-

fun main(){
GlobalScope.launch(Dispatchers.Main) {
GlobalScope.launch(Dispatchers.Main) {
val result = producer()
result.collect {
Log.d("Arun", "Collector1 - $it")
}
}

GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(1500)
result.collect {
Log.d("Arun", "Collector2 - $it")
}
}

GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(2500)
result.collect {
Log.d("Arun", "Collector3 - $it")
}
}
}
private fun producer(): Flow<Int> {
val mutableSharedFlow = MutableSharedFlow<Int>()
GlobalScope.launch {
val list = listOf<Int>(1, 2, 3, 4, 5)
list.forEach {
mutableSharedFlow.emit(it)
delay(1000)
}
}
return mutableSharedFlow
}

Output:-

2023-12-09 13:09:03.521 15966-15966 Arun              D  Collector1 - 1
2023-12-09 13:09:04.523 15966-15966 Arun D Collector1 - 2
2023-12-09 13:09:04.588 15966-15989 EGL_emulation D app_time_stats: avg=17.66ms min=7.68ms max=88.70ms count=62
2023-12-09 13:09:05.503 15966-15966 Arun D Collector2 - 3
2023-12-09 13:09:05.540 15966-15966 Arun D Collector1 - 3
2023-12-09 13:09:05.604 15966-15989 EGL_emulation D app_time_stats: avg=7.61ms min=0.68ms max=37.58ms count=60
2023-12-09 13:09:06.521 15966-15966 Arun D Collector2 - 4
2023-12-09 13:09:06.522 15966-15966 Arun D Collector3 - 4
2023-12-09 13:09:06.555 15966-15966 Arun D Collector1 - 4
2023-12-09 13:09:06.605 15966-15989 EGL_emulation D app_time_stats: avg=1.54ms min=1.15ms max=3.98ms count=60
2023-12-09 13:09:07.536 15966-15966 Arun D Collector2 - 5
2023-12-09 13:09:07.536 15966-15966 Arun D Collector3 - 5
2023-12-09 13:09:07.573 15966-15966 Arun D Collector1 - 5

If you see the out of the previous program you will find collector 1 gets all the data and collector gets the least data as it joined at the end.

Note:- In the previous example we have returned normal flow instead of shared flow because we don’t want other functions should mutate it.

Replay:-

Replay gives the option to emit previous data as well to the newly joined consumer according to the value.

Example:-

fun main(){
GlobalScope.launch(Dispatchers.Main) {
GlobalScope.launch(Dispatchers.Main) {
val result = producer()
result.collect {
Log.d("Arun", "Collector1 - $it")
}
}

GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(1500)
result.collect {
Log.d("Arun", "Collector2 - $it")
}
}

GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(2500)
result.collect {
Log.d("Arun", "Collector3 - $it")
}
}
}
private fun producer(): Flow<Int> {
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 2)
GlobalScope.launch {
val list = listOf<Int>(1, 2, 3, 4, 5)
list.forEach {
mutableSharedFlow.emit(it)
delay(1000)
}
}
return mutableSharedFlow
}

Output:-

2023-12-09 13:19:18.590 16110-16110 Arun                     D  Collector1 - 1
2023-12-09 13:19:19.588 16110-16110 Arun D Collector1 - 2
2023-12-09 13:19:19.754 16110-16129 EGL_emulation D app_time_stats: avg=4.90ms min=0.96ms max=182.99ms count=59
2023-12-09 13:19:20.123 16110-16110 Arun D Collector2 - 1
2023-12-09 13:19:20.123 16110-16110 Arun D Collector2 - 2
2023-12-09 13:19:20.587 16110-16110 Arun D Collector2 - 3
2023-12-09 13:19:20.587 16110-16110 Arun D Collector1 - 3
2023-12-09 13:19:20.756 16110-16129 EGL_emulation D app_time_stats: avg=1.68ms min=0.92ms max=15.79ms count=50
2023-12-09 13:19:21.121 16110-16110 Arun D Collector3 - 2
2023-12-09 13:19:21.121 16110-16110 Arun D Collector3 - 3
2023-12-09 13:19:21.586 16110-16110 Arun D Collector2 - 4
2023-12-09 13:19:21.586 16110-16110 Arun D Collector1 - 4
2023-12-09 13:19:21.603 16110-16110 Arun D Collector3 - 4
2023-12-09 13:19:21.771 16110-16129 EGL_emulation D app_time_stats: avg=1.25ms min=1.00ms max=1.82ms count=49
2023-12-09 13:19:22.587 16110-16110 Arun D Collector2 - 5
2023-12-09 13:19:22.587 16110-16110 Arun D Collector1 - 5
2023-12-09 13:19:22.603 16110-16110 Arun D Collector3 - 5

If you see the output of the previous example then you will find every consumer is getting the previous two values as well.

StateFlow:-

State flow is also a variant of shared flow. It is also a hot flow. It also can have multiple consumers, But in this case, flow maintains the state i.e. maintains the last value of the flow as a state.

Example Shared Flow:-

fun main(){
GlobalScope.launch(Dispatchers.Main) {
GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(1500)
result.collect {
Log.d("Arun", "Collector1 - $it")
}
}

GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(2500)
result.collect {
Log.d("Arun", "Collector2 - $it")
}
}

GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(3500)
result.collect {
Log.d("Arun", "Collector3 - $it")
}
}
}
private fun producer(): Flow<Int> {
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 2)
GlobalScope.launch {
val list = listOf<Int>(1, 2, 3, 4, 5)
list.forEach {
mutableSharedFlow.emit(it)
Log.d("Arun", "producer: $it")
delay(1000)
}
}
return mutableSharedFlow
}
2023-12-09 13:42:27.000 16391-16419 Arun               D  producer: 1
2023-12-09 13:42:27.001 16391-16421 Arun D producer: 1
2023-12-09 13:42:27.002 16391-16419 Arun D producer: 1
2023-12-09 13:42:28.002 16391-16421 Arun D producer: 2
2023-12-09 13:42:28.002 16391-16420 Arun D producer: 2
2023-12-09 13:42:28.003 16391-16420 Arun D producer: 2
2023-12-09 13:42:28.138 16391-16411 EGL_emulation D app_time_stats: avg=3.77ms min=0.98ms max=127.81ms count=62
2023-12-09 13:42:29.003 16391-16420 Arun D producer: 3
2023-12-09 13:42:29.003 16391-16420 Arun D producer: 3
2023-12-09 13:42:29.020 16391-16391 Arun D Collector1 - 3
2023-12-09 13:42:29.020 16391-16420 Arun D producer: 3
2023-12-09 13:42:29.155 16391-16411 EGL_emulation D app_time_stats: avg=1.64ms min=0.91ms max=16.30ms count=61
2023-12-09 13:42:30.004 16391-16420 Arun D producer: 4
2023-12-09 13:42:30.021 16391-16391 Arun D Collector2 - 4
2023-12-09 13:42:30.021 16391-16420 Arun D producer: 4
2023-12-09 13:42:30.036 16391-16391 Arun D Collector1 - 4
2023-12-09 13:42:30.036 16391-16420 Arun D producer: 4
2023-12-09 13:42:30.170 16391-16411 EGL_emulation D app_time_stats: avg=1.21ms min=0.96ms max=1.81ms count=61
2023-12-09 13:42:31.020 16391-16391 Arun D Collector3 - 5
2023-12-09 13:42:31.020 16391-16420 Arun D producer: 5
2023-12-09 13:42:31.037 16391-16391 Arun D Collector2 - 5
2023-12-09 13:42:31.037 16391-16420 Arun D producer: 5
2023-12-09 13:42:31.054 16391-16391 Arun D Collector1 - 5
2023-12-09 13:42:31.054 16391-16420 Arun D producer: 5

If you see the above output producer has produced 1 and 2 but no consumer received that data because the consumer joined late.

Similarly in State Flow.

Example 2:-

Joined the consumer after emission completed

fun main(){ 
GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(7000)
result.collect {
Log.d("Arun", "Collector1 - $it")
}
}
}
private fun producer(): Flow<Int> {
val mutableSharedFlow = MutableSharedFlow<Int>()
GlobalScope.launch {
val list = listOf<Int>(1, 2, 3, 4, 5)
list.forEach {
mutableSharedFlow.emit(it)
Log.d("Arun", "producer: $it")
delay(1000)
}
}
return mutableSharedFlow
}

Output:-

2023-12-09 13:54:49.995 16501-16529 Arun             D  producer: 1
2023-12-09 13:54:50.998 16501-16529 Arun D producer: 2
2023-12-09 13:54:51.091 16501-16521 EGL_emulation D app_time_stats: avg=5.76ms min=0.92ms max=104.56ms count=45
2023-12-09 13:54:52.002 16501-16529 Arun D producer: 3
2023-12-09 13:54:52.106 16501-16521 EGL_emulation D app_time_stats: avg=1.83ms min=0.96ms max=15.91ms count=42
2023-12-09 13:54:53.004 16501-16529 Arun D producer: 4
2023-12-09 13:54:53.122 16501-16521 EGL_emulation D app_time_stats: avg=1.21ms min=0.94ms max=1.74ms count=41
2023-12-09 13:54:54.005 16501-16529 Arun D producer: 5

Example State Flow:-

func main(){
GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(7000)
result.collect {
Log.d("Arun", "Collector1 - $it")
}
}
}

private fun producer(): Flow<Int> {
val mutableStateFlow = MutableStateFlow(10)
GlobalScope.launch {
val list = listOf<Int>(1, 2, 3, 4, 5)
list.forEach {
mutableStateFlow.emit(it)
Log.d("Arun", "producer: $it")
delay(1000)
}
}
return mutableStateFlow
}

Output:-

2023-12-09 13:57:36.272 16580-16611 Arun                  D  producer: 1
2023-12-09 13:57:37.281 16580-16611 Arun D producer: 2
2023-12-09 13:57:37.388 16580-16603 EGL_emulation D app_time_stats: avg=13.89ms min=0.95ms max=117.52ms count=62
2023-12-09 13:57:38.284 16580-16611 Arun D producer: 3
2023-12-09 13:57:38.404 16580-16603 EGL_emulation D app_time_stats: avg=2.08ms min=1.23ms max=18.07ms count=61
2023-12-09 13:57:39.285 16580-16611 Arun D producer: 4
2023-12-09 13:57:39.405 16580-16603 EGL_emulation D app_time_stats: avg=1.71ms min=1.12ms max=2.29ms count=60
2023-12-09 13:57:40.285 16580-16611 Arun D producer: 5
2023-12-09 13:57:40.421 16580-16603 EGL_emulation D app_time_stats: avg=1.63ms min=0.97ms max=2.60ms count=61
2023-12-09 13:57:41.437 16580-16603 EGL_emulation D app_time_stats: avg=1.46ms min=0.86ms max=2.40ms count=61
2023-12-09 13:57:41.715 16580-16618 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-12-09 13:57:42.437 16580-16603 EGL_emulation D app_time_stats: avg=1.35ms min=0.81ms max=2.53ms count=60
2023-12-09 13:57:43.271 16580-16580 Arun D Collector1 - 5

If you see the above example although the consumer joined after the 7th sec and the producer finished production at the 5th sec only the still consumer managed to receive the last emitted value because state flows maintain the state of the last element produced.

One another important property of state flow is we can ask the value of state flow at any time without collecting it.

Example:-

fun main(){
GlobalScope.launch(Dispatchers.Main) {
val result = producer()
Log.d("Arun", "Collector1 - ${result.value.toString()}")
}
}
private fun producer(): StateFlow<Int> {
val mutableStateFlow = MutableStateFlow(10)
GlobalScope.launch {
val list = listOf<Int>(1, 2, 3, 4, 5)
list.forEach {
mutableStateFlow.emit(it)
Log.d("Arun", "producer: $it")
delay(1000)
}
}
return mutableStateFlow
}

Output:-

2023-12-09 14:07:36.767 16757-16757 Arun              D  Collector1 - 10
2023-12-09 14:07:36.768 16757-16785 Arun D producer: 1
2023-12-09 14:07:37.775 16757-16786 Arun D producer: 2
2023-12-09 14:07:37.870 16757-16777 EGL_emulation D app_time_stats: avg=3.57ms min=0.89ms max=94.48ms count=59
2023-12-09 14:07:38.776 16757-16786 Arun D producer: 3
2023-12-09 14:07:38.872 16757-16777 EGL_emulation D app_time_stats: avg=1.59ms min=0.86ms max=18.42ms count=52
2023-12-09 14:07:39.778 16757-16786 Arun D producer: 4
2023-12-09 14:07:39.887 16757-16777 EGL_emulation D app_time_stats: avg=1.20ms min=0.89ms max=1.65ms count=59
2023-12-09 14:07:40.779 16757-16786 Arun D producer: 5

collected initial value.

Example 2:- if we want to see the value of state flow after 7 sec.

fun main(){
GlobalScope.launch(Dispatchers.Main) {
val result = producer()
delay(7000)
Log.d("Arun", "Collector1 - ${result.value.toString()}")
}
}
private fun producer(): StateFlow<Int> {
val mutableStateFlow = MutableStateFlow(10)
GlobalScope.launch {
val list = listOf<Int>(1, 2, 3, 4, 5)
list.forEach {
mutableStateFlow.emit(it)
Log.d("Arun", "producer: $it")
delay(1000)
}
}
return mutableStateFlow
}

Output:-

2023-12-09 14:12:30.954 16873-16900 Arun                 D  producer: 1
2023-12-09 14:12:31.959 16873-16900 Arun D producer: 2
2023-12-09 14:12:32.022 16873-16892 EGL_emulation D app_time_stats: avg=10.10ms min=0.98ms max=62.29ms count=60
2023-12-09 14:12:32.959 16873-16900 Arun D producer: 3
2023-12-09 14:12:33.024 16873-16892 EGL_emulation D app_time_stats: avg=2.33ms min=0.97ms max=29.15ms count=58
2023-12-09 14:12:33.960 16873-16900 Arun D producer: 4
2023-12-09 14:12:34.037 16873-16892 EGL_emulation D app_time_stats: avg=2.04ms min=1.34ms max=3.74ms count=61
2023-12-09 14:12:34.962 16873-16900 Arun D producer: 5
2023-12-09 14:12:35.056 16873-16892 EGL_emulation D app_time_stats: avg=1.58ms min=0.82ms max=2.82ms count=61
2023-12-09 14:12:36.070 16873-16892 EGL_emulation D app_time_stats: avg=1.41ms min=0.86ms max=2.11ms count=61
2023-12-09 14:12:36.388 16873-16909 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-12-09 14:12:37.087 16873-16892 EGL_emulation D app_time_stats: avg=1.42ms min=0.82ms max=2.67ms count=61
2023-12-09 14:12:37.959 16873-16873 Arun D Collector1 - 5

collected the last value.

What is the difference between live data and state flow?

  1. Transformations on Main Thread.

In the case of live data, all the transformations happened on the main thread means all the operators like map, filter, and Switch map execute on the main thread. If some heavy operation happens in the operator then it will freeze the main thread.

since state flow is a flow and it has a property of flowOn.So, we can execute it on any thread.

2. Operators

Live data has a limited number of operators since state flow is also a flow so it has more number of operators as compared to live data.

3. LifeCycle Dependent

Live data is lifecycle-aware so, it needs a lifecycle to work either activity lifecycle or fragment lifecycle, etc. We can't use it without a cycle for example in the case of a repository we don’t have a lifecycle it is a simple class. in that scenario, we should use flow.

So, In the case of live data, we need lifecycle but in the case of flow, we just need coroutine scope. and coroutine scope is easily manageable and efficient too.

so that was the difference between live data and flow.

--

--

Arun Aditya

Passionate Android Dev Building intuitive apps & sharing knowledge on ✍️Medium. Let's craft the future of mobile, together! Connect: