코틀린의 코루틴 — 5. Asynchronous Flow 2부

hongbeom
hongbeomi dev
Published in
11 min readJun 21, 2020

코루틴의 비동기 Flow에 대해 알아봅니다.

이 글은 공식 코루틴 가이드 링크의 내용을 기반으로 하여 작성하였습니다. 이 글을 작성할 당시 버전은 1.3.71입니다.

1. Coroutine Basic2. Cancellation and Timeouts3. Composing Suspending Functions4. Coroutine Context and Dispatchers5. Asynchronous Flow 1부6. Asynchronous Flow 2부7. Channels

지난 번 Asynchronous Flow 1부 에 이어서 글을 작성하였습니다.

Buffering

flow의 다른 부분을 다른 코루틴으로 실행하는 것은 장기간 실행되는 비동기 연산이 관련된 경우 flowcollect에 걸리는 전체 시간에 도움이 될 수 있습니다. 예를 들어 foo() flow에 의한 방출 작업이 느리고 원소를 생산하는 데 100ms가 소요되며 collector도 느려서 원소를 처리하는 데 300ms가 소요된 경우를 생각해봅시다. 이런 flow에서 세 개의 숫자를 collect하는데 얼마나 시간이 걸리는지 확인해보겠습니다.

전체 수집에 약 1200ms가 소요된 것을 확인할 수 있습니다.

우리는 flow에서 buffer 연산자를 사용하여 수집 코드를 실행함과 동시에 foo()의 방출 작업 코드를 실행할 수 있습니다.

우리가 효과적으로 처리하는 파이프라인을 만들어주었기 때문에 첫 번째 숫자를 100ms만 기다린 다음 각 숫자를 처리하기 위해 300ms의 시간만 소요했기 때문에 동일한 숫자를 더 빨리 처리합니다. 이렇게 하면 약 1000ms가 소요된 것을 확인할 수 있습니다.

flowOn 연산자는 CoroutineDispather를 변경 할 때 동일한 버퍼링 매커니즘을 사용하지만 여기서는 실행하는 context를 변경하지 않고 버퍼링을 명시적으로 요청하여 사용했다는 것에 유의해야합니다.

Conflation

flow가 실행 또는 상태 업데이트의 부분적인 결과를 나타내는 경우, 각 값을 처리할 필요가 없고 가장 최근의 값만 처리할 필요가 있을 수 있습니다. 이 경우, conflate 연산자를 사용하여 collector가 너무 느려서 중간 값을 처리할 수 없을 때 중간 값을 건너뛸 수 있습니다. 이전 예제를 기반으로 다시 코드를 살펴보겠습니다.

첫 번째 번호가 아직 처리 중이고 세 번째 번호는 이미 생산되었으므로 두 번째 번호는 conflated 되었고 가장 최근의 번호 (세 번째 번호)만 collector에게 전달되는 것을 확인할 수 있습니다.

최근 값 처리

Conflation은 방출하는 작업과 수집 작업이 너무 느릴 때 처리 속도를 높이는 한 가지 방법입니다. Conflation은 방출되는 값을 drop함으로써 작업을 처리합니다. 다른 방법으로는 느린 수집 작업을 취소하고 새로운 값이 나올 때마다 다시 작업을 시작하는 것입니다. xxxLatest 연산자들은 xxx연산자와 동일하게 필수적인 로직을 수행하지만 새로운 값에 대해 블록 안의 코드를 취소하는 경우가 있습니다. 이전 예제에서 collectcollectLatest로 바꾸어 다시 실행해보겠습니다.

collectLatest의 블록에서 300ms가 걸리지만 100ms마다 새로운 값이 방출되기 때문에 블록은 모든 값에 대해 실행되지만 마지막 값에 대해서만 완료 됨을 알 수 있습니다.

다중 flow 구성

여러 가지 flow를 구성하는 방법은 많습니다.

Zip

Sequence.zip 처럼 flow는 두 flow의 해당 값을 결합하는 zip 연산자를 가지고 있습니다.

Combine

flow가 변수나 연산의 가장 최근 값을 나타내는 경우 해당 flow의 가장 최근의 값에 따라 달라지는 계산을 수행하고 upstream flow가 값을 방출할 때마다 이를 다시 계산해야 할 수 있습니다. 해당 연산자 계열을 combine이라고 합니다.

예시는 다음과 같습니다.

우리는 zip과는 상당히 다른 결과값을 확인할 수 있는데, 각 numsstrs의 방출 값에 따라 다시 계산을 수행하여 결과가 출력 됨을 알 수 있습니다.

Flattening flows

