Kotlin flows API (Part 1)

Arun Aditya
7 min readNov 27, 2023

--

Part 2 : https://medium.com/@aruncse2k20/kotlin-flows-api-part-2-460242d449fb

  1. Traditional ways
/**
 * "Traditional" approach: a suspending function builds the complete list
 * before anything is handed back to the caller.
 */
class MainActivity : ComponentActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContent {
            Text(text = "Kotlin Flow")
        }
        // NOTE(review): this ad-hoc scope is never cancelled, so the work is not tied to the
        // Activity lifecycle. Acceptable for a demo; confirm before reusing in production code.
        CoroutineScope(Dispatchers.Main).launch {
            // Nothing is logged until getUserNames() has returned the whole list.
            getUserNames().forEach { name ->
                Log.d("Arun", name)
            }
        }
    }

    /**
     * Loads users 1..17 one after another and returns them as a single list.
     *
     * Every lookup suspends for one second, so the list becomes available only
     * after all 17 lookups have finished.
     */
    private suspend fun getUserNames(): List<String> =
        (1..17).map { id -> getUser(id) }

    /** Simulates a slow lookup: suspends for one second, then returns "User" followed by [id]. */
    private suspend fun getUser(id: Int): String {
        delay(1000)
        return "User$id"
    }
}

In the above example, the output appears only after about 17 seconds, and then the whole list is printed at once.

2. Channel

A Channel is used to implement a hot stream.

/**
 * Channel demo: one producer and two consumers share a single channel.
 * Every value that is sent is received by exactly one of the consumers.
 */
class MainActivity : ComponentActivity() {

    // The snippet used `channel` without ever declaring it. Channel<Int>() creates a
    // rendezvous channel, so send() suspends until some consumer calls receive().
    private val channel = Channel<Int>()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContent {
            Text(text = "Kotlin Flow")
        }

        producer()
        consumer1()
        consumer2()
    }

    /** Sends the values 1..17 into the channel, one at a time. */
    fun producer() {
        CoroutineScope(Dispatchers.Main).launch {
            // The two consumers below receive only 13 values in total (2 + 11),
            // so the send of value 14 stays suspended and 14..17 are never delivered.
            for (value in 1..17) {
                channel.send(value)
            }
        }
    }

    /** Receives two values from the channel and logs them under the tag "Arun 1". */
    fun consumer1() {
        CoroutineScope(Dispatchers.Main).launch {
            repeat(2) {
                Log.d("Arun 1", channel.receive().toString())
            }
        }
    }

    /**
     * Receives eleven values from the channel and logs them under the tag "Arun 2".
     * Values already taken by consumer1 are not seen again here.
     */
    fun consumer2() {
        CoroutineScope(Dispatchers.Main).launch {
            repeat(11) {
                Log.d("Arun 2", channel.receive().toString())
            }
        }
    }
}

output:-

  8973-8973  Arun 1                  com.arun.app            D  1
8973-8973 Arun 1 com.arun.app D 2
8973-8973 Arun 2 com.arun.app D 3
8973-8973 Arun 2 com.arun.app D 4
8973-8973 Arun 2 com.arun.app D 5
8973-8973 Arun 2 com.arun.app D 6
8973-8973 Arun 2 com.arun.app D 7
8973-8973 Arun 2 com.arun.app D 8
8973-8973 Arun 2 com.arun.app D 9
8973-8973 Arun 2 com.arun.app D 10
8973-8973 Arun 2 com.arun.app D 11
8973-8973 Arun 2 com.arun.app D 12
8973-8973 Arun 2 com.arun.app D 13

Note: if there are multiple consumers, each value is delivered to only one of them, so each consumer receives different data.

Flow

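The Kotlin Flow API solves the problems seen in the approaches above: a suspending function delivers nothing until the whole list is ready, and a channel splits its values between consumers.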

3. Flows

/**
 * Flow demo: collects a cold flow and logs each value as soon as it is emitted.
 */
class MainActivity : ComponentActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContent {
            Text(text = "Kotlin Flow")
        }
        GlobalScope.launch {
            // collect() is the terminal operator that actually starts the flow.
            producer().collect { value ->
                Log.d("Arun", value.toString())
            }
        }
    }

    /** Emits the numbers 1..10, suspending for one second before each emission. */
    fun producer(): Flow<Int> = flow {
        for (value in 1..10) {
            delay(1000)
            emit(value)
        }
    }
}

Output: one value is printed every second.

2023-11-26 15:25:20.645  9353-9375  Arun                               D  1
2023-11-26 15:25:21.650 9353-9375 Arun D 2
2023-11-26 15:25:22.652 9353-9375 Arun D 3
2023-11-26 15:25:23.655 9353-9375 Arun D 4
2023-11-26 15:25:24.658 9353-9375 Arun D 5
2023-11-26 15:25:25.644 9353-9387 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-11-26 15:25:25.660 9353-9375 Arun D 6
2023-11-26 15:25:26.664 9353-9375 Arun D 7
2023-11-26 15:25:27.671 9353-9375 Arun D 8
2023-11-26 15:25:28.676 9353-9375 Arun D 9
2023-11-26 15:25:29.679 9353-9375 Arun D 10

Example 2

fun main(){
val userList = getUserList()
// Calling getShippingDetails() only builds a cold flow: the flow { } block does not run
// and no data is emitted until a terminal operator such as collect() is applied to it.
// Nothing is collected here, so with no consumer the producer code never starts.
val shippingDetails: Flow<ShippingDetail> = getShippingDetails(userList)
}
/**
 * Builds a cold flow that emits one shipping detail per entry of [userList].
 *
 * For every listed user the flow looks the user up by id, resolves the shipping
 * address and calculates the shipping charges. None of this runs until the
 * returned flow is collected.
 *
 * @param userList users whose shipping details should be produced, in order.
 * @return a flow emitting the calculated shipping detail for each user.
 */
fun getShippingDetails(userList: List<User>): Flow<ShippingDetail> = flow {
    userList.forEach { listedUser ->
        val user = getUser(listedUser.id)
        val shippingAddress = getShippingAddress(user.address)
        // The computed value was previously assigned to a local and dropped, so collectors
        // never received anything. Assumes calculateShippingCharges returns a
        // ShippingDetail - TODO confirm against its declaration.
        emit(calculateShippingCharges(shippingAddress))
    }
}

4. Cancel consumer

With a flow, if we cancel the consumer, the producer also stops producing data.

/**
 * Starts collecting producer() in one coroutine and cancels that coroutine
 * from a second one after 3.5 seconds.
 */
fun main() {
    val collectorJob = GlobalScope.launch {
        producer().collect { value ->
            Log.d(TAG, value.toString())
        }
    }

    // Cancelling the collecting coroutine also stops the flow body that runs inside it.
    GlobalScope.launch {
        delay(3500)
        collectorJob.cancel()
    }
}

/** Cold flow that emits 1..10, suspending for one second before each value. */
fun producer(): Flow<Int> = flow {
    for (value in 1..10) {
        delay(1000)
        emit(value)
    }
}

Note: In the above example, the coroutine associated with the consumer is cancelled after 3.5 seconds, so the producer also stops producing data.

2023-11-26 16:09:24.760  9497-9524  Arun                               D  1
2023-11-26 16:09:25.765 9497-9524 Arun D 2
2023-11-26 16:09:26.770 9497-9524 Arun D 3

4. Multiple consumers

/**
 * Launches two independent collectors. Each one calls producer() itself,
 * so each gets its own run of the flow and sees every value.
 */
fun main() {
    GlobalScope.launch {
        producer().collect { value ->
            Log.d("Arun - 1 ", value.toString())
        }
    }

    GlobalScope.launch {
        producer().collect { value ->
            Log.d("Arun - 2", value.toString())
        }
    }
}

/** Cold flow that emits 1..5, suspending for one second before each value. */
fun producer(): Flow<Int> = flow {
    (1..5).forEach { value ->
        delay(1000)
        emit(value)
    }
}

output:-

2023-11-26 16:23:56.771  9788-9815  Arun - 2                           D  1
2023-11-26 16:23:56.771 9788-9814 Arun - 1 D 1
2023-11-26 16:23:57.772 9788-9814 Arun - 2 D 2
2023-11-26 16:23:57.772 9788-9815 Arun - 1 D 2
2023-11-26 16:23:58.779 9788-9815 Arun - 2 D 3
2023-11-26 16:23:58.779 9788-9814 Arun - 1 D 3
2023-11-26 16:23:59.786 9788-9815 Arun - 2 D 4
2023-11-26 16:23:59.787 9788-9814 Arun - 1 D 4
2023-11-26 16:24:00.794 9788-9815 Arun - 2 D 5
2023-11-26 16:24:00.794 9788-9814 Arun - 1 D 5

Example 2 (delay the 2nd consumer by 2500 milliseconds)

/**
 * Launches two collectors of producer(); the second one starts 2.5 seconds late.
 *
 * Because the flow is cold, the late collector still receives every value from
 * the beginning; it simply gets them later than the first collector.
 */
fun main() {
    GlobalScope.launch {
        producer().collect { value ->
            Log.d("Arun - 1 ", value.toString())
        }
    }

    GlobalScope.launch {
        // This delay was missing, which left the snippet identical to the previous example
        // and unable to produce the staggered output shown for it.
        delay(2500)
        producer().collect { value ->
            Log.d("Arun - 2", value.toString())
        }
    }
}

/** Cold flow that emits 1..5, suspending for one second before each value. */
fun producer(): Flow<Int> = flow {
    for (value in 1..5) {
        delay(1000)
        emit(value)
    }
}

output:-

2023-11-26 16:29:19.798  9926-9952  Arun - 1                           D  1
2023-11-26 16:29:20.804 9926-9952 Arun - 1 D 2
2023-11-26 16:29:21.808 9926-9952 Arun - 1 D 3
2023-11-26 16:29:22.308 9926-9952 Arun - 2 D 1
2023-11-26 16:29:22.810 9926-9952 Arun - 1 D 4
2023-11-26 16:29:23.311 9926-9952 Arun - 2 D 2
2023-11-26 16:29:23.816 9926-9952 Arun - 1 D 5
2023-11-26 16:29:23.979 9926-9964 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-11-26 16:29:24.316 9926-9952 Arun - 2 D 3
2023-11-26 16:29:25.319 9926-9952 Arun - 2 D 4
2023-11-26 16:29:26.323 9926-9952 Arun - 2 D 5

5. Flow Events:- onStart,onEach,onCompletion

/**
 * Shows the flow lifecycle hooks: onStart runs before the first upstream value,
 * onCompletion after the last one, and onEach for every value that passes it.
 */
fun main() {
    GlobalScope.launch(Dispatchers.Main) {
        val observed = producer()
            .onStart {
                // Emit an extra value first, then log.
                emit(-1)
                Log.d(TAG, "Starting out ")
            }
            .onCompletion {
                // Emit a trailing value once the upstream has finished, then log.
                emit(6)
                Log.d(TAG, "Completed")
            }
            // onEach sits after onStart/onCompletion, so it also sees -1 and 6.
            .onEach { value ->
                Log.d(TAG, "About to emit - $value")
            }

        observed.collect { value ->
            Log.d(TAG, "$value")
        }
    }
}
/** Cold flow that emits 1, 2 and 3, suspending for one second before each value. */
fun producer(): Flow<Int> = flow {
    for (value in 1..3) {
        delay(1000)
        emit(value)
    }
}
2023-11-26 16:59:27.135 Arun                               D  About to emit - -1
2023-11-26 16:59:27.135 Arun D -1
2023-11-26 16:59:27.135 Arun D Starting out
2023-11-26 16:59:28.138 Arun D About to emit - 1
2023-11-26 16:59:28.139 Arun D 1
2023-11-26 16:59:29.146 Arun D About to emit - 2
2023-11-26 16:59:29.146 Arun D 2
2023-11-26 16:59:30.148 Arun D About to emit - 3
2023-11-26 16:59:30.148 Arun D 3
2023-11-26 16:59:30.148 Arun D About to emit - 6
2023-11-26 16:59:30.148 Arun D 6
2023-11-26 16:59:30.148 Arun D Completed

6. Flows Operators:-

  1. Terminal: toList, first, collect. A terminal operator is needed to start a flow; without one, the flow will not start.
  2. Non-terminal (intermediate): filter, map, etc.
/**
 * Runs producer() four times, one after another, to show terminal operators
 * (first, collect, toList) and intermediate operators (filter, map).
 */
fun main() {
    GlobalScope.launch(Dispatchers.Main) {

        // Terminal operator: takes only the first emitted value.
        val head = producer().first()
        Log.d(TAG, "first: $head")

        // Intermediate filter + terminal collect: even values only.
        val evens = producer().filter { value -> value % 2 == 0 }
        evens.collect { value ->
            Log.d(TAG, "filter: $value")
        }

        // Terminal operator: gathers every value into a list.
        val everything = producer().toList()
        Log.d(TAG, "list: $everything")

        // Chained intermediate operators: double each value, keep results below 8.
        producer()
            .map { value -> value * 2 }
            .filter { doubled -> doubled < 8 }
            .collect { doubled ->
                Log.d(TAG, "Chain:- $doubled")
            }
    }
}

/** Cold flow that emits 1..6, suspending for one second before each value. */
fun producer(): Flow<Int> = flow {
    (1..6).forEach { value ->
        delay(1000)
        emit(value)
    }
}

Example:- 2

/**
 * Converts every note into a FormattedNote with an upper-cased title,
 * keeps only the active ones and logs them.
 */
fun main() {
    GlobalScope.launch(Dispatchers.Main) {
        val activeFormattedNotes = getNotes()
            .map { note ->
                FormattedNote(note.isActive, note.title.uppercase(), note.description)
            }
            .filter { formatted -> formatted.isActive }

        activeFormattedNotes.collect { formatted ->
            Log.d(TAG, formatted.toString())
        }
    }
}

/** Returns a flow over three sample notes; the third one is inactive. */
private fun getNotes(): Flow<Note> =
    listOf(
        Note(1, true, "First", "First Description"),
        Note(2, true, "Second", "Second Description"),
        Note(3, false, "Third", "Third Description"),
    ).asFlow()

/** A note as produced by the data source. */
data class Note(
    val id: Int,
    val isActive: Boolean,
    val title: String,
    val description: String,
)
/** Display form of a note: no id, only the active flag, title and description. */
data class FormattedNote(
    val isActive: Boolean,
    val title: String,
    val description: String,
)
2023-11-26 17:34:13.276 10632-10632 Arun      D  FormattedNote(isActive=true, title=FIRST, description=First Description)
2023-11-26 17:34:13.276 10632-10632 Arun D FormattedNote(isActive=true, title=SECOND, description=Second Description)

7. Buffer: used when the producer is fast and the consumer is slow; produced items are stored in the buffer until the consumer is ready for them.

  1. Without buffer:
/**
 * Measures how long it takes to collect producer() with a slow collector
 * (1.5 seconds per item) and no buffer, then logs the elapsed milliseconds.
 */
fun main() {
    GlobalScope.launch(Dispatchers.Main) {
        val elapsedMillis = measureTimeMillis {
            producer().collect { value ->
                // Simulate slow processing of each item.
                delay(1500)
                Log.d(TAG, value.toString())
            }
        }
        Log.d(TAG, elapsedMillis.toString())
    }
}

/** Cold flow that emits 1..5, suspending for one second before each value. */
private fun producer(): Flow<Int> = flow {
    for (value in 1..5) {
        delay(1000)
        emit(value)
    }
}

Output: without a buffer it took ~12.5 seconds (2.5 × 5) to complete the execution, because each item costs 1 second to produce plus 1.5 seconds to consume.

2023-11-26 17:44:33.652 Arun                               D  1
2023-11-26 17:44:36.158 Arun D 2
2023-11-26 17:44:36.864 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-11-26 17:44:38.668 Arun D 3
2023-11-26 17:44:41.179 Arun D 4
2023-11-26 17:44:43.696 Arun D 5
2023-11-26 17:44:43.696 Arun D 12549

2. With buffer:-

/**
 * Same measurement as the unbuffered version, but with buffer(3) between the
 * producer and the slow collector; logs each value and then the elapsed milliseconds.
 */
fun main() {
    GlobalScope.launch(Dispatchers.Main) {
        val elapsedMillis = measureTimeMillis {
            val buffered = producer().buffer(3) // up to 3 produced items can wait in the buffer
            buffered.collect { value ->
                // Simulate slow processing of each item.
                delay(1500)
                Log.d(TAG, value.toString())
            }
        }
        Log.d(TAG, elapsedMillis.toString())
    }
}

/** Cold flow that emits 1..5, suspending for one second before each value. */
private fun producer(): Flow<Int> = flow {
    (1..5).forEach { value ->
        delay(1000)
        emit(value)
    }
}

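Output: with a buffer it took ~8.5 seconds (1 + 1.5 × 5) to complete the execution: 1 second until the first item is produced, then 1.5 seconds to consume each of the 5 items.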

2023-11-26 17:53:06.920 10841-10841 Arun                               D  1
2023-11-26 17:53:08.423 10841-10841 Arun D 2
2023-11-26 17:53:09.789 10841-10874 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-11-26 17:53:09.925 10841-10841 Arun D 3
2023-11-26 17:53:11.429 10841-10841 Arun D 4
2023-11-26 17:53:12.931 10841-10841 Arun D 5
2023-11-26 17:53:12.932 10841-10841 Arun D 8544

Example: if the above example had 8 items in total, how long would the task take?

Ans: ~13 seconds (1 + 1.5 × 8)

Part 2 : https://medium.com/@aruncse2k20/kotlin-flows-api-part-2-460242d449fb

--

--

Arun Aditya

Passionate Android Dev Building intuitive apps & sharing knowledge on ✍️Medium. Let's craft the future of mobile, together! Connect: