Kotlin Flow in Android [Part-1]

Harshushri
6 min read · Feb 15, 2024

In coroutines, a flow is a type that can emit multiple values sequentially, as opposed to suspend functions that return only a single value. For example, you can use a flow to receive live updates from a database.

Coroutines and Suspend Functions:

  • Coroutines help you write asynchronous, non-blocking code.
  • We use coroutines for API calls through suspend functions, which free the thread for other work until the IO operation returns a response.
  • You either fire and forget using launch, or wait for data (i.e. a single object) using async.
    launch is used when no data is expected back.
    async and await are used when a result is needed: async starts the coroutine, and await suspends the caller until the IO operation inside it returns a response.
  • A suspend function returns only a single object. It works well for operations such as:
    — Storing a value in the database.
    — Network calls.
    — Tasks that return a single value.
  • But some scenarios need a stream of data:
    — Video streaming
    — FM radio
    — A mobile phone sending audio signals to Bluetooth speakers.

For these scenarios, we need Streams.
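Before moving on to streams, the difference between launch and async described in the list above can be sketched as follows. This is a minimal console sketch rather than Android code: fetchGreeting() is a hypothetical stand-in for an IO call, and runBlocking is used only so that the example can run from main().

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a network or database call.
suspend fun fetchGreeting(): String {
    delay(100) // suspends the coroutine without blocking the thread
    return "Hello"
}

fun main(): Unit = runBlocking {
    // launch: fire and forget, no result is handed back to the caller.
    val job = launch {
        delay(100)
        println("Background work finished")
    }

    // async: starts a coroutine whose single result is read with await().
    val deferred = async { fetchGreeting() }
    println(deferred.await())

    job.join() // wait for the launched coroutine before leaving main()
}
```

Both builders return at most one result per call, which is why they are not enough for the streaming scenarios listed above.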

Kotlin supports asynchronous streams through Channels and Flows:

  • Channels (send and receive)
  • Flows (emit and collect)

Concept of Streams:

Basic example of coroutine:

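import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        CoroutineScope(Dispatchers.Main).launch {
            // Nothing is logged until all three users have been fetched.
            getUserNames().forEach {
                Log.d("KOTLIN_FLOW", it)
            }
        }
    }

    private suspend fun getUserNames(): List<String> {
        val list = mutableListOf<String>()
        list.add(getUser(1))
        list.add(getUser(2))
        list.add(getUser(3))
        return list
    }

    // Simulates a one-second network call.
    private suspend fun getUser(id: Int): String {
        delay(1000)
        return "User$id"
    }
}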

The code above has one disadvantage: it delivers results only after all the data has been fetched, so the consumer waits for the whole list (about three seconds here) before it can process anything.

Note → I want to process each value as soon as it is fetched. Whether one value or many have arrived, the consumer should run and do whatever work is already possible.
This improves the responsiveness of the code. (This is what streams do.)
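As a rough illustration of that idea, the list-returning function can be turned into a flow that emits every user as soon as it is fetched. This is a minimal console sketch under some assumptions: userNames() is a hypothetical name, getUser() repeats the simulated one-second fetch, and runBlocking replaces the Android scope so the example runs on its own.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Simulated one-second fetch, as in the list-based example.
suspend fun getUser(id: Int): String {
    delay(1000)
    return "User$id"
}

// Hypothetical streaming variant: each user is emitted as soon as it is fetched.
fun userNames(): Flow<String> = flow {
    for (id in 1..3) {
        emit(getUser(id))
    }
}

fun main(): Unit = runBlocking {
    // The first name is printed after about one second,
    // not after all three fetches have finished.
    userNames().collect { name -> println(name) }
}
```

The total time is the same as before, but the consumer can start working on the first value while the remaining ones are still being fetched.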

Coroutines Flow

Stream Flow

*** Use streams only where the data arrives as multiple values over time ***

Difference between Channels and Flow

Channels

  1. Channels are hot.
  2. In the hot case, the producer produces data whether or not it is consumed.
  3. Example → a movie is screened in a theatre from 4 to 7. If someone walks in at 4:30, they have missed the first 30 minutes and cannot watch them again.

Flows

  1. Flows are mostly cold.
  2. In the cold case, the producer doesn't produce data until a consumer is active.
  3. Example → when someone watches on Netflix, streaming starts only when the user clicks start, and the data is then streamed from the beginning until there is no more data.
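The cold behaviour can be observed directly. In the minimal sketch below, the names coldNumbers() and events are hypothetical and only serve to record the order in which things happen. The producer block does not run when the flow is created, only when collect is called.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Records what happens, so the order of events can be inspected.
val events = mutableListOf<String>()

fun coldNumbers(): Flow<Int> = flow {
    events += "producer started" // runs only once a collector is active
    emit(1)
    emit(2)
}

fun main(): Unit = runBlocking {
    val numbers = coldNumbers() // nothing has been produced yet
    events += "flow created"
    numbers.collect { events += "collected $it" }
    println(events)
    // prints [flow created, producer started, collected 1, collected 2]
}
```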

Code Example of Channel:

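import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

class MainActivity : AppCompatActivity() {
    val channel = Channel<Int>()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        producer()
        consumer()
    }

    // Sends two values into the channel.
    fun producer() {
        CoroutineScope(Dispatchers.Main).launch {
            channel.send(1)
            channel.send(2)
        }
    }

    // Receives the two values; receive() suspends until a value is available.
    fun consumer() {
        CoroutineScope(Dispatchers.Main).launch {
            Log.d("KOTLIN_FLOW", channel.receive().toString())
            Log.d("KOTLIN_FLOW", channel.receive().toString())
        }
    }
}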

Important Points:

  • What is a stream?
    A stream is a continuous flow of data in which the producer produces data and the consumer is updated whenever new data arrives.
  • Cold streams are preferred over hot streams, for two reasons:

Resource wastage
Example → if my Android application uses a hot stream that continuously calls the API and fetches data while there is no consumer for it, that work is wasted and makes the app heavy.

Manual close
Example → a hot stream's producer has to be told explicitly to stop producing data when there is no consumer left.

— A cold stream knows automatically whether it is being consumed; if it is not, the producer does not produce data.

Note → Hot streams are used for GPS locations, radio stations, and event buses, where we require continuous events.
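The "manual close" point can be sketched with a channel. In this minimal console example, produceAndDrain() is a hypothetical helper. The consumer's for loop ends only because the producer calls close() explicitly.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends three values through a channel and returns what was received.
suspend fun produceAndDrain(): List<Int> = coroutineScope {
    val channel = Channel<Int>()

    // Producer: sends three values, then closes the channel by hand.
    launch {
        for (value in 1..3) channel.send(value)
        channel.close() // without this, the loop below would suspend forever
    }

    // Consumer: the loop finishes once the channel is closed and drained.
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

fun main(): Unit = runBlocking {
    println(produceAndDrain()) // prints [1, 2, 3]
}
```

A cold flow needs no such step: its producer block simply returns after the last emit, and it never starts if nobody collects.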

  • Buffering → it is needed when the producer produces data faster than the consumer can handle it, which at some point creates a bottleneck (and vice versa, when the consumer is faster than the producer). In Java this problem was traditionally handled by blocking the thread; in Kotlin, coroutines suspend the function instead.

In that case buffering comes into the picture: the data is stored somewhere so that the consumer can consume it at its own pace.

To solve the above problems, Kotlin provides the Flow API.

Flows

— A flow requires a producer and a consumer.
— There is no bottleneck on either end: the producer and the consumer suspend instead of blocking a thread.
— It produces data asynchronously.
— It uses cold streams.

Implementation of Flows:

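import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        // GlobalScope keeps the example short; in an Activity,
        // lifecycleScope is usually the safer choice.
        GlobalScope.launch {
            val data: Flow<Int> = producer()
            data.collect {
                Log.d("KOTLIN_FLOW", it.toString())
            }
        }
    }

    // Emits the numbers 1 to 10, one per second.
    fun producer() = flow<Int> {
        val list = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        list.forEach {
            delay(1000)
            emit(it)
        }
    }
}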

In the above code, the block passed to flow { } is a suspending lambda, so suspend functions such as delay() and emit() can be called inside it; producer() itself does not need the suspend keyword.

Implementation of Flows with Multiple consumers :

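import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        // First consumer: starts collecting immediately.
        GlobalScope.launch {
            val data: Flow<Int> = producer()
            data.collect {
                Log.d("KOTLIN_FLOW-1", it.toString())
            }
        }
        // Second consumer: starts collecting 2.5 seconds later.
        GlobalScope.launch {
            val data: Flow<Int> = producer()
            delay(2500)
            data.collect {
                Log.d("KOTLIN_FLOW-2", it.toString())
            }
        }
    }

    fun producer() = flow<Int> {
        val list = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        list.forEach {
            delay(1000)
            emit(it)
        }
    }
}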

In the above code example, both consumers work independently and each receives the whole data set. The second consumer's delay makes no difference: it still receives the data from the beginning, because every collect call starts the cold flow afresh.

Kotlin Flow Functions

  • A flow consumer can use the following functions:
    — onStart{} → runs once, before the producer emits its first value.
    — onCompletion{} → runs after the producer has emitted all its values (also when the flow ends with an error or is cancelled).
    — onEach{} → runs for each value, before it is passed on to the collector.
    — collect{} → collects all the values the producer emits.

Example of Flow Functions

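import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        GlobalScope.launch {
            producer()
                .onStart {
                    Log.d("KOTLIN_FLOW", "Starting out")
                }
                .onCompletion {
                    Log.d("KOTLIN_FLOW", "Completed")
                }
                .onEach {
                    Log.d("KOTLIN_FLOW", "About to emit - $it")
                }
                .collect {
                    Log.d("KOTLIN_FLOW", it.toString())
                }
        }
    }

    fun producer() = flow<Int> {
        val list = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        list.forEach {
            delay(1000)
            emit(it)
        }
    }
}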

Kotlin Flow Operators

  • Operators are of two types:
  1. Terminal operators → these start the consumption of data. collect{} is a terminal operator, and all terminal operators are suspend functions. Examples:
    — collect{}
    — first() → returns the first value of the flow.
    — toList() → returns the flow's data as a List.
  2. Non-terminal operators (or intermediate operators) → these transform a flow and return a new flow. Examples:
    — map{}
    — filter{}

map and filter transform the data before it is collected: filter applies a condition, and map converts each value into another value. If the filter condition returns false for a value, the consumer (i.e. the collect{} function) never receives that value.

Both map and filter return a flow after mapping and filtering the data.

GlobalScope.launch(Dispatchers.Main) {
    producer()
        .map {
            it * 2
        }
        .filter { // --> map and filter both return a flow
            it < 8
        }
        .collect { // --> terminal operator that collects the modified flow
            Log.d("KOTLIN_FLOW", it.toString())
        }
}
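For comparison, first() and toList() can be applied to the same map and filter chain. Both are suspend functions that trigger collection and return plain values rather than a flow. The sketch below is a console version under stated assumptions: numbers() is a hypothetical copy of producer() without the delay, and runBlocking stands in for the Android scope.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Emits 1 to 10 without any delay, to keep the sketch fast.
fun numbers(): Flow<Int> = flow {
    for (i in 1..10) emit(i)
}

fun main(): Unit = runBlocking {
    // map and filter only describe the pipeline; nothing runs yet.
    val doubledSmall = numbers().map { it * 2 }.filter { it < 8 }

    // toList() collects every remaining value into a List.
    println(doubledSmall.toList()) // prints [2, 4, 6]

    // first() returns the first value and stops collecting after it.
    println(doubledSmall.first()) // prints 2
}
```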

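  • buffer(capacity: Int) → another intermediate operator. It stores emitted values while the consumer is still processing earlier ones, so a fast producer does not have to wait for a slow consumer. The capacity should be chosen according to how much data the producer can emit ahead of the consumer.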
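The effect of buffer() can be seen by timing a fast producer against a slow consumer. This is a minimal sketch with made-up timings: fastProducer(), the 100 ms and 300 ms delays, and the capacity of 10 are all illustrative assumptions.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Fast producer: one value every 100 ms.
fun fastProducer(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main(): Unit = runBlocking {
    // Without buffer(), the producer waits while the consumer handles each value.
    val withoutBuffer = measureTimeMillis {
        fastProducer().collect { delay(300) } // slow consumer
    }
    // With buffer(), the producer keeps emitting while the consumer is busy.
    val withBuffer = measureTimeMillis {
        fastProducer().buffer(capacity = 10).collect { delay(300) }
    }
    println("without buffer: $withoutBuffer ms, with buffer: $withBuffer ms")
}
```

The buffered run is expected to finish sooner, because the producer's delays overlap with the consumer's processing time instead of adding to it.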