flow는 비동기적으로 수신된 값의 시퀀스를 나타내기 때문에 각 값이 다른 값의 시퀀스에 대한 요청을 트리거하는 상황은 상당히 대처하기 쉽습니다. 예를 들어 500ms 간격으로 두 문자열의 flow를 반환하는 함수가 있다고 가정 해봅시다.

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500)
emit("$i: Second")
}

그리고 세 개의 정수를 가지고 있는 flow가 있고 각각에 대해 다음과 같이 requestFlow를 사용할 경우,

(1..3).asFlow().map { requestFlow(it) }

그리고 나서 우리는 추가적인 처리를 위해 하나의 flow로 평탄화되어야 하는 flow(Flow<Flow<String>>)을 가지게 됩니다. 컬렉션과 시퀀스에는 이를 위한 flattenflatMap 연산자가 있습니다. 그러나 flow는 비동기적 특성 때문에 다른 형태의 평탄화를 요구하는데, 이를 위해 아래 키워드 처럼 flow 평탄화 연산자들이 존재합니다.

flatMapConcat

연쇄적인 평탄화 방식은 flatMapConcatflattenConcat 연산자에 의해 구현됩니다. 이들은 다음 예시에서 알 수 있듯이 다음 수집 작업을 시작하기 전에 내부 flow가 완료될 때까지 기다립니다.

flatMapConcat의 순차적인 특성은 출력 결과에서 명확하게 알 수 있습니다.

flatMapMerge

또 다른 평탄화 방식에는 유입되는 모든 flow를 동시에 수집하고 그 값을 단일 flow로 병합하여 가능한 한 빨리 값이 방출되도록 하는 방식이 있습니다. 이것은 flatMapMergeflattenMerge에 의해 구현됩니다. 이들은 동시에 수집되는 동시 flow의 수를 제한하는 선택적인 동시성 매개 변수를 옵션으로 선택할 수 있습니다.(기본 값은 DEFAULT_CONCURRENCY)

flatMapMerge는 코드 블록( { requestFlow(it) } )을 순차적으로 호출하지만 결과 flow를 동시에 수집하며 map { requestFlow(it) }을 먼저 수행한 후 결과에 대해 flatMerge를 호출하는 것과 동일하다는 것에 유의하세요.

flatMapLatest

collectLastest 연산자와 유사한 방식으로, 새로운 flow가 방출되는 동시에 이전 flow의 컬렉션이 취소되는 최신 평탄화 방식이 있습니다.

flatMapLatest는 새 값에 대한 블록 ( { requestFlow(it) } )의 모든 코드를 취소한다는 점에 유의하세요. 이 특별한 예제에서는 차이가 안보이는데, 이는 flow의 요청 자체가 빠르고, 중단되지 않으며, 취소할 수 없기 때문입니다. 하지만 만약 우리가 delay와 같은 정지 기능을 사용한다면, 차이가 나타날 것입니다.

Flow exceptions

flow 수집 작업은 연산자 내부의 방출 작업 또는 코드가 예외를 던지는 경우 예외로 완료할 수 있습니다. 이런 예외를 처리하는 데는 여러 가지 방법이 존재합니다.

Collector try and catch

collector는 코틀린의 try/catch 블록을 사용하여 다음과 같은 예외를 처리할 수 있습니다.

Everything is caught

앞의 예제는 실제로 방출 값 또는 중간 또는 terminal 연산자에서 발생하는 예외를 포착합니다. 예를 들어 방출한 값이 문자열로 매핑되도록 위의 코드를 변경한 코드가 작성해보면 해당 코드는 예외를 발생시킵니다.

Exception transparency

하지만 어떻게 코드가 해당 코드의 예외 처리 행동을 캡슐화 할 수 있을까요?

flow는 예외에 대해 투명해야 하며 flow { … } builder의 값을 try/catch에서 내보내는 것은 예외 투명성을 위반합니다. 이는 앞의 예제처럼 예외를 던지는 collector가 항상 try/catch를 사용하여 예외를 잡을 수 있음을 보장해줍니다.

방출 작업은 이 예외 투명성을 보존하고 예외 처리를 캡슐화 할 수 이는 catch 연산자를 사용할 수 있습니다. catch 연산자의 내부에선 예외를 분석하고 어떤 예외를 포착했는지에 따라 다른 방식으로 대응할 수 있습니다.

  • Exceptions을 다시 던질 수 있습니다.
  • Exceptions은 catch 블록에서 방출 됨을 이용하여 어떤 값의 방출로 변경할 수 있습니다.
  • 일부 다른 코드에서 Exceptions을 무시, 기록 또는 처리할 수 있습니다.

예를 들어 예외를 처리하는 방법에 대한 텍스트 값을 방출한다고 가정해보겠습니다.

