Coroutine(Flow) 에 대해 알아보자!

차경민
쓱싹팀 이야기
20 min readJul 31, 2022

이 포스터는 코루틴 공식 가이드 자세히 읽기 — Part 9 를 참고하여 만들었습니다.

Photo by Lukas Blazek on Unsplash

Coroutine의 가장 중요한 부분 이 뭐냐고 물어본다면 필자는 Flow라고 생각합니다.
Flow는 연산의 반환값이 여러개 일 때 사용할 수 있습니다.

다수의 여러값 반환하기

Kotlin에서 여러 값을 반환할려면 어떻게 해야할까요?
간단하게 list로 value를 담아 반환할 수 있습니다.

결과값

1
2
3

시퀀스

시퀀스는 연산이 오래걸리는 작업을 처리할 때 유용하게 사용할 수 있습니다.
시퀀스는 최종 메소드가 실행될 때 전체가 실행되어 지기 때문입니다.

결과값

1
2
3

중단 함수

위의 예제와 같은 시간이 오래걸릴 수 있을 경우 메인스레드가 정지될 수 있습니다.
이때 suspend 즉 중단함수를 사용해 정지없이 실행할 수 있고 리스트로 반환할 수 있습니다.

결과값

1
2
3

Flow

List로 반환된다는 것은 모든 연산을 수행한 후 한번에 모든값을 반환한다는 의미입니다.
비동기로 처리될 값들의 스트림으로 나타내기 위해서 위의 예제에서의 시퀀스를 Flow<Int> 로 사용할 수 있습니다.

결과값

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

각각의 수를 출력하기전에 delay로 100ms 를 기다렸다가 출력합니다.
메인스레드의 코루틴의 출력값 I’m not blocked 으로 확인이 가능합니다.

Flow의 특징으로는

  • flow 빌더를 사용합니다.
  • flow로 감싼 코드는 언제든 중단이 가능합니다.
  • flow를 사용하는 곳에서 suspend를 안붙여도 됩니다.
  • 결과값은 flow의 emit 함수로 방출됩니다.
  • 방출된 값들은 flow의 collect 함수로 수집이 됩니다.
foo 함수의 flow 블록에서 delay 대신 Thread.sleep으로 변경하면 메인 스레드가 정지됩니다.

Flow is Cold

플로우는 시퀀스와 유사하게 콜드 스트림입니다.
flow가 collect 될때까지 실행되지 않습니다.

결과값

Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

위의 코드를 보시면 foo 함수가매번 collect 될 때마다 시작됩니다.
그래서 다시 호출할 때마다 Flow started 가 출력됩니다.

Flow의 취소

플로우는 코루틴의 취소 매커니즘을 따르지만 취소 지점을 제공하는 경우가 없습니다.
코루틴과 마찬가지로 중단함수 (ex: delay) 로 취소가 가능하며, 다른 경우는 못합니다.
withTimeoutOrNull 함수로 감싸는 경우 어떻게 취소가 되는지 알아봅시다.

결과값

Emitting 1
1
Emitting 2
2
Done

250ms가 지난경우 collect를 안하기 때문에 Emitting 3 출력 안된것을 확인할 수 있습니다.

코루틴 빌더들

kotlin 에서는 코루틴 빌더를 만들 수 있는 방법이 여러 존재합니다.

  • flowOf() 빌더는 고정된 값들을 방출하는 플로우를 정의합니다.
  • 다양한 컬렉션과 시퀀스들은 asFlow() 확장 함수로 Flow로 변환이 가능합니다.
1
2
3
1

플로우의 중간 연산자

플로우는 시퀀스나 컬렉션처럼 연산자로 변환해서 사용이 가능합니다.
중간 연산자는 업스트림 플로우에 적용되어 다운스트림 플로우로 반환합니다.
이 말은 즉슨 플로우가 중간 연산자에 의해 변환이 된다면 기존의 플로우는 업스트림 변환된 플로우는 다운스트림 입니다.
이런 연산자들도 플로우와 마찬가지로 Cold 스트림 이며 호출한 즉시 반환합니다.

