Coroutine 탐험일지(4)

LeeChangMin
쓱싹팀 이야기
23 min readAug 6, 2022

Asynchronous Flow

Photo by Carl Heyerdahl on Unsplash

공식 가이드코루틴 공식 가이드 자세히 읽기- Part9 을 따라 코루틴을 탐험해봅시다.
코루틴 적용 버전은 1.6.3입니다.

값 하나를 반환하는 중단함수를 정의하여 비동기로 실행할 수 있습니다.
그렇다면 여러 개의 값을 반환하는 중단함수를 만드는 방법은 무엇일까요?
플로우를 사용하면 가능합니다!

Representing multiple values

1
2
3

코틀린에서 다수의 값은 위 코드와 같이 컬렉션을 사용해 나타낼 수 있습니다.

Sequences

1
2
3

위 코드와 같이 각 연산에 100ms 시간이 소요되는 수들을 나타낼 때 시퀀스를 사용합니다.
즉, 각각의 수에 CPU 연산이 요구되는 수들을 나타낼 때 시퀀스를 사용합니다.

Suspending functions

시퀀스에서 사용된 연산 코드는 메인 스레드를 정지 시킵니다.
이런 연산은 비동기 코드에서 실행될 때 우리는 suspend키워드를 함수에 붙여 중단함수로 정의할 수 있습니다.

정의한 중단함수를 코루틴 스코프에서 호출하면 호출 스레드의 정지없이 실행할 수 있습니다.

Flows

List<Int>타입을 리턴하는 것은 값을 한번에 return하는 것을 의미합니다.
동기로 처리될 값들에 Sequence<Int>를 사용하듯, 비동기로 처리될 값들의 스트림을 나타내기 위해 Flow<Int>타입을 사용합니다.

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

메인 스레드에서 실행되는 별도의 코루틴에서 100ms마다 I’m not locked가 출력된 이후 각 숫자가 출력됩니다.

이전 예제들과 다르게 Flow는 아래 차이점들을 가집니다.

  • Flow 타입에 대한 builder 함수는 flow{}입니다.
  • flow {...}블록은 suspend될 수 있습니다.
  • simple()함수는 이제 suspend키워드를 사용하지 않아도 됩니다.
  • 값은 emit함수를 사용해 flow에서 방출됩니다.
  • 값은 collect함수를 사용해 flow에서 수집됩니다.

위 함수에서 delay(100)Thread.sleep(100)으로 변경하면 결과가 아래와 같이 변경됩니다.

I'm not blocked 1
I'm not blocked 2
I'm not blocked 3
1
2
3

main thread가 block되는 것을 확인할 수 있습니다.

Flows are cold

플로우는 시퀀스와 유사하게 cold streams 입니다. flow{}빌더 내부 코드는 flow가 수집될 때 까지 작동하지 않습니다.

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

플로우를 반환하는 simple()함수가 suspend를 표시하지 않는 핵심 이유입니다. simple함수는 호출되면 아무것도 기다리지 않고 바로 리턴됩니다.

flow는 위 코드의 결과와 같이 collect 될 때 마다 시작되는 것을 볼 수 있습니다.

Flow cancellation basics

플로우는 코루틴의 일반적인 취소 방식을 따릅니다.
일반적인 코루틴과 같이 플로우도 delay와 같은 취소 가능한 중단 함수가 호출 되어 중단되면 취소가 가능합니다.

Emitting 1
1
Emitting 2
2
Done

Flow builders

지금까지 살펴본 flow {...}빌더는 가장 기본적이었습니다.
플로우를 선언하기 쉬운 다른 빌더들도 있습니다.

  • flowOf빌더는 고정된 값들을 emit 합니다.
  • .asFlow()확장 함수를 사용해 다양한 컬렉션과 시퀀스를 flow로 변경할 수 있습니다.

Intermediate flow operators

