(Android) Flow Flattening 연산자 톺아보기

flatMapConcat, flatMapMerge, flatMapLatest

Jaesung Lee
jaesung dev
9 min readSep 25, 2023

--

Photo by Solen Feyissa on Unsplash

한 화면의 UI를 그리기 위해 하나의 API만 호출한다면 가장 베스트이지만, 꼭 그렇지 않은 경우도 있습니다. 여러 API를 비동기적으로 호출하여 해당 값을 하나하나 엮는 경우도 있지만 (async-await), Flow를 사용한다면 동시성의 이점을 취할 수도 있습니다.

이번 글에서는 동시성의 이점을 살려 설계된 대표적인 API들 중, Flow의 Flattening 연산자들을 정리합니다.

용어 정리

Flow의 flattening 연산자들은 kotlinx.coroutines.flow 패키지 안에있는 Merge.kt 에서 모두 확인할 수 있습니다. 여기서 눈에 띄는 몇가지 용어들부터 짚고 넘어가면 좋을 것 같습니다.

1. flat

Kotlin Collection에도 유사한 네이밍을 갖는 flatten이라는 메서드가 있습니다. flatten은 Collection 안에 Collection이 들어있는 중첩된 형태의 Collection을 펼쳐 하나의 Collection으로 만들어주는 역할을 합니다.

이 원리를 Flow에도 적용하면 이해하기 쉽습니다. 즉, 중첩된 여러 개의 Flow를 하나의 Flow로 만들어준다는 의미입니다.

2. map

개발을 하다보면 map은 정말 많이 사용합니다. 보통 리스트 변환을 할 때 사용하게 되는데, 리스트 내의 각각의 아이템들을 변환 (transform)하여 새로운 아이템으로 mapping시킬 때 많이 사용합니다.

동일하게 Flow에서도 발행부에서 발행하는 데이터들에 대한 변환이 필요할 때 사용할 수 있습니다.

3. concat

concat이라는 단어도 익숙하신 분들이 많을 것입니다. 보통 concatenate의 약어로 많이 사용되고, 프로그래밍에서는 “순차” 라는 한글로 많이 쓰인다고 합니다. 문자열 연산으로 생각해본다면 “눈"과 “사람”을 “눈사람"으로 연결시켜주는 역할을 한다고 이해할 수 있습니다. Android에서도 이와 유사한 ConcatAdapter가 존재합니다.

이 원리를 Flow에 적용해본다면, 여러 개의 Flow를 하나의 Flow로 만들어주는 것으로 이해할 수 있습니다.

약간의 뇌피셜이 있으니 잘못된 정보는 첨언해주시면 감사합니다. ㅠ

flatMapConcat

flatMapConcat 연산자는 Upstream의 데이터에 대한 transform 연산을 수행한 후, Downstream과 연결하고 평탄화 하여 하나의 Flow를 반환하는 연산자입니다.

코드의 흐름을 살펴보면 Upstream의 각각 value에 대한 transform을 적용하고 새로운 Flow를 반환하게 됩니다. 이 후, flattenConcat을 통해 Flow를 flatten 시켜 방출하게 됩니다.

여기서 중요한 점은 flattenConcat 구현부에 있는 emitAll입니다. emitAll은 해당 Flow의 모든 값을 수집한 후에 내보내게 됩니다. 즉, 다음 Flow를 시작하기 전에 현재 Flow의 방출이 모두 완료될 때 까지 기다리게 됩니다. 따라서, flatMapConcat을 사용하게 되면 각 입력을 한번에 하나씩 처리하게 되어 순서를 보장할 수 있게 됩니다.

https://flowmarbles.com/#flatMapConcat

출력된 로그에서도 확인할 수 있듯이 flowA에서 방출하는 Flow를 requestFlow를 통해 map 연산을 수행한 후 flattenConcat을 하게 되면 순서를 보장한 상태로 collect하게 됩니다.

flatMapConcat을 사용하면 동기적으로 스트림을 결합할 수 있게 됩니다. 이로 인해, 오래 걸리는 연산이 Upstream에서 transform 될 경우 최종적으로 Flatten된 데이터를 수집하는데 오랜 시간이 걸릴 것입니다.

flatMapMerge

flatMapConcat은 동기적 연산을 수행하기 때문에 map연산에 오랜 시간이 걸리는 경우 최종 Flatten된 데이터를 수집하는데 오래 걸리는 단점이 있습니다. 이러한 단점을 flatMapMerge를 사용하면 해결할 수 있습니다.

flatMapMerge는 flatMapConcat과 동일한 역할을 합니다. 하지만 내부적으로 다른 구현을 갖고 있습니다.

flatMapMerge에서 유심히 볼 부분은 concurrency에 따른 동작 방식입니다. 기본적인 concurrency는 DEFAULT_CONCURRENCY로 설정되어있고, concurrency 파라미터를 통해 동시에 수집 가능한 Flow의 최대 개수를 제한할 수 있습니다. 따라서, flatMapMerge는 기본적으로 16개의 동시성을 지원합니다.