중간 연산자는 컬렉션의 함수 filter와 map 같은 연산자로 사용할 수 있습니다.
차이점도 있는데요, 바로 중간에 사용할 수 있다는 점입니다.

위의 예제는 map 연산자를 사용해 전달했습니다.

결과값

response 1
response 2
response 3

변환 연산자

플로우에서 변환연산자 중 일반적인 것은 transform 연산자 입니다.
다른 연사자와 똑같이 map이나 filter 같은 단순한 변환이나 복잡한 변환할 때 사용됩니다.
transform 연산자는 값들을 커스텀하여 방출할 수 있습니다.

결과값

Making request 1
response 1
Making request 2
response 2
Making request 3

크기 제한 연산자

플로우에서 take 연산자로 크기를 제한해 취소할 수 있습니다.
이런 경우 예외를 발생시키며 try ~ finally와 같은 예외처리 문으로 정상적으로 작동할 수 있습니다.

take(2)의 연산자로 인해 2를 방출하고 finally에 있는 출력문이 출력된것을 확인할 수 있습니다.

결과값

1
2
Finally in numbers

종단 연산자

종단 연산자는 플로우를 수집하는 연산자 입니다.
대표적으로 collect가 있으며 여러 연산자들이 있습니다.

  • toList 나 toSet 같은 컬렉션으로 변환
  • 첫번째 값만 방출하여 플로우는 단일 값 만 방출함을 보장
  • reduce 나 fold로 변환

결과값

55

플로우는 순차적이다.

플로우는 특별한 연산을 수행하지 않은 이상 순차적으로 수행됩니다.
수집은 종단 연산자를 호출한 코루틴에서 실행되며 새로운 코루틴을 만들지 않습니다.
각각의 방출된 값들은 업스트림에서 중간 연산자로 다운스트림으로 변환되며 종단 연산자로 방출됩니다.

짝수 를 찾아 String으로 변환하는 코드

결과값

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

플로우 컨텍스트

플로우의 수집은 방출한 코루틴 컨텍스트에서 수행됩니다.

작성자가 명시한 컨텍스트안에서 수행됩니다.

종단 함수가 메인 스레드가 호출되었기 때문에 플로우도 메인 스레드에서 호출되었습니다.
이 방법이 빠른 실행을 보장하고, 호출자를 블록하지 않고 실행 컨텍스트에 관계없이 비동기를 수행합니다.

withContext 를 통한 잘못된 방출

CPU 작업등 오래 걸리는 작업은 Dispatchers.Default와 같이 별도의스레드에서 수행될 필요가 있고 UI 관련된 작업은 Dispatchers.Main의 전용 스레드에 수행될 필요가 있습니다.
flow{…}는 컨텍스트의 보존성을 지켜야 하기 때문에 다른 컨텍스트에 값을 방출하는 것이 허용되지 않습니다.

위의 예제를 실행시키면 예외가 발생합니다.

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@7e14e38d, BlockingEventLoop@23534177],
but emission happened in [DispatchedCoroutine{Active}@429f06a5, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:88)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:74)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)
at FlowKt$foo$1$1.invokeSuspend(flow.kt:173)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)

flowOn 연산자

위의 예외를 살펴보면 flowOn연산자를 사용하라고 나와있습니다.
이 방법이 플로우에서 컨텍스트를 변경하는 방법입니다.

결과값