플로우는 컬렉션과 시퀀스와 같이 연산자를 통해 변경될 수 있습니다.
중간 연산자는 업스트림 플로우에 적용해 다운스트림 플로우를 반환합니다.
이런 연산자들은 플로우와 같이 cold 입니다. 이런 연산자의 호출은 중단 함수가 아니기에 새로 변경된 플로우를 즉시 반환합니다.

기본 연산자들은 익숙한 mapfilter같이 익숙한 이름을 가집니다.
시퀀스와 중요한 차이는 이 연산자들은 실행되고 있는 코드 블록에서 중단 함수를 호출할 수 있습니다.

예를 들어, 요청된 플로우가 map 연산자로 매핑될 수 있습니다.
작업이 긴 시간의 중단함수에 의해 요청된 플로우도 매핑됩니다.

response 1
response 2
response 3

Transform operator

Flow 변환 연산자 중 가장 일반적인 연산자는 transform입니다.
이 연산자는 map이나 filter같은 단순한 변환이나, 복잡한 변환들을 구현하기 위해 사용됩니다. transform연산자를 사용해 임의의 값을 임의의 횟수로 방출할 수 있습니다.

예를 들어, transform연산자를 사용해 오래 걸리는 비동기 요청을 수행하기 전에 기본 문자열을 먼저 방출하고 요청에 대한 응답이 도착하면 그 결과를 방출할 수 있습니다.

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

Size-limiting operators

take와 같은 크기 제한 중간 연산자는 플로우의 실행을 정의된 한계에 도달하면 취소합니다. 코루틴에서 취소는 항상 예외가 발생합니다. 그래서 try {...} finally {...}같은 자원 관리 함수들이 취소가 발생한 경우 정상적이게 동작합니다.

1
2
Finally in numbers

위 코드로 보아 take(2)로 인해 2번 방출되고 멈춘 것을 확인할 수 있습니다.

Terminal flow operators

플로우의 Terminal 연산자는 수집을 시작하는 중단 함수입니다.
collect 연산자는 가장 기본적인 연산자입니다. 아래와 같이 collect를 쉽게 하는 연산자들도 있습니다.

  • toListtoSet같은 다양한 컬렉션으로의 변환
  • 연산자는 첫번째 값을 얻고 플로우가 단일값만 방출함을 보장
  • reducefold를 사용해 플로우 값을 줄일 수 있음
55

Flows are sequential

플로우의 독립적인 collect는 여러 플로우를 사용하는 특별한 연산자가 없는 한 순차적으로 수행됩니다.
수집은 터미널 연산자를 호출한 코루틴에서 직접 실행됩니다. 기본적으로 새로운 코루틴은 실행되지 않고, 방출된 각 값은 업 스트림에서 다운 스트림으로 모든 중간 연산자들이 처리해 마지막으로 터미널 연산자로 전달됩니다.

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Flow context

flow의 collection은 항상 코루틴을 호출하는 context에서 발생합니다.
예를 들어, simple이라는 flow가 있을 때 다음과 같은 코드는 명시된 컨텍스트 상에서 수행됩니다.

withContext(context) {
simple().collect { value ->
println(value) // run in the specified context
}
}

이런 플로우의 특징을 컨텍스트 보존이라고 부릅니다.

기본적으로 flow {...}빌더에 제공된 코드 블록은 플로우 collect를 실행한 코루틴의 컨텍스트에서 실행됩니다.
예를 들어, 호출 스레드를 출력하며 3개의 숫자를 방출하는 함수를 생각해 봅시다.

