Fandom Engineering
Published in

Fandom Engineering

Coroutines Flow

Photo by Martin Jernberg on Unsplash

Multiple values problem

/** Walks an eagerly built list: all three values exist before the loop starts. */
fun handleCollection() {
    for (value in listOf(1, 2, 3)) {
        // do something
    }
}
/** Walks a sequence of three values, handling them one by one. */
fun handleSequence() {
    for (value in sequenceOf(1, 2, 3)) {
        // do something
    }
}
/**
 * Obtains the values from the suspending [multipleValues] and then iterates over them.
 * Nothing can be handled until the complete list has been returned.
 */
fun handleSuspendFun() = runBlocking {
    // the call returns only once every value has been calculated
    multipleValues().forEach { value ->
        // do something
    }
}
/**
 * Stands in for a time-consuming calculation.
 *
 * @return the complete result list; every value is delivered at once
 */
suspend fun multipleValues(): List<Int> =
    // do some time consuming calculations
    listOf(1, 2, 3) // all of them at once

Flow to the rescue

/**
 * Shows when a flow actually runs and how `collect` affects the surrounding coroutines:
 * a coroutine launched before `collect` runs alongside it, while code placed after
 * `collect` starts only when the collection has completed.
 */
fun useFlow() = runBlocking<Unit> {
    // launched before collect, so it is not blocked and works simultaneously with the flow
    val notBlocked = launch {
        for (tick in 1..3) {
            delay(10)
        }
    }

    // only obtain the flow here; nothing has happened yet
    val numbers = flowValues()

    // collect is a suspend fun: the flow runs now
    numbers.collect { number ->
        // handle each value
    }

    // reached only after collect completes, so this coroutine had to wait for the flow
    val blocked = launch {
        for (tick in 1..3) {
            delay(10)
        }
    }
}
/**
 * Builds a flow emitting 1, 2 and 3, with a 10 ms delay before each emission.
 * The function itself is not `suspend`; it only describes the flow.
 */
fun flowValues(): Flow<Int> = flow {
    (1..3).forEach { number ->
        delay(10) // mock some computations
        emit(number)
    }
}

Builders

/** Creates a flow with the `flow { }` builder, emitting 1, 2 and 3 in order. */
fun flowBuilder() = flow {
    for (number in 1..3) {
        emit(number)
    }
}
/** Creates a flow of the fixed values 1, 2 and 3 with `flowOf`. */
fun flowOfBuilder() = flowOf(1, 2, 3)

/** Creates a flow from an existing collection with `asFlow`. */
fun flowFromCollection() = listOf(1, 2, 3).asFlow()

Launching

/**
 * Handles values in `onEach` and triggers the flow with the parameterless `collect()`.
 * The collection still happens in the current coroutine, so the code after it waits.
 */
fun invokeByOnEach() = runBlocking<Unit> {
    val handled = flowValues().onEach { value ->
        // do something here
    }
    handled.collect()

    // still have to wait for collect completion before this is launched
    val blocked = launch {
        for (tick in 1..3) {
            delay(10)
        }
    }
}
/**
 * Collects the flow in a separate coroutine via `launchIn`, so the code that follows
 * does not have to wait for the collection to complete.
 */
fun launchFlow() = runBlocking<Unit> {
    val handled = flowValues().onEach { value ->
        // do something here
    }
    // create a new coroutine by launching the flow in this CoroutineScope
    val job = handled.launchIn(this)

    // don't have to wait for collect completion
    val notBlocked = launch {
        for (tick in 1..3) {
            delay(10)
        }
    }
}

Cancellation

/**
 * Shows two ways of stopping a collection: cancelling the job returned by `launchIn`,
 * and bounding `collect` with `withTimeoutOrNull`.
 */