[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1 
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

flow {…} 가 백그라운드에서 동작하는 것을 확인해 봅시다.

flowOn 연산자가 플로우의 특성중 하나인 순차성을 일부 포기했다는 것입니다.
수집은 coroutine#1에서 발생하고 방출은 coroutine#2에서 발생합니다.
flowOn 연산자는 컨텍스트내에서 Dispatchers를 변경해야할 경우 업스트림 플로우를 위한 다른 코루틴을 생성합니다.

버퍼링

방출하는 함수에서 오래걸리는 경우를 생각해 봅시다.
방출하는 함수에서 오래 걸리는 경우 수집하는 쪽도 오래 걸립니다.

위의 코드에서의 전체 수집기간은 약 1200ms 가 걸렸습니다.

1
2
3
Collected in 1278 ms

이 때 buffer 연산자를 사용하면서 방출됨과 동시에 수집이 동시에 수행되도록 만들 수 있습니다.

위의 예제는 동일한 수들을 더 빠르게 처리가 가능합니다.
첫번째 수를 위해서만 100ms를 기다리고 수 처리를 위해 300ms 를 기다리도록 파이프라인을 정했기 때문에 가능합니다.

결과값

1
2
3
Collected in 1064 ms

flowOn 연산자도 buffer를 사용합니다.

병합

플로우가 연산의 일부분이나 상태의 업데이트를 방출하는 경우 각각의 방출된 값을 처리하는것은 불필요하며 최신 값만 처리해야 할 것 입니다.
이 때 conflate 연산자를 사용하여 수집하는데 너무 오래걸릴경우 스킵할 수 있습니다.

결과값

1
3
Collected in 750 msㅊ

첫번 째 수가 처리중인데 두번 째, 세번째 수가 방출 됨에 따라 두번 째 값을 스킵하고 세번 째 값이 수집된 것을 확인할 수 있습니다.

최신 값 처리

속도를 높이는 방법중 하나인 새로운 값이 방출될 때마다 느린 수집를 취소하고 재시작 하는 것입니다.
연산자 마다 연산자Last 가 존재하며(mapLatest, collectLatest 등 ) 새로운 값들이 방출되면 코드블록을 취소합니다.

collectLast의 코드 블럭이 300ms를 소모하고 새로운 값들은 매 100ms 마다 방출되기 때문에 재 때 실행된 것을 확인할 수 있으며, 마지막 값에 대해서 끝까지 수행 됨을 확인할 수 있습니다.

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 673 ms

다중 플로우의 합성

Zip

Kotlin의 확장함수와 동일하게 플로우에도 Zip 함수로 값들을 병합할 수 있습니다.

결과값

1 -> one
2 -> two
3 -> three

Combine

플로우의 연산이나 상태의 최근값에 어떤 연산을 추가적으로 더 수행하거나 별도의 업 스트림 플로우가 값을 방출할 때마다 추가 연산을 수행해야 할 수 있습니다.
이와 관련된 연산자들을 combine 이라고 합니다.

zip을 통해 플로우를 합치면 가장 느린 Flow인 400ms 마다 출력됩니다.

이 때 zip 연산자 대신 combine 연산자를 사용하면

플로우가 방출될 때마다 다른 플로우의 최신값을 가지고 병합하여 출력됩니다.

결과값

1 -> one at 448 ms from start
2 -> one at 650 ms from start
2 -> two at 854 ms from start
3 -> two at 962 ms from start
3 -> three at 1258 ms from start

플로우의 예외처리 방법

수집기의 try & catch

try/catch 블록을 사용해 예외처리를 할 수 있습니다.

결과값

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

모든 예외 처리

위 예제는 방출, 종단 또는 중간 연산자에 대한 예외처리를 잡아냈지만
모든 예외처리를 할 수 있습니다.

결과값

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

예외 캡슐화

플로우에 있어서 예외는 반드시 투명해야 합니다.
try/catch를 사용해 예외처리 한후 방출하는 것은 예외 투명성에 위반됩니다.

해결방법은 catch 연산자를 사용하면 캡슐화가 가능합니다.
또한 여러 방법들이 있습니다.

  • throw 연산자로 예외처리
  • catch 연산자에 emit 던지기
  • 다른 코드를 통한 예외 무시, 로깅, 기타 처리

try/catch와 동일하게 예외처리 할 수 있습니다.

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

catch 예외 투명성

catch 연산자는 오로지 업 스트림에서만 동작하며 다운 스트림에서는 동작하지 않습니다.

결과값

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at FlowKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:135)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$$inlined$collect$1.emit(Collect.kt:136)
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:77)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)
at FlowKt$foo$1.invokeSuspend(flow.kt:290)
at FlowKt$foo$1.invoke(flow.kt)
at FlowKt$foo$1.invoke(flow.kt)
at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:61)
at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:212)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt.catchImpl(Errors.kt:230)
at kotlinx.coroutines.flow.FlowKt.catchImpl(Unknown Source)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catch$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:113)
at FlowKt$main$1.invokeSuspend(flow.kt:302)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:87)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:61)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:40)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at FlowKt.main(flow.kt:294)
at FlowKt.main(flow.kt)