concurrency가 1일 경우에는 flattenConcat으로 동작하고 그 외에는 ChannelFlowMerge의 구현에 따라 동작합니다. ChannelFlowMerge는 아래와 같은 구현을 갖고있습니다.

flatMapConcat과 달리 flatMapMerge는 버퍼 정책이 적용되어있습니다. 기본적으로 64개의 기본 버퍼 크기와 함께 BufferOverflow.SUSPEND가 적용됩니다.

구현부의 핵심 로직은 ChannelFlowMerge#collectTo에 있습니다. 여기서는 허용된 동시성의 개수만큼의 자원을 갖는 Semaphore 객체를 사용합니다. 약간의 CS 지식이지만 Semaphore는 여러 개의 공유자원에 대한 동시성 문제가 생겼을 경우 이를 방지하는 방법입니다. 구현에서 볼 수 있듯이 Flow를 flatten 하는 과정에서 Semaphore의 자원을 획득한 상태에서 collect하고 최종적으로 자원을 릴리즈하는 것을 볼 수 있습니다.

즉, Flow의 각각의 데이터가 순서에 따른 flatten을 대기하지 않고 주어진 concurrency만큼 동시에 flatten될 수 있는 것을 의미합니다.

https://flowmarbles.com/#flatMapMerge

출력된 로그에서도 확인할 수 있듯이 flowA에서 방출하는 Flow를 requestFlow를 통해 map 연산을 수행한 후 flattenMerge을 하게 되면 순서 보장 없이 빠른 순서대로 collect하게 됩니다.

flatMapConcat과 달리 flatMapMerge를 사용하면 비동기적으로 스트림을 결합할 수 있다는 장점이 있게 됩니다.

flatMapLatest

flatMapLatest는 앞서 살펴본 flatMapConcat, flatMapMerge와 크게 다르지 않지만 쓰임새가 조금 다릅니다. collect와 collectLatest의 차이점을 알고 계신다면 이름만으로도 이미 이해하셨다고 봐도 무방합니다.

flatMapLatest도 두 Flow를 flatten하는 것은 동일합니다. 앞서 살펴본 연산자들의 특징을 다시 봐보겠습니다.

  • flatMapConcat : Flow가 갖는 delay 만큼 모두 대기하여 순차적으로 flatten 진행
  • flatMapMerge : delay 하지 않지만 발행하는 모든 데이터를 빠르게 flatten 진행

두 연산자와 달리 flatMapLatest는 방출된 데이터들을 추적하여 새로운 데이터가 방출되면 기존의 방출을 취소합니다. 즉, 최신 데이터만 추적하게 됩니다.

flatMapLatest에서 유심히 볼 부분은 transformLatest 입니다. 바로 내부 구현을 살펴보겠습니다.

flatMapMerge와 동일하게 flatMapLatest도 동일한 버퍼 정책이 적용되어 있습니다. 기본 64개의 버퍼 크기BufferOverflow.SUSPEND 정책이 적용됩니다.

구현부의 핵심로직은 ChannelFlowTransformLatest#flowCollect 에 있습니다. 코드가 나름대로 친절(?)하게 작성되서 그런지 새로운 데이터가 방출될 때마다 기존 previeousFlow 를 취소하고 새로운 데이터에 대한 transform을 즉각 실행하는 것을 볼 수 있습니다.

https://flowmarbles.com/#flatMapLatest

출력된 로그에서도 확인할 수 있듯이 flowA에서 방출하는 Flow를 requestFlow와 함께 transformLatest를 하게 되면 최신 데이터가 방출되었을 때 앞선 데이터는 cancel되어 출력되지 않는 것을 볼 수 있습니다.

flatMapLatest는 Flow들이 발행하는 데이터에 대한 flatten 여부와 관계없이 기존 데이터는 취소하여 빠르게 flatten된 최신 데이터를 수집한다는 장점을 갖습니다.

정리

Flow 중간 연산자들을 사용해오면서 flatten 연산자들의 활용법에 대해 많은 고민을 했었습니다. 그동안은 단순한 Flow-Chaining 역할로만 생각하고 사용했었지만 각각의 연산자가 갖는 더 중요한 특징들에 대해 깊이 있게 고민해볼 수 있었습니다.

참고한 레퍼런스에 있는 각각의 연산자들이 갖는 주요한 특징들을 아래에 남겨놓았습니다. 어떤 이유 때문이었는지 떠올려보면 좋을 것 같습니다.

  • flatMapConcat 은 발행 순서와 발행 갯수를 보장합니다.
  • flatMapMerge은 발행 갯수는 보장하지만 발행 순서는 보장하지 않습니다.
  • flatMapLatest은 발행 순서는 보장하지만 발행 갯수를 보장하지 않습니다.

--

--