Kotlin flows API (Part 1)
Part 2 : https://medium.com/@aruncse2k20/kotlin-flows-api-part-2-460242d449fb
- Traditional ways
class MainActivity : ComponentActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContent {
Text(text = "Kotlin Flow")
}
CoroutineScope(Dispatchers.Main).launch {
getUserNames().forEach {
Log.d("Arun", it)
}
}
}
private suspend fun getUserNames(): List<String> {
val list = mutableListOf<String>()
list.add(getUser(1))
list.add(getUser(2))
list.add(getUser(3))
list.add(getUser(4))
list.add(getUser(5))
list.add(getUser(6))
list.add(getUser(7))
list.add(getUser(8))
list.add(getUser(9))
list.add(getUser(10))
list.add(getUser(11))
list.add(getUser(12))
list.add(getUser(13))
list.add(getUser(14))
list.add(getUser(15))
list.add(getUser(16))
list.add(getUser(17))
return list
}
private suspend fun getUser(id: Int): String {
delay(1000)
return "User$id"
}
}
In the above example output will come after 17 sec. and in the output whole list will be printed.
2. Channel
It is use to implement hot stream
class MainActivity : ComponentActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContent {
Text(text = "Kotlin Flow")
}
producer()
consumer1()
consumer2()
}
fun producer() {
CoroutineScope(Dispatchers.Main).launch {
channel.send(1)
channel.send(2)
channel.send(3)
channel.send(4)
channel.send(5)
channel.send(6)
channel.send(7)
channel.send(8)
channel.send(9)
channel.send(10)
channel.send(11)
channel.send(12)
channel.send(13)
channel.send(14)
channel.send(15)
channel.send(16)
channel.send(17)
}
}
fun consumer1() {
CoroutineScope(Dispatchers.Main).launch {
Log.d("Arun 1",channel.receive().toString())//1
Log.d("Arun 1",channel.receive().toString())//2
}
}
fun consumer2() {
CoroutineScope(Dispatchers.Main).launch {
Log.d("Arun 2",channel.receive().toString())//1
Log.d("Arun 2",channel.receive().toString())//2
Log.d("Arun 2",channel.receive().toString())//3
Log.d("Arun 2",channel.receive().toString())//4
Log.d("Arun 2",channel.receive().toString())//5
Log.d("Arun 2",channel.receive().toString())//6
Log.d("Arun 2",channel.receive().toString())//7
Log.d("Arun 2",channel.receive().toString())//8
Log.d("Arun 2",channel.receive().toString())//9
Log.d("Arun 2",channel.receive().toString())//10
Log.d("Arun 2",channel.receive().toString())//11
}
}
}
output:-
8973-8973 Arun 1 com.arun.app D 1
8973-8973 Arun 1 com.arun.app D 2
8973-8973 Arun 2 com.arun.app D 3
8973-8973 Arun 2 com.arun.app D 4
8973-8973 Arun 2 com.arun.app D 5
8973-8973 Arun 2 com.arun.app D 6
8973-8973 Arun 2 com.arun.app D 7
8973-8973 Arun 2 com.arun.app D 8
8973-8973 Arun 2 com.arun.app D 9
8973-8973 Arun 2 com.arun.app D 10
8973-8973 Arun 2 com.arun.app D 11
8973-8973 Arun 2 com.arun.app D 12
8973-8973 Arun 2 com.arun.app D 13
Note:- if there are multiple consumer then each consumer will consume different data.
Flow
Kotlin Flow Api solve below problems
3. Flows
class MainActivity : ComponentActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContent {
Text(text = "Kotlin Flow")
}
GlobalScope.launch{
val data: Flow<Int> = producer()
data.collect{
Log.d("Arun", it.toString())
}
}
}
fun producer() = flow<Int> {
val list = listOf(1,2,3,4,5,6,7,8,9,10)
list.forEach {
delay(1000)
emit(it)
}
}
}
Out-put:- output will print every sececond.
2023-11-26 15:25:20.645 9353-9375 Arun D 1
2023-11-26 15:25:21.650 9353-9375 Arun D 2
2023-11-26 15:25:22.652 9353-9375 Arun D 3
2023-11-26 15:25:23.655 9353-9375 Arun D 4
2023-11-26 15:25:24.658 9353-9375 Arun D 5
2023-11-26 15:25:25.644 9353-9387 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-11-26 15:25:25.660 9353-9375 Arun D 6
2023-11-26 15:25:26.664 9353-9375 Arun D 7
2023-11-26 15:25:27.671 9353-9375 Arun D 8
2023-11-26 15:25:28.676 9353-9375 Arun D 9
2023-11-26 15:25:29.679 9353-9375 Arun D 10
Example 2
fun main(){
val userList = getUserList()
//if we comment below code then getShippingDetails will not emmit data.
// means flow will be autometically cancel if there will be no consumer.
val shippingDetails: Flow<ShippingDetail> = getShippingDetails(userList)
}
fun getShippingDetails(userList: List<User>):Flow<ShippingDetail>{
return flow {
userList.forEach {
val user = getUser(it.id)
val shippingAddress = getShippingAddress(user.address)
val shippingDetails = calculateShippingCharges(shippingAddress)
}
}
}
4. Cancel consumer
If we cancel consumer then in case of flow producer will stop producing data.
fun main(){
val job = GlobalScope.launch {
val data: Flow<Int> = producer()
data.collect{
Log.d(TAG, it.toString())
}
}
GlobalScope.launch {
delay(3500)
job.cancel()
}
}
fun producer() = flow<Int>{
val list = listOf(1,2,3,4,5,6,7,8,9,10)
list.forEach {
delay(1000)
emit(it)
}
}
Note:- In the above example coroutine associated with the consumer is getting canceled after 3.5 sec So,producer will also stop producing the data.
2023-11-26 16:09:24.760 9497-9524 Arun D 1
2023-11-26 16:09:25.765 9497-9524 Arun D 2
2023-11-26 16:09:26.770 9497-9524 Arun D 3
4. Multiple consumer
fun main(){
GlobalScope.launch {
val data: Flow<Int> = producer()
data.collect{
Log.d("Arun - 1 ", it.toString())
}
}
GlobalScope.launch {
val data: Flow<Int> = producer()
data.collect{
Log.d("Arun - 2", it.toString())
}
}
}
fun producer() = flow<Int>{
val list = listOf(1,2,3,4,5)
list.forEach {
delay(1000)
emit(it)
}
}
output:-
2023-11-26 16:23:56.771 9788-9815 Arun - 2 D 1
2023-11-26 16:23:56.771 9788-9814 Arun - 1 D 1
2023-11-26 16:23:57.772 9788-9814 Arun - 2 D 2
2023-11-26 16:23:57.772 9788-9815 Arun - 1 D 2
2023-11-26 16:23:58.779 9788-9815 Arun - 2 D 3
2023-11-26 16:23:58.779 9788-9814 Arun - 1 D 3
2023-11-26 16:23:59.786 9788-9815 Arun - 2 D 4
2023-11-26 16:23:59.787 9788-9814 Arun - 1 D 4
2023-11-26 16:24:00.794 9788-9815 Arun - 2 D 5
2023-11-26 16:24:00.794 9788-9814 Arun - 1 D 5
Example-2 (Delay 2nd consumer by 2500 mili sec.)
fun main(){
GlobalScope.launch {
val data: Flow<Int> = producer()
data.collect{
Log.d("Arun - 1 ", it.toString())
}
}
GlobalScope.launch {
val data: Flow<Int> = producer()
data.collect{
Log.d("Arun - 2", it.toString())
}
}
}
fun producer() = flow<Int>{
val list = listOf(1,2,3,4,5)
list.forEach {
delay(1000)
emit(it)
}
}
output:-
2023-11-26 16:29:19.798 9926-9952 Arun - 1 D 1
2023-11-26 16:29:20.804 9926-9952 Arun - 1 D 2
2023-11-26 16:29:21.808 9926-9952 Arun - 1 D 3
2023-11-26 16:29:22.308 9926-9952 Arun - 2 D 1
2023-11-26 16:29:22.810 9926-9952 Arun - 1 D 4
2023-11-26 16:29:23.311 9926-9952 Arun - 2 D 2
2023-11-26 16:29:23.816 9926-9952 Arun - 1 D 5
2023-11-26 16:29:23.979 9926-9964 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-11-26 16:29:24.316 9926-9952 Arun - 2 D 3
2023-11-26 16:29:25.319 9926-9952 Arun - 2 D 4
2023-11-26 16:29:26.323 9926-9952 Arun - 2 D 5
5. Flow Events:- onStart,onEach,onCompletion
fun main(){
GlobalScope.launch(Dispatchers.Main) {
producer()
.onStart {
emit(-1)
Log.d(TAG, "Starting out ")
}
.onCompletion {
emit(6)
Log.d(TAG, "Completed")
}
.onEach {
Log.d(TAG, "About to emit - $it")
}
.collect {
Log.d(TAG, "$it")
}
}
}
fun producer() = flow<Int> {
val list = listOf(1, 2, 3)
list.forEach {
delay(1000)
emit(it)
}
}
2023-11-26 16:59:27.135 Arun D About to emit - -1
2023-11-26 16:59:27.135 Arun D -1
2023-11-26 16:59:27.135 Arun D Starting out
2023-11-26 16:59:28.138 Arun D About to emit - 1
2023-11-26 16:59:28.139 Arun D 1
2023-11-26 16:59:29.146 Arun D About to emit - 2
2023-11-26 16:59:29.146 Arun D 2
2023-11-26 16:59:30.148 Arun D About to emit - 3
2023-11-26 16:59:30.148 Arun D 3
2023-11-26 16:59:30.148 Arun D About to emit - 6
2023-11-26 16:59:30.148 Arun D 6
2023-11-26 16:59:30.148 Arun D Completed
6. Flows Operators:-
- Terminal :- toList,first,collect to start a flow we need terminal operator. without terminal operator flow will not start.
- No Terminal :- filter,map etc.
fun main(){
GlobalScope.launch(Dispatchers.Main) {
val firstElement = producer().first()
Log.d(TAG, "first: $firstElement")
producer().filter { it % 2 == 0 }.collect {
Log.d(TAG, "filter: $it")
}
val listOfElements = producer().toList()
Log.d(TAG, "list: $listOfElements")
producer()
.map {
it * 2
}
.filter {
it < 8
}
.collect {
Log.d(TAG, "Chain:- $it")
}
}
}
fun producer() = flow<Int> {
val list = listOf(1, 2, 3, 4, 5, 6)
list.forEach {
delay(1000)
emit(it)
}
}
Example:- 2
fun main(){
GlobalScope.launch(Dispatchers.Main){
getNotes()
.map {
FormattedNote(it.isActive,it.title.uppercase(),it.description)
}
.filter {
it.isActive
}
.collect{
Log.d(TAG, it.toString())
}
}
}
private fun getNotes(): Flow<Note> {
val list = listOf(
Note(1,true,"First","First Description"),
Note(2,true,"Second","Second Description"),
Note(3,false,"Third","Third Description")
)
return list.asFlow()
}
data class Note(val id: Int, val isActive:Boolean,val title:String,val description:String)
data class FormattedNote(val isActive: Boolean,val title: String,val description: String)
2023-11-26 17:34:13.276 10632-10632 Arun D FormattedNote(isActive=true, title=FIRST, description=First Description)
2023-11-26 17:34:13.276 10632-10632 Arun D FormattedNote(isActive=true, title=SECOND, description=Second Description)
7. Buffer :- It is used when producer is fast and consumer is slow to store the produced item into the buffer.
- With out buffer:-
fun main(){
GlobalScope.launch(Dispatchers.Main) {
val time = measureTimeMillis {
producer()
.collect{
delay(1500)
Log.d(TAG, it.toString())
}
}
Log.d(TAG, time.toString())
}
}
private fun producer(): Flow<Int> {
return flow<Int>{
val list = listOf(1,2,3,4,5)
list.forEach {
delay(1000)
emit(it)
}
}
}
output:- without buffer it took ~12.5sec (2.5*5) to compelte the execution
2023-11-26 17:44:33.652 Arun D 1
2023-11-26 17:44:36.158 Arun D 2
2023-11-26 17:44:36.864 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-11-26 17:44:38.668 Arun D 3
2023-11-26 17:44:41.179 Arun D 4
2023-11-26 17:44:43.696 Arun D 5
2023-11-26 17:44:43.696 Arun D 12549
2. With buffer:-
fun main(){
GlobalScope.launch(Dispatchers.Main) {
val time = measureTimeMillis {
producer()
.buffer(3)// store 3 item in buffer.
.collect{
delay(1500)
Log.d(TAG, it.toString())
}
}
Log.d(TAG, time.toString())
}
}
private fun producer(): Flow<Int> {
return flow<Int>{
val list = listOf(1,2,3,4,5)
list.forEach {
delay(1000)
emit(it)
}
}
}
output:- with buffer it took ~8.5sec (1+1.5 *5) to compelete the execution
2023-11-26 17:53:06.920 10841-10841 Arun D 1
2023-11-26 17:53:08.423 10841-10841 Arun D 2
2023-11-26 17:53:09.789 10841-10874 ProfileInstaller D Installing profile for com.mercedesbenz.calendar
2023-11-26 17:53:09.925 10841-10841 Arun D 3
2023-11-26 17:53:11.429 10841-10841 Arun D 4
2023-11-26 17:53:12.931 10841-10841 Arun D 5
2023-11-26 17:53:12.932 10841-10841 Arun D 8544
Example :- if in the above example if total item will be 8 then total time to execute the task will be
Ans:- ~13 (1 + 1.5*8)
Part 2 : https://medium.com/@aruncse2k20/kotlin-flows-api-part-2-460242d449fb