플로우의 종료

플로우의 수집이 종료 될때 그 동작을 수행하는 방법에 대해 ㅏㅇㄹ아봅시다.

Imperative finally block

try/catch에서 수집이 종료시 finally 구문을 통해 작성할 수 있습니다.

결과값

1
2
3
Done

선언적 처리

onCompletion 연산자를 통해 완전히 수집되었을 때의 실행될 로직에 처리할 수 있습니다.

결과값

1
2
3
Done

onCompletion의 장점은 nullable로 정의되는 Throwable 피라미터를 통해 성공,실패 로 수집되었는지 확인될 수 있다는 겁니다.

결과값

1 
Flow completed exceptionally
Caught exception

onCompleted 연산자는 예외처리를 하지 않고 다운스트림으로 전달됩니다.
결국 catch 연산자로 처리됩니다.

업 스트림 예외만 가능

onCompleted 연산자도 catch연산자와 똑같이 업스트림 에서의 예외처리만 가능하고 다운 스트림의 예외는 알수 없습니다.

결과값

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at FlowKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:135)
at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect(SafeCollector.common.kt:115)
at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:114)
at FlowKt$main$1.invokeSuspend(flow.kt:333)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:87)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:61)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:40)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at FlowKt.main(flow.kt:325)
at FlowKt.main(flow.kt)

선언적 vs 명령적

선호는 코딩 스타일에 따라 사용하면 됩니다.

플로우 실행

어떤 소스로부터 발생하는 이벤트는 flow로 표현이 가능합니다,

플로우에서는 onEach가 그런 역할을 합니다. onEach는 중단함수 이므로 종단함수가 없으면 아무 소용이 없습니다.

결과값

Event: 1
Event: 2
Event: 3
Done

launchIn 종단 연산자를 사용하면 플로우의 수집을 다른 코루틴에서 수행이 가능하며 곧바로 실행할 수 있습니다.

결과값

Event: 1
Event: 2
Event: 3
Done

laucnIn 연산자에 필요한 피라미터는 수집할 코루틴의 스코프입니다.
위 예제에서는 runBlocking 코루틴 빌더의 스코프로 전달되었고 runBlocking은 자식 코루틴의 종료를 기다리고 메인함수가 반환되어 프로그램이 종료되는것을 방지합니다.

실제 어플리케이션은 제한된 생명주기를 갖는 엔티티로부터 전달될 수 있습니다.
그 엔티티의 생명주기가 종료되면 스코프도 취소되며 스코프에 속한 플로우도 수집을 취소합니다.

laucnIn도 Job을 반환하기 때문에 스코프를 취소하거나 join 할 필요없이 플로우 수집 코루틴을 취소할 수 있습니다.

플로우와 Rx

플로우는 Rx에 영향을 많이 받았습니다. 차이점으로는 플로우의 주요 목적은 구조화된 동시성을 따르며 코틀린의 중단함수를 이용하여 가능한 단순하게 만드는 것입니다.

개념적으로 다르지만 플로우는 리액티브 스트림이며 리액티브 퍼블리셔로 변환하거나 반대로 변환할 수 있습니다.
이를 사용하려면

  • kotlin-coroutines-reactive : Reactive Streams
  • kotlin-coroutines-reactor : Project Reactor
  • kotlin-coroutines-rx2 : RxJava2

모듈을 사용하면 됩니다.

--

--