Kt.Academy Kotlin Coroutines Deep Dive Summary 6부 — Flow 빌더 및 기본 Operator
본 게시글은 Kt.Academy의 Kotlin Coroutines DEEP DIVE의 요약본입니다.
목차
- 코루틴 원리 및 빌더
- 코루틴 컨텍스트 및 취소
- 코루틴 예외 처리 및 스코프 함수
- 코루틴 디스패처 및 스코프 함수
- 코루틴 테스트
- Flow 빌더 및 기본 Operator — 현재 게시글
- Flow Operator
Flow
는 어딘가에서는 실행을 시켜야 합니다. 그리고 Flow
를 실행 시키기 위한 여러 방법론이 존재합니다. 본 게시글에서는 가장 중요한 방법론에 대해서 다루도록 하겠습니다.
Flow Builder
flowOf(Elements)
Flow
를 생성하기 위한 가장 쉬운 방법은 flowOf()
를 통해 생성합니다 (List
생성을 위한 listOf()
와 유사함)
Converters
Iterable
, Iterator
, Sequence
를 asFlow()
를 이용해서 Flow
로 변환할 수 있습니다.
함수를 Flow로 변환
람다를 Flow
로 변환 할 수 있습니다.
가능한 람다 : () -> T
, suspend () -> T
함수 레퍼런스를 이용해서도 Flow로 변환이 가능합니다.
flow()
Flow
를 생성하기 가장 기본적인 방법은 flow()
빌더 함수를 활용 하는 것 입니다.
그리고 내부에서 receiver로 FlowCollector
가 전달되고, FlowCollector
의 emit()
, emitAll()
을 통해 Flow의 값을 방출 할 수 있습니다.
flow()
가 동작하는 방식은 매우 간단합니다. flow()
를 통해 전달 받은 suspend
block을 가지고 있다가, collect()
가 호출될 때 실행합니다.
ChannelFlow
Flow
는 콜드 스트림이므로 구독 시점 이후에 데이터를 방출합니다.
만약 페이징 처리가 되어있는 API로 부터 page 1번 부터 끝까지, 특정 User 정보를 찾고자 한다면 다음과 같이 만들 수 있을 것입니다.
(FakeUserApi
로 서버 대체)
이렇게 했을 때는 생산과 소비가 순차적 입니다.
하지만 이렇게 하지 않고, 생산은 생산대로, 소비는 소비대로 병렬로 이뤄진다면 더 빠르게 조회 할 수도 있습니다.
ChannelFlow
는 콜드 스트림인 Flow
와 핫 스트림인 Channel
의 하이브리드입니다. 구독 이후 시점부터 동작하지만, 생산과 소비의 동기화가 이뤄지지 않습니다.
channelFlow()
말고도 Flow.buffer()
를 통해서 ChannelFlow
로 변환 할 수 있습니다.
ChannelFlow
를 사용하는 이유는 생산과 소비를 병렬로 처리하기 위함 입니다. 그렇기 때문에 ChannelFlow
내부에 receiver로 전달되는 ProducerScope
의 자식 코루틴을 만들어서 병렬 생산을 할 수 있습니다.
CallbackFlow
기존에 사용하는 listener 를 Flow
로 변환하기 위한 클래스입니다. CallbackFlow
는 ChannelFlow
의 자식 클래스입니다.
CallbackFlow
는 ProducerScope
가 receiver로 전달되며, ProducerScope
의 자주 사용하는 함수는 다음과 같습니다.
awaitClose()
:suspend
함수로CallbackFlow
내부에 있는Channel
이close
될 때, resume 되어 block을 실행 합니다.send()
:suspend
함수로 값을 방출합니다. 만약 버퍼 사이즈가 가득 찼을 경우엔 버퍼 전략에 따라 처리됩니다.trySend()
: 일반 함수로, 값을 방출합니다. 리턴 값으론 값 방출이 정상적으로 됬는지를 리턴합니다.ChannelResult.isFailure
(버퍼 가득참),ChannelResult.isSuccess
(방출 성공),ChannelResult.isClosed
(채널 종료됨)를 리턴합니다.trySendBlocking()
: 일반 함수로, 값을 방출합니다. 스레드를 block시키는send()
라고 이해하면 됩니다.trySend()
와 다른점은trySend()
는 버퍼가 가득 차면 즉시 실패를 리턴하지만,trySendBlocking()
은 버퍼가 가득 차면 스레드를 블락 시키고 대기합니다. 그 후 버퍼에 공간이 생기면 값을 방출합니다.close()
: 내부Channel
을 종료합니다.cancel()
: 내부Channel
을 종료하고, Exception을Flow
에게throw
합니다.
CallbackFlow
는 다음과 같이 자주 사용됩니다.
Flow Operator
Flow
는 RxJava
처럼 Operator를 연결하여 다양한 처리를 할 수 있습니다.
onEach
상위 스트림이 방출한 값으로 무언가를 처리하기 위한 Operator입니다.
상위 스트림이 준 값을 가지고 어떤 함수를 호출하거나 로직을 처리하기 위한 목적으로 사용합니다.
onStart
onStart()
는 Flow
가 시작 준비가 완료되면 호출됩니다. 또한 receiver로 FlowCollector
가 들어오기 때문에 emit()
도 가능합니다
onComplete
Flow
가 완료 되었을 때 실행되는 Operator입니다. 또한 catch()
가 정의 되어 있다면 중간에 Exception이 발생해도 호출됩니다.
catch()
가 정의되어 있지 않다면 onComplete()
이 실행되지 않습니다.
catch()
로 Exception을 잡을 경우엔 onComplete()
가 호출된다.
catch
Flow
에서 Exception
이 발생 했을 때, 처리하기 위한 Operator입니다. FlowCollector
가 receiver로 들어오기 때문에 값을 emit()
할 수 있습니다.
catch()
로 Exception을 잡으면 나머지 하위 스트림 호출 후에 Flow
는 종료됩니다.
flowOn
Flow
는 코루틴에 의해 실행됩니다. 그리고 부모 코루틴의 CoroutineContext
가 적용됩니다.
flowOn()
을 이용하여 Flow
자체에 CoroutineContext
를 지정할 수 있습니다.
flowOn()
은 상위 스트림의 CoroutineContext
를 지정합니다. flowOn()
을 지정하지 않으면 부모 코루틴의 CoroutineContext
를 사용합니다.
launchIn
Flow
를 실행 시키기 위해선 collect()
를 호출해야 합니다. 그리고 collect()
는 suspend
함수 이기 때문에 코루틴 빌더로 감싸서 호출 해야 합니다.
감싸서 호출 해도 되지만, launchIn()
을 통해 scope를 전달해서 좀 더 깔끔하게 Flow
를 실행 시킬 수 있습니다.
응용하기
앱에서 서버 데이터를 요청하고 그 사이에 LoadingView를 보여주고, Error 처리 하는 것을 Flow
를 이용하면 아주 간단하게 처리할 수 있습니다.