[main @coroutine#1] Started simple flow 
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

simple().collect{}가 메인 스레드에서 호출되어 simple의 플로우 또한 메인 스레드에서 호출되는 것을 볼 수 있습니다.

이 특징은 빠른 실행을 보장하고, 호출자를 블록하지 않고 실행 컨텍스트에 관계 없이 비동기 작업을 수행하는 완벽한 기본 방법입니다.

Wrong emission withContext

그러나, CPU를 오래쓰는 코드는 Dispatchers.Default context에서 실행되고 UI 업데이트 코드는 Dispatchers.Maincontext에서 실행됩니다.
보통 withContext는 코틀린 코루틴을 사용하는 코드에서 컨텍스트 전환을 위해 사용되지만, flow {...}builder는 context preservation(컨텍스트 보존) 특징으로 인해 다른 context에서 값을 방출할 수 없습니다.

Exception in thread "main" java.lang.IllegalStateException: 
Flow invariant is violated:
Flow was collected in [CoroutineId(1),"coroutine#1":BlockingCoroutine{Active}@3060aa8,BlockingEventLoop@7572d78a],
but emission happened in [CoroutineId(1),"coroutine#1":DispatchedCoroutine{Active}@211173fb, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead at ...

context를 변경해 값을 방출하려 해 예외가 발생한다.

flowOn operator

위 코드의 결과의 마지막을 보면 flowOn을 사용하라는 것을 볼 수 있습니다.
flow의 context를 변경하는 방법은 아래 코드와 같습니다.

[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1 
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

flow {...}는 백그라운드 스레드에서 작동하고, collect는 메인 스레드에서 작동하는 것을 확인할 수 있습니다.

flowOn연산자는 context 말고도 flow의 순차적인 특성을 변경합니다.
값 들의 수집이 @coroutine#1에서 발생하고 방출은 다른 스레드에서 실행되는 @coroutine#2에서 발생합니다.
flowOn 연산자는 컨텍스트 내에서 CoroutineDispatcher를 변경할 경우 업스트림 플로우를 위한 다른 코루틴을 생성합니다.

Buffering

플로우의 다른 부분을 다른 코루틴에서 실행하는 것은 해당 플로우를 수집하는데 걸리는 전체 시간으로 생각하면 도움이 될 수 있습니다. 특히, 오래 걸리는 비동기 연산이 관련되어 있으면 더 좋습니다.
예를 들어, 아래 simple flow의 emit이 느리고 원소 생산에 100ms가 걸리는 경우를 생각해봅시다. 심지어 collect도 300ms씩 매번 걸리는 경우입니다.
이 경우 1~3을 처리하는 데 얼마나 걸리나 확인해 봅시다.

1
2
3
Collected in 1235 ms

약 1200ms가 걸리는 것을 확인할 수 있습니다.

Buffer연산자를 사용해 방출과 수집을 동시에 수행되도록 할 수 있습니다.

1 
2
3
Collected in 1089 ms

첫번째 수를 위해 100ms를 기다린 다음 각 수를 처리하기 위해 300ms씩 기다리도록 처리해 약 1000ms만 소요된 것을 확인할 수 있습니다.

flowOn 연산자는 CoroutineDispatcher를 변경할 때와 같은 버퍼링 메커니즘을 사용하지만, buffer 연산자를 사용해 실행 컨텍스트의 변경 없이 버퍼링을 했습니다.

conflation

flow가 연산의 일부나 연산 상태 업데이트를 방출하는 경우 각각의 값을 처리하는 것은 불필요하고 가장 최근의 값 하나만 처리하면 됩니다. 이 경우,
conflate 연산자가 collector의 처리가 너무 느린 경우 중간 값들을 스킵할 수 있습니다.

1
3
Collected in 780 ms

첫번째 수가 처리중일 때 세번째 수가 생산되어 두번째 수는 skip되는 것을 확인할 수 있습니다.

Processing the latest value

Conflation은 emit과 collect가 둘다 느릴 때 속도를 올리는 법 중 하나 입니다. 중간 값들을 skip하는 방법입니다.
다른 방법은 slow collector를 취소하고 새로운 값이 emit될 때마다 collector를 재시작하는 방법입니다. xxxLatest연산자들은 xxx연산자와 동일하게 필수 로직을 수행하지만 새로운 값이 emit되면 블록 안의 코드를 취소하는 경우가 있습니다.

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 699 ms

값이 수집되면 뒤에 블록을 취소하고 새로 시작하는 것을 볼 수 있습니다.
마지막 값의 경우 새로 수집되는 값이 없어 끝까지 실행되는 것을 볼 수 있습니다.

Composing multiple flows

다중 flow를 합치는 방법은 여러가지입니다.

Zip

코틀린 표준 라이브러리에 있는 Sequence.zip 확장함수와 같이
flow는 두 flow의 값을 합치는 zip 연산자를 가집니다.

1 -> one
2 -> two
3 -> three

Combine

flow가 가장 최근 값을 나타내면 flow의 가장 최근 값에 따라 달라지는 계산을 수행하고 업스트림 플로우가 값을 방출할 때마다 그 추가 연산을 다시 계산해야 할 수 있습니다. 이와 관련된 연산자를 combine이라고 합니다.

nonCombine
1 -> one at 486 ms from start
2 -> two at 886 ms from start
3 -> three at 1288 ms from start
combine
1 -> one at 453 ms from start
2 -> one at 664 ms from start
2 -> two at 858 ms from start
3 -> two at 978 ms from start
3 -> three at 1262 ms from start

zip을 사용할 때와 combine을 사용할 때의 결과가 다른 것을 확인할 수 있습니다. combine을 사용할 경우 numsstrs플로우에서 emit이 될 때 마다 최신 값을 가지고 처리하는 것을 확인할 수 있습니다.

Flattening flows

플로우는 비동기로 얻는 값들의 시퀀스를 나타냅니다.
그래서 어떤 플로우에서 수신되는 일련의 값들이 다른 값들의 시퀀스를 요청하는 일은 자주 발생합니다.

1~3의 정수가 emit될 때마다 requestFlow가 호출됩니다.

이 결과는 추가적인 처리가 필요한 Flow<Flow<String>>타입을 가지는 flow가 반환됩니다. 컬렉션이나 시퀀스는 이를 위해 flatten이나 flatMap연산자를 가집니다. 플로우는 비동기적 특징으로 인해 다른 플래트닝 연산이 필요하며 이를 위한 연산자들이 별도로 정의되어 있습니다.

flatMapConcat

하나로 합치(Concatenating)는 모드는 flatMapConcatflattenConcat 연산자로 구현됩니다. 이 연산자들은 하는 시퀀스에 정의된 비슷한 기능의 연산자와 가장 유사한 연산자입니다. 이 연산자들은 다음 예제와 같이 다음 플로우의 수집은 현재 플로우가 끝난 후에 합니다.

1: First at 155 ms from start
1: Second at 666 ms from start
2: First at 771 ms from start
2: Second at 1280 ms from start
3: First at 1385 ms from start
3: Second at 1894 ms from start

순차적인 특징을 가집니다

flatMapMerge

다른 flattening 모드에는 들어오는 플로우들을 모두 동시에 수집하고 그 값들을 단일 플로우로 합쳐서 값들이 가능한 빠르게 방출하는 모드가 있습니다.
이들은 flatMapMergeflattenMerge 연산자로 구현됩니다. 이 연산자들은 concurrency 파라미터를 선택할 수 있습니다. 해당 파라미터는 동시에 수집할 플로우의 개수를 제한합니다.

1: First at 182 ms from start
2: First at 286 ms from start
3: First at 391 ms from start
1: Second at 689 ms from start
2: Second at 793 ms from start
3: Second at 911 ms from start

requestFlow(it)의 실행은 순차적이지만 플로우의 수집은 동시에 이뤄집니다. 이는 순차적으로 map { requestFlow(it) } 을 호출하고, 그 결과에 flattenMerge를 호출하는 것과 동일합니다.

flatMapLatest

위의 collectLatest와 유사하게 플래트닝 모드가 정의된 연산자가 있습니다.
flatMapLatest 연산자로 구현되었으며 새로운 플로우가 방출될 때 마다 직전 플로우를 취소시킵니다.

1: First at 187 ms from start
2: First at 292 ms from start
3: First at 395 ms from start
3: Second at 906 ms from start

flatMapLatest는 새로운 값이 방출되면 그 실행 블록 전체를 취소합니다.
(위 코드에서는 { requestFlow(it) } 가 해당) 해당 예제로는 큰 차이가 없습니다. requestFlow 자체가 빠르고, 중단되지 않고, 취소되지 않았기 때문입니다. delay와 같은 중단 함수를 사용하면 큰 차이가 보일 것입니다.

Flow exceptions

플로우의 collectionemitercode내부의 연산자가 예외가 발생될 때 예외 상태로 완료할 수 있습니다.

Collector try and catch

collector는 try/catch 블록을 사용하면 예외를 처리할 수 있습니다.

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

위 코드는 예외를 잡은 후 어떤 값도 방출하지 않습니다.

Everything is caught

위 예제는 방출기나 중단, 종단 연산자에서 발생한 어떤 예를 잡습니다.
예를 들어, 방출된 값을 string에 mapping하는 코드로 변경하고자 하지만 아래 코드는 예외를 발생합니다.

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

Exception transparency

예외 처리를 캡슐화 해봅시다.

플로우는 예외에 투명해야하고 flow { … } 빌더의 값을 try/catch 에서 방출하는 것은 예외 투명성을 위반합니다. 이는 위의 예제처럼 예외를 던지는 collector가 항상 try/catch를 사용해 예외를 잡음을 보장합니다.

emitter는 catch 연산자를 사용해 예외 투명성과 예외 처리 캡슐화를 보장할 수 있습니다. catch 연산자 내부에서는 예외를 분석하고 발생한 예외의 타입에 따라 다른 대응이 가능합니다.

  • throw 연산자를 통한 예외 다시 던지기
  • catch 에서 emit을 사용해 값 타입으로 방출
  • 다른 코드를 통한 예외 무시, 로깅, 기타 처리
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

try/catch 블록을 사용하지 않았지만 결과는 이전 예제와 같습니다.

Transparent catch

예외의 투명성을 보장하는 catch 연산자는 upstream 예외만 잡습니다.
다운 스트림에서 발생한 예외는 잡지 않습니다.

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at AsynchronousFlowKt$transparentCatch$1$2.emit(AsynchronousFlow.kt:317)
at AsynchronousFlowKt$transparentCatch$1$2.emit(AsynchronousFlow.kt:316)
...

출력 결과를 보면 catch 연산자가 있어도 Caught $e의 메시지가 출력되지 않음을 확인할 수 있습니다.

Catching declaratively

위 예제에서 collect 의 내부 코드들을 onEach로 옮겨 catch 연산자 앞에 두면 모든 예외 처리 부분과 catch 연산자의 동작을 결합시킬 수 있습니다. 이 flow에서 collect는 매개 변수 없이 collect()를 호출하여 작동해야 합니다.

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

Flow completion

플로우가 정상적이든 예외든 collection이 완료되면 다른 동작을 실행할 필요가 있을 수도 있습니다. 이런 방법에는 두가지가 있습니다.

  1. 명령적 방법(imperative)
  2. 선언적 방법(declarative)

Imperative finally block

try/catch 외에 collector는 finally를 사용해 수집의 완료 이후 동작을 취할 수 있습니다. try/finally

1
2
3
Done

Declarative handling

선언적인 접근 방법을 위해 플로우는 onCompletion 연산자를 가집니다. 이 연산자는 플로우의 수집이 완료되면 실행됩니다.

이전 예제를 onCompletion 을 사용해 바꾸면 그전과 같은 결과가 나옵니다.

onCompletion을 사용해 얻는 핵심 이점은 람다에 nullable로 정의되는 Throwable 파라미터를 통해 플로우 수집이 성공적 종료거나 예외가 발생했는지 알 수 있는 것입니다. 다음 예제는 simple() 플로우가 숫자 1을 방출한 후 예외를 발생시킵니다.

1
Flow completed exceptionally
Caught exception

onCompletion 연산자는 catch와 다르게 예외를 처리하지 않습니다.
위 코드에서 알 수 있듯 예외는 다운 스트림으로 전달 됩니다. 결국 예외는 onCompletion 연산자에서 catch 연산자로 처리됩니다.

Successful completion

catch 연산자와 다른 차이점은 onCompletion은 모든 예외를 확인하고 업스트림 플로우가 성공적으로 완료된 경우(취소나 예외 없이) null 예외를 받습니다.

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at AsynchronousFlowKt$successfulCompletion$1$2.emit(AsynchronousFlow.kt:365)
at AsynchronousFlowKt$successfulCompletion$1$2.emit(AsynchronousFlow.kt:364)

completion의 cause가 null 이 아닌 것을 확인할 수 있습니다.
플로우가 다운스트림 예외 때문에 중단되었기 때문입니다.

Imperative vs declarative

flow를 collect하는 방법과 플로우의 완료와 예외를 선언적, 명령적으로 다루는 방법을 알았습니다. 무슨 방법이 더 좋은지는 따지기 어려우니 본인의 코딩 스타일에 따라 결정하면 됩니다.

Launching flow

어떤 소스에서 발생하는 비동기 이벤트는 플로우로 나타내기 쉽습니다.
이런 경우 addEventListener 함수를 등록해 이후 필요한 일을 하면 됩니다.
플로우는 onEach 연산자가 이런 역할을 합니다. 그러나, onEach는 중간 연산자라서 flow를 collect 할 종단 연산자가 필요합니다. 종단 연산자 없이 onEach를 호출하면 효과가 없습니다.

만약 onEach연산자 이후 collect종단 연산자를 하면 이후 코드는 플로우가 수집될 때 까지 기다립니다.

Event: 1
Event: 2
Event: 3
Done

launchIn 종단 연산자는 이럴 때 유용합니다. collect 를 launchIn으로 바꿔 플로우의 수집을 다른 코루틴에서 할 수 있습니다. 이를 통해 이후 작성된 코드들이 바로 실행됩니다.

Done
Event: 1
Event: 2
Event: 3

launchInflowcollect할 코루틴의 CoroutineScope를 반드시 파라미터로 요구합니다. 위 코드에선 runBlocking 코루틴 빌더로부터 전달됩니다.
이로 인해 플로우가 실행되는 동안 runBlocking 스코프는 자식 코루틴의 종료를 기다리고 메인 함수가 반환되어 프로그램이 종료되는 것을 방지합니다.

실제 어플에서 스코프는 생명주기가 제한된 엔티티에서 나옵니다.
엔티티의 생명주기가 취소되어 종료되면 해당 스코프가 취소되고 스코프에 속한 플로우의 수집 또한 취소됩니다.
이런 방식으로 onEach{ … }.launchIn(scope)addEventListener와 동일하게 수행됩니다. 하지만, removeEventListener 함수는 필요하지 않습니다.
취소와 구조화된 동시성이 이를 대신 수행해주기 때문입니다.

launchIn 또한 Job을 반환합니다. 이 Job은 전체 스코프를 취소하거나 특정 Job에 대한 join 을 사용할 필요 없이 그에 속한 플로우 수집 코루틴을 취소할 수 있습니다.

Flow cancellation checks

편의를 위해 flow 빌더는 방출된 각 값에 대한 취소에 대해 추가적으로 ensureActive 검사를 합니다.
즉, loop 문에서 방출은 취소가능 하다는 말입니다.

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@1b68ddbd

4가 방출될 때 취소되는 것을 확인할 수 있습니다.

Flow and Reactive Streams

Reactive에 익숙하신 분들은 플로우가 친숙할 것입니다.

플로우의 디자인은 리액티브 스트림과 그의 다양한 구현에 영향을 받았습니다. 하지만, 플로우의 주요 목표는 구조화된 동시성을 따르며 코틀린과 중단 함수를 이용하여 가능한 단순화된 디자인을 가집니다.

개념적으로 다르지만 플로우는 리액티브 스트림이기에
플로우 <-> 리액티브 퍼블리셔간 변환이 가능합니다.

--

--