fun cancelFlow() = runBlocking<Unit> {
    val collecting = flowValues()
        .onEach { value ->
            // do something here
        }
        .launchIn(this)
    delay(10)
    // launchIn returns a Job, so it can be cancelled with cancel()
    collecting.cancel()

    // a timeout can be set the same way as for any other coroutine code
    withTimeoutOrNull(10) {
        flowValues().collect { value ->
            // do something here
        }
    }
}
/**
 * Compares how flows created by different builders react to cancellation.
 *
 * Each case runs in its own child coroutine that is joined before the next case starts.
 * Inside `launch`, `cancel()` resolves to the child's scope, so cancelling one case does
 * not cancel the enclosing `runBlocking` scope and the following cases still get to run.
 */
fun cancelDifferentBuilders() = runBlocking<Unit> {
    launch {
        flowValues().collect { value ->
            if (value == 2) {
                cancel()
            }
            // do something
        }
        // flow was cancelled because of ensureActive embedded
        // in flow builder, collected elements: 1, 2
    }.join()

    launch {
        listOf(1, 2, 3).asFlow().collect { value ->
            if (value == 2) {
                cancel()
            }
            // do something
        }
        // flow wasn't cancelled, collected elements: 1, 2, 3
    }.join()

    launch {
        listOf(1, 2, 3).asFlow()
            // a shortcut for
            // .onEach { currentCoroutineContext().ensureActive() }
            .cancellable()
            .collect { value ->
                if (value == 2) {
                    cancel()
                }
                // do something
            }
        // flow was cancelled because of cancellable
        // collected elements: 1, 2
    }.join()
}
/** Runs completion logic imperatively: `collect` is wrapped in `try`/`finally`. */
fun completeFlowImperative() = runBlocking<Unit> {
    try {
        flowValues().collect { value ->
            // some work
        }
    } finally {
        // complete the work
    }
}
/** Runs completion logic declaratively through the `onCompletion` operator. */
fun completeFlowDeclarative() = runBlocking<Unit> {
    val withCompletion = flowValues().onCompletion {
        // complete the work
    }
    withCompletion.collect { value ->
        // some work
    }
}

Context

/**
 * Deliberately faulty example: emits 0, 1 and 2 from inside `withContext(Dispatchers.IO)`.
 * Collecting this flow fails with an exception (see [withContextThrowsException]).
 */
fun flowWithContext() = flow {
    withContext(Dispatchers.IO) {
        (0 until 3).forEach { value ->
            emit(value)
        }
    }
}
/** Collects [flowWithContext]; the collection cannot proceed because an exception is thrown. */
fun withContextThrowsException() = runBlocking<Unit> {
    val faulty = flowWithContext()
    faulty.collect { value ->
        // can't collect because exception is thrown
    }
}
/** Moves the emitting side to `Dispatchers.IO` with `flowOn`; collection stays in the caller's context. */
fun changeContextByFlowOn() = runBlocking<Unit> {
    // emit on IO thread
    val emittedOnIo = flowValues().flowOn(Dispatchers.IO)
    emittedOnIo.collect { value ->
        // some work on coroutine's context
    }
}

Operators

/** Keeps the odd values of [flowValues] and maps each one with the suspending [mapValue]. */
fun filterAndMapFlow() = runBlocking<Unit> {
    val odds = flowValues() // 1, 2, 3
        .filter { number -> number % 2 == 1 }
    // suspend functions can be called inside map
    val described = odds.map { number -> mapValue(number) }
    described.collect { text ->
        // collect following values:
        // value = 1
        // value = 3
    }
}
/** Takes the first two values and uses `transform` to emit two strings for each of them. */
fun takeAndTransformFlow() = runBlocking<Unit> {
    // take two values, ignore the rest of them
    val firstTwo = flowValues().take(2) // 1, 2
    val messages = firstTwo.transform { value ->
        emit("Start emitting $value")
        emit(mapValue(value))
    }
    messages.collect { message ->
        // collect following values:
        // Start emitting 1
        // value = 1
        // Start emitting 2
        // value = 2
    }
}
/**
 * Mocks time-consuming work by delaying 10 ms.
 *
 * @param value the number to describe
 * @return the text `value = <value>`
 */
