Kt.Academy Kotlin Coroutines Deep Dive Summary 6부 — Flow 빌더 및 기본 Operator

GodDB
7 min readMar 27, 2022

--

본 게시글은 Kt.Academy의 Kotlin Coroutines DEEP DIVE의 요약본입니다.

목차

  1. 코루틴 원리 및 빌더
  2. 코루틴 컨텍스트 및 취소
  3. 코루틴 예외 처리 및 스코프 함수
  4. 코루틴 디스패처 및 스코프 함수
  5. 코루틴 테스트
  6. Flow 빌더 및 기본 Operator — 현재 게시글
  7. Flow Operator

Flow는 어딘가에서는 실행을 시켜야 합니다. 그리고 Flow를 실행 시키기 위한 여러 방법론이 존재합니다. 본 게시글에서는 가장 중요한 방법론에 대해서 다루도록 하겠습니다.

Flow Builder

flowOf(Elements)

Flow를 생성하기 위한 가장 쉬운 방법은 flowOf() 를 통해 생성합니다 (List 생성을 위한 listOf()와 유사함)

Converters

Iterable, Iterator, SequenceasFlow()를 이용해서 Flow로 변환할 수 있습니다.

함수를 Flow로 변환

람다를 Flow로 변환 할 수 있습니다.

가능한 람다 : () -> T , suspend () -> T

함수 레퍼런스를 이용해서도 Flow로 변환이 가능합니다.

flow()

Flow를 생성하기 가장 기본적인 방법은 flow() 빌더 함수를 활용 하는 것 입니다.

그리고 내부에서 receiver로 FlowCollector가 전달되고, FlowCollectoremit(), emitAll()을 통해 Flow의 값을 방출 할 수 있습니다.

flow()가 동작하는 방식은 매우 간단합니다. flow()를 통해 전달 받은 suspend block을 가지고 있다가, collect()가 호출될 때 실행합니다.

ChannelFlow

Flow는 콜드 스트림이므로 구독 시점 이후에 데이터를 방출합니다.

만약 페이징 처리가 되어있는 API로 부터 page 1번 부터 끝까지, 특정 User 정보를 찾고자 한다면 다음과 같이 만들 수 있을 것입니다.
(FakeUserApi로 서버 대체)

이렇게 했을 때는 생산과 소비가 순차적 입니다.

하지만 이렇게 하지 않고, 생산은 생산대로, 소비는 소비대로 병렬로 이뤄진다면 더 빠르게 조회 할 수도 있습니다.

ChannelFlow는 콜드 스트림인 Flow와 핫 스트림인 Channel의 하이브리드입니다. 구독 이후 시점부터 동작하지만, 생산과 소비의 동기화가 이뤄지지 않습니다.

channelFlow() 말고도 Flow.buffer() 를 통해서 ChannelFlow로 변환 할 수 있습니다.

ChannelFlow를 사용하는 이유는 생산과 소비를 병렬로 처리하기 위함 입니다. 그렇기 때문에 ChannelFlow 내부에 receiver로 전달되는 ProducerScope의 자식 코루틴을 만들어서 병렬 생산을 할 수 있습니다.

CallbackFlow

기존에 사용하는 listener 를 Flow로 변환하기 위한 클래스입니다. CallbackFlowChannelFlow의 자식 클래스입니다.

CallbackFlowProducerScope가 receiver로 전달되며, ProducerScope의 자주 사용하는 함수는 다음과 같습니다.

  • awaitClose() : suspend 함수로 CallbackFlow 내부에 있는 Channelclose될 때, resume 되어 block을 실행 합니다.
  • send() : suspend 함수로 값을 방출합니다. 만약 버퍼 사이즈가 가득 찼을 경우엔 버퍼 전략에 따라 처리됩니다.
  • trySend() : 일반 함수로, 값을 방출합니다. 리턴 값으론 값 방출이 정상적으로 됬는지를 리턴합니다.
    ChannelResult.isFailure(버퍼 가득참), ChannelResult.isSuccess(방출 성공), ChannelResult.isClosed(채널 종료됨)를 리턴합니다.
  • trySendBlocking() : 일반 함수로, 값을 방출합니다. 스레드를 block시키는 send()라고 이해하면 됩니다. trySend()와 다른점은 trySend()는 버퍼가 가득 차면 즉시 실패를 리턴하지만, trySendBlocking()은 버퍼가 가득 차면 스레드를 블락 시키고 대기합니다. 그 후 버퍼에 공간이 생기면 값을 방출합니다.
  • close() : 내부 Channel을 종료합니다.
  • cancel() : 내부 Channel을 종료하고, Exception을 Flow에게 throw합니다.

CallbackFlow는 다음과 같이 자주 사용됩니다.

Flow Operator

FlowRxJava처럼 Operator를 연결하여 다양한 처리를 할 수 있습니다.

onEach

상위 스트림이 방출한 값으로 무언가를 처리하기 위한 Operator입니다.
상위 스트림이 준 값을 가지고 어떤 함수를 호출하거나 로직을 처리하기 위한 목적으로 사용합니다.

onStart

onStart()Flow가 시작 준비가 완료되면 호출됩니다. 또한 receiver로 FlowCollector가 들어오기 때문에 emit()도 가능합니다

onComplete

Flow가 완료 되었을 때 실행되는 Operator입니다. 또한 catch()가 정의 되어 있다면 중간에 Exception이 발생해도 호출됩니다.

catch()가 정의되어 있지 않다면 onComplete()이 실행되지 않습니다.

catch()로 Exception을 잡을 경우엔 onComplete()가 호출된다.

catch

Flow에서 Exception이 발생 했을 때, 처리하기 위한 Operator입니다.
FlowCollector가 receiver로 들어오기 때문에 값을 emit() 할 수 있습니다.

catch()로 Exception을 잡으면 나머지 하위 스트림 호출 후에 Flow는 종료됩니다.

flowOn

Flow는 코루틴에 의해 실행됩니다. 그리고 부모 코루틴의 CoroutineContext가 적용됩니다.

flowOn()을 이용하여 Flow 자체에 CoroutineContext를 지정할 수 있습니다.

flowOn()은 상위 스트림의 CoroutineContext를 지정합니다. flowOn()을 지정하지 않으면 부모 코루틴의 CoroutineContext를 사용합니다.

출처 : https://kt.academy/article/cc-flow-lifecycle

launchIn

Flow를 실행 시키기 위해선 collect() 를 호출해야 합니다. 그리고 collect()suspend 함수 이기 때문에 코루틴 빌더로 감싸서 호출 해야 합니다.

감싸서 호출 해도 되지만, launchIn()을 통해 scope를 전달해서 좀 더 깔끔하게 Flow를 실행 시킬 수 있습니다.

응용하기

앱에서 서버 데이터를 요청하고 그 사이에 LoadingView를 보여주고, Error 처리 하는 것을 Flow를 이용하면 아주 간단하게 처리할 수 있습니다.

참고자료

--

--