Coroutine 탐험일지(4)
Asynchronous Flow
공식 가이드와 코루틴 공식 가이드 자세히 읽기- Part9 을 따라 코루틴을 탐험해봅시다.
코루틴 적용 버전은 1.6.3입니다.
값 하나를 반환하는 중단함수를 정의하여 비동기로 실행할 수 있습니다.
그렇다면 여러 개의 값을 반환하는 중단함수를 만드는 방법은 무엇일까요?
플로우를 사용하면 가능합니다!
Representing multiple values
1
2
3
코틀린에서 다수의 값은 위 코드와 같이 컬렉션을 사용해 나타낼 수 있습니다.
Sequences
1
2
3
위 코드와 같이 각 연산에 100ms 시간이 소요되는 수들을 나타낼 때 시퀀스를 사용합니다.
즉, 각각의 수에 CPU 연산이 요구되는 수들을 나타낼 때 시퀀스를 사용합니다.
Suspending functions
시퀀스에서 사용된 연산 코드는 메인 스레드를 정지 시킵니다.
이런 연산은 비동기 코드에서 실행될 때 우리는 suspend
키워드를 함수에 붙여 중단함수로 정의할 수 있습니다.
정의한 중단함수를 코루틴 스코프에서 호출하면 호출 스레드의 정지없이 실행할 수 있습니다.
Flows
List<Int>
타입을 리턴하는 것은 값을 한번에 return하는 것을 의미합니다.
동기로 처리될 값들에 Sequence<Int>
를 사용하듯, 비동기로 처리될 값들의 스트림을 나타내기 위해 Flow<Int>
타입을 사용합니다.
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
메인 스레드에서 실행되는 별도의 코루틴에서 100ms마다 I’m not locked가 출력된 이후 각 숫자가 출력됩니다.
이전 예제들과 다르게 Flow
는 아래 차이점들을 가집니다.
- Flow 타입에 대한 builder 함수는
flow{}
입니다. flow {...}
블록은suspend
될 수 있습니다.simple()
함수는 이제suspend
키워드를 사용하지 않아도 됩니다.- 값은
emit
함수를 사용해 flow에서 방출됩니다. - 값은
collect
함수를 사용해 flow에서 수집됩니다.
위 함수에서 delay(100)
을 Thread.sleep(100)
으로 변경하면 결과가 아래와 같이 변경됩니다.
I'm not blocked 1
I'm not blocked 2
I'm not blocked 3
1
2
3
main thread가 block되는 것을 확인할 수 있습니다.
Flows are cold
플로우는 시퀀스와 유사하게 cold streams 입니다. flow{}
빌더 내부 코드는 flow가 수집될 때 까지 작동하지 않습니다.
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
플로우를 반환하는 simple()
함수가 suspend
를 표시하지 않는 핵심 이유입니다. simple
함수는 호출되면 아무것도 기다리지 않고 바로 리턴됩니다.
flow는 위 코드의 결과와 같이 collect 될 때 마다 시작되는 것을 볼 수 있습니다.
Flow cancellation basics
플로우는 코루틴의 일반적인 취소 방식을 따릅니다.
일반적인 코루틴과 같이 플로우도 delay
와 같은 취소 가능한 중단 함수가 호출 되어 중단되면 취소가 가능합니다.
Emitting 1
1
Emitting 2
2
Done
Flow builders
지금까지 살펴본 flow {...}
빌더는 가장 기본적이었습니다.
플로우를 선언하기 쉬운 다른 빌더들도 있습니다.
flowOf
빌더는 고정된 값들을 emit 합니다..asFlow()
확장 함수를 사용해 다양한 컬렉션과 시퀀스를flow
로 변경할 수 있습니다.
Intermediate flow operators
플로우는 컬렉션과 시퀀스와 같이 연산자를 통해 변경될 수 있습니다.
중간 연산자는 업스트림 플로우에 적용해 다운스트림 플로우를 반환합니다.
이런 연산자들은 플로우와 같이 cold 입니다. 이런 연산자의 호출은 중단 함수가 아니기에 새로 변경된 플로우를 즉시 반환합니다.
기본 연산자들은 익숙한 map
과 filter
같이 익숙한 이름을 가집니다.
시퀀스와 중요한 차이는 이 연산자들은 실행되고 있는 코드 블록에서 중단 함수를 호출할 수 있습니다.
예를 들어, 요청된 플로우가 map 연산자로 매핑될 수 있습니다.
작업이 긴 시간의 중단함수에 의해 요청된 플로우도 매핑됩니다.
response 1
response 2
response 3
Transform operator
Flow 변환 연산자 중 가장 일반적인 연산자는 transform
입니다.
이 연산자는 map
이나 filter
같은 단순한 변환이나, 복잡한 변환들을 구현하기 위해 사용됩니다. transform
연산자를 사용해 임의의 값을 임의의 횟수로 방출할 수 있습니다.
예를 들어, transform
연산자를 사용해 오래 걸리는 비동기 요청을 수행하기 전에 기본 문자열을 먼저 방출하고 요청에 대한 응답이 도착하면 그 결과를 방출할 수 있습니다.
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
Size-limiting operators
take
와 같은 크기 제한 중간 연산자는 플로우의 실행을 정의된 한계에 도달하면 취소합니다. 코루틴에서 취소는 항상 예외가 발생합니다. 그래서 try {...} finally {...}
같은 자원 관리 함수들이 취소가 발생한 경우 정상적이게 동작합니다.
1
2
Finally in numbers
위 코드로 보아 take(2)
로 인해 2번 방출되고 멈춘 것을 확인할 수 있습니다.
Terminal flow operators
플로우의 Terminal 연산자는 수집을 시작하는 중단 함수입니다.
collect 연산자는 가장 기본적인 연산자입니다. 아래와 같이 collect를 쉽게 하는 연산자들도 있습니다.
toList
와toSet
같은 다양한 컬렉션으로의 변환- 연산자는 첫번째 값을 얻고 플로우가 단일값만 방출함을 보장
reduce
와fold
를 사용해 플로우 값을 줄일 수 있음
55
Flows are sequential
플로우의 독립적인 collect는 여러 플로우를 사용하는 특별한 연산자가 없는 한 순차적으로 수행됩니다.
수집은 터미널 연산자를 호출한 코루틴에서 직접 실행됩니다. 기본적으로 새로운 코루틴은 실행되지 않고, 방출된 각 값은 업 스트림에서 다운 스트림으로 모든 중간 연산자들이 처리해 마지막으로 터미널 연산자로 전달됩니다.
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow context
flow
의 collection은 항상 코루틴을 호출하는 context
에서 발생합니다.
예를 들어, simple
이라는 flow가 있을 때 다음과 같은 코드는 명시된 컨텍스트 상에서 수행됩니다.
withContext(context) {
simple().collect { value ->
println(value) // run in the specified context
}
}
이런 플로우의 특징을 컨텍스트 보존이라고 부릅니다.
기본적으로 flow {...}
빌더에 제공된 코드 블록은 플로우 collect를 실행한 코루틴의 컨텍스트에서 실행됩니다.
예를 들어, 호출 스레드를 출력하며 3개의 숫자를 방출하는 함수를 생각해 봅시다.
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
simple().collect{}
가 메인 스레드에서 호출되어 simple
의 플로우 또한 메인 스레드에서 호출되는 것을 볼 수 있습니다.
이 특징은 빠른 실행을 보장하고, 호출자를 블록하지 않고 실행 컨텍스트에 관계 없이 비동기 작업을 수행하는 완벽한 기본 방법입니다.
Wrong emission withContext
그러나, CPU를 오래쓰는 코드는 Dispatchers.Default
context에서 실행되고 UI 업데이트 코드는 Dispatchers.Main
context에서 실행됩니다.
보통 withContext
는 코틀린 코루틴을 사용하는 코드에서 컨텍스트 전환을 위해 사용되지만, flow {...}
builder는 context preservation(컨텍스트 보존) 특징으로 인해 다른 context
에서 값을 방출할 수 없습니다.
Exception in thread "main" java.lang.IllegalStateException:
Flow invariant is violated:
Flow was collected in [CoroutineId(1),"coroutine#1":BlockingCoroutine{Active}@3060aa8,BlockingEventLoop@7572d78a],
but emission happened in [CoroutineId(1),"coroutine#1":DispatchedCoroutine{Active}@211173fb, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead at ...
context를 변경해 값을 방출하려 해 예외가 발생한다.
flowOn operator
위 코드의 결과의 마지막을 보면 flowOn
을 사용하라는 것을 볼 수 있습니다.
flow의 context를 변경하는 방법은 아래 코드와 같습니다.
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
flow {...}
는 백그라운드 스레드에서 작동하고, collect는 메인 스레드에서 작동하는 것을 확인할 수 있습니다.
flowOn
연산자는 context
말고도 flow
의 순차적인 특성을 변경합니다.
값 들의 수집이 @coroutine#1
에서 발생하고 방출은 다른 스레드에서 실행되는 @coroutine#2
에서 발생합니다.flowOn
연산자는 컨텍스트 내에서 CoroutineDispatcher
를 변경할 경우 업스트림 플로우를 위한 다른 코루틴을 생성합니다.
Buffering
플로우의 다른 부분을 다른 코루틴에서 실행하는 것은 해당 플로우를 수집하는데 걸리는 전체 시간으로 생각하면 도움이 될 수 있습니다. 특히, 오래 걸리는 비동기 연산이 관련되어 있으면 더 좋습니다.
예를 들어, 아래 simple
flow의 emit이 느리고 원소 생산에 100ms가 걸리는 경우를 생각해봅시다. 심지어 collect도 300ms씩 매번 걸리는 경우입니다.
이 경우 1~3을 처리하는 데 얼마나 걸리나 확인해 봅시다.
1
2
3
Collected in 1235 ms
약 1200ms가 걸리는 것을 확인할 수 있습니다.
Buffer
연산자를 사용해 방출과 수집을 동시에 수행되도록 할 수 있습니다.
1
2
3
Collected in 1089 ms
첫번째 수를 위해 100ms를 기다린 다음 각 수를 처리하기 위해 300ms씩 기다리도록 처리해 약 1000ms만 소요된 것을 확인할 수 있습니다.
flowOn
연산자는CoroutineDispatcher
를 변경할 때와 같은 버퍼링 메커니즘을 사용하지만,buffer
연산자를 사용해 실행 컨텍스트의 변경 없이 버퍼링을 했습니다.
conflation
flow가 연산의 일부나 연산 상태 업데이트를 방출하는 경우 각각의 값을 처리하는 것은 불필요하고 가장 최근의 값 하나만 처리하면 됩니다. 이 경우,conflate
연산자가 collector의 처리가 너무 느린 경우 중간 값들을 스킵할 수 있습니다.
1
3
Collected in 780 ms
첫번째 수가 처리중일 때 세번째 수가 생산되어 두번째 수는 skip되는 것을 확인할 수 있습니다.
Processing the latest value
Conflation
은 emit과 collect가 둘다 느릴 때 속도를 올리는 법 중 하나 입니다. 중간 값들을 skip하는 방법입니다.
다른 방법은 slow collector를 취소하고 새로운 값이 emit될 때마다 collector를 재시작하는 방법입니다. xxxLatest
연산자들은 xxx
연산자와 동일하게 필수 로직을 수행하지만 새로운 값이 emit되면 블록 안의 코드를 취소하는 경우가 있습니다.
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 699 ms
값이 수집되면 뒤에 블록을 취소하고 새로 시작하는 것을 볼 수 있습니다.
마지막 값의 경우 새로 수집되는 값이 없어 끝까지 실행되는 것을 볼 수 있습니다.
Composing multiple flows
다중 flow를 합치는 방법은 여러가지입니다.
Zip
코틀린 표준 라이브러리에 있는 Sequence.zip
확장함수와 같이
flow는 두 flow의 값을 합치는 zip
연산자를 가집니다.
1 -> one
2 -> two
3 -> three
Combine
flow가 가장 최근 값을 나타내면 flow의 가장 최근 값에 따라 달라지는 계산을 수행하고 업스트림 플로우가 값을 방출할 때마다 그 추가 연산을 다시 계산해야 할 수 있습니다. 이와 관련된 연산자를 combine
이라고 합니다.
nonCombine
1 -> one at 486 ms from start
2 -> two at 886 ms from start
3 -> three at 1288 ms from startcombine
1 -> one at 453 ms from start
2 -> one at 664 ms from start
2 -> two at 858 ms from start
3 -> two at 978 ms from start
3 -> three at 1262 ms from start
zip을 사용할 때와 combine을 사용할 때의 결과가 다른 것을 확인할 수 있습니다. combine
을 사용할 경우 nums
와 strs
플로우에서 emit이 될 때 마다 최신 값을 가지고 처리하는 것을 확인할 수 있습니다.
Flattening flows
플로우는 비동기로 얻는 값들의 시퀀스를 나타냅니다.
그래서 어떤 플로우에서 수신되는 일련의 값들이 다른 값들의 시퀀스를 요청하는 일은 자주 발생합니다.
1~3의 정수가 emit될 때마다 requestFlow
가 호출됩니다.
이 결과는 추가적인 처리가 필요한 Flow<Flow<String>>
타입을 가지는 flow가 반환됩니다. 컬렉션이나 시퀀스는 이를 위해 flatten
이나 flatMap
연산자를 가집니다. 플로우는 비동기적 특징으로 인해 다른 플래트닝 연산이 필요하며 이를 위한 연산자들이 별도로 정의되어 있습니다.
flatMapConcat
하나로 합치(Concatenating)는 모드는 flatMapConcat
과 flattenConcat
연산자로 구현됩니다. 이 연산자들은 하는 시퀀스에 정의된 비슷한 기능의 연산자와 가장 유사한 연산자입니다. 이 연산자들은 다음 예제와 같이 다음 플로우의 수집은 현재 플로우가 끝난 후에 합니다.
1: First at 155 ms from start
1: Second at 666 ms from start
2: First at 771 ms from start
2: Second at 1280 ms from start
3: First at 1385 ms from start
3: Second at 1894 ms from start
순차적인 특징을 가집니다
flatMapMerge
다른 flattening 모드에는 들어오는 플로우들을 모두 동시에 수집하고 그 값들을 단일 플로우로 합쳐서 값들이 가능한 빠르게 방출하는 모드가 있습니다.
이들은 flatMapMerge
와 flattenMerge
연산자로 구현됩니다. 이 연산자들은 concurrency
파라미터를 선택할 수 있습니다. 해당 파라미터는 동시에 수집할 플로우의 개수를 제한합니다.
1: First at 182 ms from start
2: First at 286 ms from start
3: First at 391 ms from start
1: Second at 689 ms from start
2: Second at 793 ms from start
3: Second at 911 ms from start
requestFlow(it)
의 실행은 순차적이지만 플로우의 수집은 동시에 이뤄집니다. 이는 순차적으로map { requestFlow(it) }
을 호출하고, 그 결과에flattenMerge
를 호출하는 것과 동일합니다.
flatMapLatest
위의 collectLatest
와 유사하게 플래트닝 모드가 정의된 연산자가 있습니다.flatMapLatest
연산자로 구현되었으며 새로운 플로우가 방출될 때 마다 직전 플로우를 취소시킵니다.
1: First at 187 ms from start
2: First at 292 ms from start
3: First at 395 ms from start
3: Second at 906 ms from start
flatMapLatest
는 새로운 값이 방출되면 그 실행 블록 전체를 취소합니다.
(위 코드에서는 { requestFlow(it) }
가 해당) 해당 예제로는 큰 차이가 없습니다. requestFlow
자체가 빠르고, 중단되지 않고, 취소되지 않았기 때문입니다. delay
와 같은 중단 함수를 사용하면 큰 차이가 보일 것입니다.
Flow exceptions
플로우의 collection
는 emiter
나 code
내부의 연산자가 예외가 발생될 때 예외 상태로 완료할 수 있습니다.
Collector try and catch
collector는 try/catch
블록을 사용하면 예외를 처리할 수 있습니다.
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
위 코드는 예외를 잡은 후 어떤 값도 방출하지 않습니다.
Everything is caught
위 예제는 방출기나 중단, 종단 연산자에서 발생한 어떤 예를 잡습니다.
예를 들어, 방출된 값을 string에 mapping하는 코드로 변경하고자 하지만 아래 코드는 예외를 발생합니다.
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
Exception transparency
예외 처리를 캡슐화 해봅시다.
플로우는 예외에 투명해야하고 flow { … }
빌더의 값을 try/catch
에서 방출하는 것은 예외 투명성을 위반합니다. 이는 위의 예제처럼 예외를 던지는 collector가 항상 try/catch
를 사용해 예외를 잡음을 보장합니다.
emitter는 catch
연산자를 사용해 예외 투명성과 예외 처리 캡슐화를 보장할 수 있습니다. catch
연산자 내부에서는 예외를 분석하고 발생한 예외의 타입에 따라 다른 대응이 가능합니다.
throw
연산자를 통한 예외 다시 던지기catch
에서emit
을 사용해 값 타입으로 방출- 다른 코드를 통한 예외 무시, 로깅, 기타 처리
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
try/catch
블록을 사용하지 않았지만 결과는 이전 예제와 같습니다.
Transparent catch
예외의 투명성을 보장하는 catch
연산자는 upstream 예외만 잡습니다.
다운 스트림에서 발생한 예외는 잡지 않습니다.
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at AsynchronousFlowKt$transparentCatch$1$2.emit(AsynchronousFlow.kt:317)
at AsynchronousFlowKt$transparentCatch$1$2.emit(AsynchronousFlow.kt:316)
...
출력 결과를 보면 catch 연산자가 있어도 Caught $e
의 메시지가 출력되지 않음을 확인할 수 있습니다.
Catching declaratively
위 예제에서 collect
의 내부 코드들을 onEach
로 옮겨 catch 연산자 앞에 두면 모든 예외 처리 부분과 catch 연산자의 동작을 결합시킬 수 있습니다. 이 flow에서 collect는 매개 변수 없이 collect()
를 호출하여 작동해야 합니다.
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
Flow completion
플로우가 정상적이든 예외든 collection
이 완료되면 다른 동작을 실행할 필요가 있을 수도 있습니다. 이런 방법에는 두가지가 있습니다.
- 명령적 방법(imperative)
- 선언적 방법(declarative)
Imperative finally block
try/catch
외에 collector는 finally
를 사용해 수집의 완료 이후 동작을 취할 수 있습니다. try/finally
1
2
3
Done
Declarative handling
선언적인 접근 방법을 위해 플로우는 onCompletion
연산자를 가집니다. 이 연산자는 플로우의 수집이 완료되면 실행됩니다.
이전 예제를 onCompletion
을 사용해 바꾸면 그전과 같은 결과가 나옵니다.
onCompletion
을 사용해 얻는 핵심 이점은 람다에 nullable로 정의되는 Throwable
파라미터를 통해 플로우 수집이 성공적 종료거나 예외가 발생했는지 알 수 있는 것입니다. 다음 예제는 simple()
플로우가 숫자 1을 방출한 후 예외를 발생시킵니다.
1
Flow completed exceptionally
Caught exception
onCompletion
연산자는 catch
와 다르게 예외를 처리하지 않습니다.
위 코드에서 알 수 있듯 예외는 다운 스트림으로 전달 됩니다. 결국 예외는 onCompletion
연산자에서 catch
연산자로 처리됩니다.
Successful completion
catch
연산자와 다른 차이점은 onCompletion
은 모든 예외를 확인하고 업스트림 플로우가 성공적으로 완료된 경우(취소나 예외 없이) null
예외를 받습니다.
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at AsynchronousFlowKt$successfulCompletion$1$2.emit(AsynchronousFlow.kt:365)
at AsynchronousFlowKt$successfulCompletion$1$2.emit(AsynchronousFlow.kt:364)
completion의 cause
가 null 이 아닌 것을 확인할 수 있습니다.
플로우가 다운스트림 예외 때문에 중단되었기 때문입니다.
Imperative vs declarative
flow를 collect하는 방법과 플로우의 완료와 예외를 선언적, 명령적으로 다루는 방법을 알았습니다. 무슨 방법이 더 좋은지는 따지기 어려우니 본인의 코딩 스타일에 따라 결정하면 됩니다.
Launching flow
어떤 소스에서 발생하는 비동기 이벤트는 플로우로 나타내기 쉽습니다.
이런 경우 addEventListener
함수를 등록해 이후 필요한 일을 하면 됩니다.
플로우는 onEach
연산자가 이런 역할을 합니다. 그러나, onEach
는 중간 연산자라서 flow를 collect 할 종단 연산자가 필요합니다. 종단 연산자 없이 onEach
를 호출하면 효과가 없습니다.
만약 onEach
연산자 이후 collect
종단 연산자를 하면 이후 코드는 플로우가 수집될 때 까지 기다립니다.
Event: 1
Event: 2
Event: 3
Done
launchIn
종단 연산자는 이럴 때 유용합니다. collect 를 launchIn으로 바꿔 플로우의 수집을 다른 코루틴에서 할 수 있습니다. 이를 통해 이후 작성된 코드들이 바로 실행됩니다.
Done
Event: 1
Event: 2
Event: 3
launchIn
은 flow
를 collect
할 코루틴의 CoroutineScope
를 반드시 파라미터로 요구합니다. 위 코드에선 runBlocking
코루틴 빌더로부터 전달됩니다.
이로 인해 플로우가 실행되는 동안 runBlocking
스코프는 자식 코루틴의 종료를 기다리고 메인 함수가 반환되어 프로그램이 종료되는 것을 방지합니다.
실제 어플에서 스코프는 생명주기가 제한된 엔티티에서 나옵니다.
엔티티의 생명주기가 취소되어 종료되면 해당 스코프가 취소되고 스코프에 속한 플로우의 수집 또한 취소됩니다.
이런 방식으로 onEach{ … }.launchIn(scope)
는 addEventListener
와 동일하게 수행됩니다. 하지만, removeEventListener
함수는 필요하지 않습니다.
취소와 구조화된 동시성이 이를 대신 수행해주기 때문입니다.
launchIn
또한 Job
을 반환합니다. 이 Job
은 전체 스코프를 취소하거나 특정 Job
에 대한 join
을 사용할 필요 없이 그에 속한 플로우 수집 코루틴을 취소할 수 있습니다.
Flow cancellation checks
편의를 위해 flow
빌더는 방출된 각 값에 대한 취소에 대해 추가적으로 ensureActive
검사를 합니다.
즉, loop 문에서 방출은 취소가능 하다는 말입니다.
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@1b68ddbd
4가 방출될 때 취소되는 것을 확인할 수 있습니다.
Flow and Reactive Streams
Reactive에 익숙하신 분들은 플로우가 친숙할 것입니다.
플로우의 디자인은 리액티브 스트림과 그의 다양한 구현에 영향을 받았습니다. 하지만, 플로우의 주요 목표는 구조화된 동시성을 따르며 코틀린과 중단 함수를 이용하여 가능한 단순화된 디자인을 가집니다.
개념적으로 다르지만 플로우는 리액티브 스트림이기에플로우 <-> 리액티브 퍼블리셔
간 변환이 가능합니다.