suspend fun mapValue(value: Int): String {
    val description = "value = $value"
    delay(10) // mock time consuming work
    return description
}
/** Applies three terminal operators to the same flow: `toList`, `first` and `reduce`. */
fun flowTerminalOperators() = runBlocking<Unit> {
    val source = flowValues() // 1, 2, 3

    // opposite to List.asFlow(); result [1, 2, 3]
    val asList = source.toList()

    // get first and cancel flow's collection; result 1
    val head = source.first()

    // result 6
    val sum = source.reduce { accumulated, next -> accumulated + next }
}

Buffering

/**
 * Without buffering the work is done synchronously: the emitter waits for the collector
 * to finish before emitting the next value, and the collector waits for the next emission.
 *
 * The whole work takes about 105ms:
 * 10ms - emit(1), 35ms - collect(1),
 * 45ms - emit(2), 70ms - collect(2),
 * 80ms - emit(3), 105ms - collect(3).
 */
fun synchronousCollecting() = runBlocking {
    // flowValues emits a value every 10ms
    val source = flowValues()
    source.collect { value ->
        delay(25)
        // consume emitted value
    }
}
/**
 * With `buffer` the work can be done asynchronously: values are emitted as soon as
 * possible without waiting for the collector.
 *
 * The whole work takes about 85ms:
 * 10ms - emit(1), 20ms - emit(2), 30ms - emit(3),
 * 35ms - collect(1), 60ms - collect(2), 85ms - collect(3).
 */
fun bufferOperator() = runBlocking {
    // emit values ASAP, don't wait for collecting
    val buffered = flowValues().buffer()
    // collecting starts once the first value has been emitted
    buffered.collect { value ->
        delay(25)
        // consume emitted value
    }
}
/**
 * For flows that represent a part or state of the work, only the newest value matters:
 * `conflate` lets the collector skip values emitted while it was busy.
 *
 * The whole work takes about 60ms:
 * 10ms - emit(1), start collect(1)
 * 20ms - emit(2)
 * 30ms - emit(3)
 * 35ms - complete collect(1), start collect(3)
 * 60ms - complete collect(3)
 */
fun conflateOperator() = runBlocking {
    val conflated = flowValues().conflate() // conflate emission
    // process only the most recent emission
    conflated.collect { value ->
        delay(25)
        // consume emitted value
    }
}
/**
 * Similar to conflate, but instead of dropping emitted values it cancels the
 * collector's current work whenever a new value is emitted.
 *
 * The whole work takes about 55ms:
 * 10ms - emit(1), start collect(1)
 * 20ms - emit(2), cancel collect(1), start collect(2)
 * 30ms - emit(3), cancel collect(2), start collect(3)
 * 55ms - complete collect(3)
 */
fun collectLatestOperator() = runBlocking {
    val source = flowValues()
    // cancel current consuming on new emission
    source.collectLatest { value ->
        delay(25)
        // consume emitted value
    }
}

Composing and flattening

/**
 * Pairs values from two flows one to one with `zip`.
 * The fourth dictionary value is ignored because [flowValues] emits only three values.
 */
fun zipFlows() = runBlocking {
    val dictionary = flowOf("st", "nd", "rd", "th")
    val ordinals = flowValues() // 1, 2, 3
        .zip(dictionary) { number, word -> "$number$word" }
    ordinals.collect { ordinal ->
        // collected values:
        // 1st
        // 2nd
        // 3rd
    }
}
/**
 * Combines the latest values of two flows with `combine`: the numbers arrive every
 * 10ms, while each dictionary entry is delayed by 25ms.
 */
fun combineFlows() = runBlocking {
    val dictionary = flowOf("player1", "player2", "player3")
        .onEach { delay(25) }
    val combined = flowValues() // delay 10ms before emit
        .combine(dictionary) { number, word -> "$number is $word" }
    combined.collect { line ->
        // collected values:
        // 2 is player1 (25ms)
        // 3 is player1 (30ms)
        // 3 is player2 (50ms)
        // 3 is player3 (75ms)
    }
}
/**
 * Concatenates inner flows one after another with `flatMapConcat`:
 * the external flow waits for the completion of each internal flow.
 */