코드 상에 try/catch 가 없음에도 불구하고 예제의 출력은 동일합니다.

Transparent catch

예외의 투명성을 보장하는 catch 중간 연산자는 upstream 예외만 포착합니다. collect {...} (catch 아래에 위치) 블록이 예외를 발생시킨 경우 다음과 같이 동작합니다.

catch 연산자가 있음에도 불구하고 “Caught …” 메시지는 출력되지 않습니다.

Catching declaratively

collect 연산자 바디를 onEach로 옮겨서 그것을 catch 연산자 앞에 두면 모든 예외를 처리하는 부분과 catch 연산자의 동작 부분을 결합시킬 수 있습니다. 이 flow에서 collect는 매개 변수 없이 collect()를 호출하여 작동시켜야 합니다.

이제 “Caught …” 메시지가 출력되어 try/catch 블록을 명시적으로 사용하지 않고도 모든 예외 사항을 파악할 수 있게 되었습니다.

Flow completion

flowcollect가 완료되었을 때 어떤 조치가 필요할 수 있습니다. 이것을 명령과 선언 두 가지 방법으로 구현할 수 있습니다.

명령적인 final block

try/catch 외에도 collector는 collect 완료 시 final 블록을 사용하여 작업을 실행할 수 있습니다.

위 코드는 1부터 3까지 출력한 후 마지막으로 Done을 출력합니다.

선언적인 처리

선언적 접근방식의 경우 flowflow가 완전히 collect 되었을 때 호출되는 onCompletion 이라는 중간 연산자를 가지고 있습니다.

이전 예제는 onCompletion 연산자를 사용하여 다시 작성할 수 있으며 동일한 출력을 보여줍니다.

onCompletion의 주요 장점은 flow collect가 정상적으로 완료되었는지 또는 예외적으로 완료되었는지 여부를 결정하는 데 사용되는 람다의 nullableThrowable 매개변수입니다. 다음 예제에서 foo() flow는 숫자 1을 방출한 후 예외를 발생시킵니다.

onCompletion 연산자는 catch와 달리 예외를 처리하지 않습니다. 위의 예제 코드에서 알 수 있듯이 예외는 여전히 downstream으로 흘러갑니다. 예외는onCompletion에서 catch 블록으로 전달되어 처리됩니다.

Successful completion

catch 연산자와의 또 다른 차이점은 onCompletion은 모든 예외를 확인하고 (취소 또는 실패 없이) upstream flow가 성공적으로 완료될 때만 null 예외를 받는다는 것입니다.

downstream의 예외로 인해 flow가 중단되었기 때문에 완료 원인이 null이 아님을 알 수 있습니다.

Imperative VS declarative

이제 우리는 flowcollect하는 방법을 알았고, 그것의 완료나 예외를 명령적이고 선언적인 방식으로 처리할 수 있게 되었습니다. 둘 중 어떤 접근법이 좋다 이런 것은 없지만 코드 스타일이나 선호도에 따라 입맛대로 선택하여 유용하게 사용하시면 됩니다.

Launching flow

만일 우리가 onEach 다음에 collect terminal 연산자를 사용하는 경우, 그 후의 코드는 flowcollect될 때까지 기다리게 됩니다.

launchIn terminal 연산자는 이 때 유용하게 사용할 수 있습니다. collectlaunchIn으로 대체함으로써 별도의 코루틴에서 flow의 collection을 실행할 수 있게되므로 추가적인 코드의 실행이 즉각적으로 실행됩니다.

launchIn에 사용되는 파라미터는 flowcollect할 코루틴이 시작되는 CoroutineScope에 지정됩니다. 위의 예제에서 이 스코프는 runBlocking coroutine builder에서 시작되므로 flow가 실행되는 동안 이 runBlocking 스코프는 하위 코루틴의 완료를 기다리며 main 함수가 이 예제를 리턴하거나 종료하지 않도록 합니다.

실제 애플리케이션에서 스코프는 라이프사이클이 제한된 엔티티에서 나올 것입니다. 이 엔티티의 라이프사이클이 종료되는 즉시 해당 스코프가 취소되고 해당 flowcollect가 취소됩니다. 이렇게 하면 onEach { ... }.launchIn(scope) 블록은 addEventListener 처럼 작동하게 됩니다. 하지만 removeEventListener 같은 기능은 필요하지 않게 됩니다.

launchIn 또한 Job을 반환하는데, 이는 전체 스코프를 취소하거나 결합하지 않고 해당 flow collect 코루틴만 취소하는 데 사용할 수 있습니다.

읽어주셔서 감사합니다!🙌

다음 글에선 Channels 라는 주제로 이어서 작성하겠습니다.

--

--