fun flatMapConcatFlows() = runBlocking {
    val concatenated = flowValues().flatMapConcat { task -> subworkFlow(task) }
    concatenated.collect { part ->
        // collected values
        // First part of 1 task (10ms)
        // Second part of 1 task (35ms)
        // First part of 2 task (45ms)
        // Second part of 2 task (70ms)
        // First part of 3 task (80ms)
        // Second part of 3 task (105ms)
    }
}
/**
 * Collects the inner flows concurrently and merges their values as soon as they arrive.
 * A new external emission does not cancel the inner flows that are already running.
 */
fun flatMapMergeFlows() = runBlocking {
    flowValues()
        .flatMapMerge { value ->
            // start another internal flow next to the ones still running
            subworkFlow(value)
        }
        .collect {
            // collected values
            // First part of 1 task (10ms)
            // First part of 2 task (20ms)
            // First part of 3 task (30ms)
            // Second part of 1 task (35ms)
            // Second part of 2 task (45ms)
            // Second part of 3 task (55ms)
        }
}
/**
 * Keeps only the latest inner flow: every new external emission cancels the
 * inner flow that is still being collected and starts a new one.
 */
fun flatMapLatestFlows() = runBlocking {
    flowValues()
        .flatMapLatest { value ->
            // cancel current internal consuming
            // on new external emission
            subworkFlow(value)
        }
        .collect {
            // collected values
            // First part of 1 task (10ms)
            // First part of 2 task (20ms)
            // First part of 3 task (30ms)
            // Second part of 3 task (55ms)
        }
}
/**
 * Builds a two-step flow for one task: the first part is emitted immediately,
 * the second part 25 ms later.
 *
 * @param value the task number placed in both messages
 */
fun subworkFlow(value: Int) = flow {
    val firstPart = "First part of $value task"
    emit(firstPart)
    delay(25)
    val secondPart = "Second part of $value task"
    emit(secondPart)
}

Exception

/** An exception thrown by the emitter is handled by the `catch` operator placed after it. */
fun catchExceptionFromEmitter() = runBlocking<Unit> {
    val failing = flow {
        emit(1)
        emit(2)
        throw NullPointerException()
        emit(3) // won't be emitted
    }
    // the NPE is caught by the catch block
    failing
        .catch { cause ->
            // do something like throw, emit or just ignore
        }
        .collect { value ->
            // some work
        }
}
/**
 * Counter-example: the emitter catches its own exception with `try`/`catch`, so the
 * `catch` operator placed after it never learns that anything was thrown.
 */
fun tryCatchExceptionFromEmitter() = runBlocking<Unit> {
    val selfHandling = flow {
        try {
            emit(1)
            emit(2)
            throw NullPointerException()
            emit(3) // won't be emitted
        } catch (e: Exception) {
            // NPE is caught here but it breaks the transparency
        }
    }
    // the NPE was already caught by the try - catch block
    selfHandling
        .catch { cause ->
            // it knows nothing about the exception thrown
        }
        .collect { value ->
            // some work
        }
}
/**
 * The failing `check` sits in the collector, which is after the `catch` operator,
 * so the exception is not caught by the catch block.
 */
fun notCatchExceptionFromCollector() = runBlocking<Unit> {
    val guarded = flowValues().catch { cause ->
        // do something like throw, emit or just ignore
    }
    guarded.collect { number ->
        check(number <= 2) { }
        // some work
    }
}
/**
 * The failing `check` is moved into `onEach`, ahead of the `catch` operator,
 * so the exception is caught by the catch block.
 */
fun catchExceptionFromCollector() = runBlocking<Unit> {
    val checked = flowValues().onEach { number ->
        check(number <= 2) { }
        // some work
    }
    checked
        .catch { cause ->
            // do something like throw, emit or just ignore
        }
        .collect()
}

Conclusion

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store