Kotlin — Coroutine Flow

hongbeom
hongbeomi dev
Published in
18 min readOct 28, 2020

코틀린 코루틴 플로우 공식 블로그 [번역]

본 글은 코틀린 공식 블로그 글을 한국어로 번역한 글입니다. 원문 링크👇

Flow

코틀린의 플로우는 순차적으로 값을 내보내고 정상적으로 또는 예외로 완료되는 비동기적인 데이터 스트림입니다.

map, filter, take, zip 등과 같은 flow의 중간 연산자는 업스트림 flow나 flow에 적용되어 추가 연산자를 적용할 수 있는 다운스트림 flow를 반환하는 함수입니다. 중간 연산자는 flow에서 코드를 실행하지 않고 함수 자체를 일시 중단하지 않습니다. 이들은 향후 실행과 신속한 복귀를 위해 일련의 작업을 설정할 뿐입니다. 이를 콜드 flow 프로퍼티라고 부릅니다.

flow의 터미널 연산자는 collect, single, reduce, toList 등과 같은 일시 중단 함수이거나 지정된 스코프에서 flow 수집을 시작하는 launchIn 연산자 입니다. 이는 업스트림 flow에 적용되며 모든 작업의 실행을 트리거합니다. flow의 실행은 flow 수집(collect)라고도 하며 실제로 차단없이 항상 일시 중단하는 방식으로 수행됩니다. 터미널 연산자는 업스트림에서 모든 flow 작업의 성공 또는 실패에 따라 정상적으로 또는 예외적으로 완료됩니다. 가장 기본적인 터미널 연산자는 collect 입니다. 아래는 예시입니다.

try {
flow.collect { value ->
println("Received $value")
}
} catch (e : Exception) {
println("The flow has thrown an exception : $e")
}

기본적으로 flow는 순차적이고 모든 flow 작업은 동일한 코루틴에서 순차적으로 실행됩니다. 단 bufferflatMapMerge와 같은 flow 실행해 동시성을 도입하도록 특별히 설계된 몇 가지 작업은 예외입니다.

Flow 인터페이스는 flow가 반복적으로 수집되고 수집될 때마다 동일한 코드의 실행을 트리거할 수 있는 콜드 스트림인지 또는 각 수집의 동일한 실행 소스와 다른 값을 방출하는 핫 스트림인지 여부에 대한 정보를 전달하지 않습니다. 일반적으로 flow는 콜드 스트림을 나타내지만 SharedFlow 하위 타입은 핫 스트림을 나타냅니다. 또한 stateInshareIn 연산자에 의해 또는 productIn 연산자를 통해 flow를 핫 채널로 변환하여 모든 flow를 핫 채널로 변환할 수 있습니다.

Flow builders

flow를 만드는 기본 방법은 다음과 같습니다.

  • flowOf(..) 함수를 사용하여 고정된 값 집합에서 flow를 만듭니다.
  • 다양한 타입에 대한 asFlow() 익스텐션 함수를 사용하여 flow로 변환합니다.
  • flow { .. } 빌더 함수로 순차적 호출에서 emit 함수로 임의의 flow를 구성합니다.
  • channelFlow { .. } 빌더 함수를 통해 잠재적으로 동시 호출에서 send 함수로의 임의의 flow를 구성합니다.
  • MutableStateFlowMutableSharedFlow는 해당 생성자 함수를 정의하여 직접 업데이트 할 수 있는 핫 flow를 생성합니다.

Flow constraints

Flow 인터페이스의 모든 구현은 아래에 자세히 설명된 두 가지 주요 속성을 준수해야 합니다.

  • context 보존.
  • Exception transparency (예외 투명성)

이런 특성은 flow를 사용하여 코드에 대한 로컬 판단을 수행하고 업스트림 flow emitter가 다운 스트림 flow collector와 별도로 개발 할 수 있는 방식으로 코드를 모듈화하는 기능을 보장합니다. flow의 사용자는 flow에서 사용하는 업스트림의 구현 세부 정보를 알 필요가 없습니다.

Context preservation

flow는 context 보존 특성을 가지고 있습니다. 즉, 자체적으로 실행하는 context를 캡슐화하고 다운스트림에서 전파하거나 누출하지 않으므로 특정 변환 또는 터미널 연산자의 실행 context에 대한 판단을 간단하게 만듭니다.

flow의 context를 변경하는 유일한 방법은 업스트림 context를 변경하는 flowOn 연산자입니다.

val flowA = flowOf(1, 2, 3)
.map { it + 1 } // ctxA에서 실행됩니다.
.flowOn(ctxA) // 업스트림 context 변경

// 이제 context 보존 특성을 가진 flow가 있습니다. - 어딘가에서 실행되지만 이 정보는 flow 자체에 캡슐화됩니다.

val filtered = flowA // ctxA는 flowA에서 캡슐화됩니다.
.filter { it == 3 } // 아직 context가 없는 순수 연산자

withContext(Dispatchers.Main) {
// 캡슐화되지 않은 모든 연산자는 Main에서 실행됩니다.
val result = filtered.single()
myUi.text = result
}

구현의 관점에서 모든 flow 구현은 동일한 코루틴에서만 방출되어야 한다는 것을 의미합니다. 이 제약 조건은 기본 flow 빌더에 의해 효과적으로 적용됩니다. flow의 구현이 어떤 코루틴을 시작하지 않는 경우 빌더를 사용해야 합니다. 이를 구현하면 대부분의 개발 실수를 방지할 수 있습니다.

val myFlow = flow {     
// GlobalScope.launch { // 금지됨
// launch (Dispatchers.IO) { // 금지됨
// withContext(CoroutineName( "myFlow")) // 금지됨
emit(1) // OK
coroutineScope {
emit(2) // OK - 여전히 동일한 코루틴
}
}

flow의 수집과 방출이 여러 코루틴으로 분리되어야하는 경우 channelFlow를 사용하면 됩니다. 모든 context 보존 작업을 캡슐화하여 구현의 세부 사항이 아닌 도메인 별 문제에 집중할 수 있습니다. channelFlow 내에서 코루틴 빌더를 조합하여 사용할 수 있습니다.

성능을 개선하고 싶고 동시에 방출 및 context 점프가 발생하지 않는다고 확신하는 경우 flow 빌더 대신 coroutineScope 또는 supervisorScope와 함께 사용할 수 있습니다.

Exception transparency

flow 구현은 다운스트림 flow에서 발생하는 예외를 포착하거나 처리하지 않습니다. 구현 관점에서 보면 emitemitAll의 호출이 try { .. } catch { .. } 블록으로 래핑되지 않아야 한다는 것을 의미합니다. flow의 예외 처리는 catch 연산자로 수행되어야 하며 이 연산자는 모든 다운스트림에게 예외를 전달하는 동안 업스트림 flow에서 발생하는 예외만 catch 하도록 설계되었습니다. 마찬가지로 collect와 같은 터미널 연산자는 코드 또는 업스트림 flow에서 발생하는 처리되지 않는 예외를 발생시킵니다. 예제를 보겠습니다.

flow { emitData() } 
.map { computeOne(it) }
.catch {...} // emitData 및 computeOne에서 예외 포착
.map { computeTwo(it) }
.collect { process(it) } // 다음에서 예외 발생 처리 및 computeTwo

finally 블록에 대한 대체로 onCompletion 연산자에도 동일한 추론을 적용할 수 있습니다.

예외 투명성의 요구 사항을 준수하지 않으면 collect { .. }의 예외로 인하여 코드에 대한 추론을 어렵게 만드는 이상한 동작이 발생할 수 있습니다. 왜냐하면 exception이 업스트림 flow에 의해 어떻게든 “caugth”되어 로컬 추론 능력을 제한할 수 있기 때문입니다.

flow 머신은 런타임에 예외 투명성을 적용하여 이전 시도에서 예외가 발생된 경우 값을 방출하려는 모든 시도에서 IllegalStateException을 던집니다.

Reactive streams

Flow는 Reactive Stream과 호환되므로 Flow.asPublisherPublisher.asFlow를 사용하여 kotlin-coroutines-reactive 모듈의 리액티브 스트림과 안전하게 상호 작용할 수 있습니다.

Not stable for inheritance

향후에 Flow 인터페이스에 새 메소드가 추가될 수 있지만 현재 사용은 안정적이므로 flow 인터페이스는 타사 라이브러리에서 상속할 때 안정적이지 않습니다. flow { .. } 빌더를 사용하여 구현을 생성합니다.

Functions

  • collect : 지정된 collector를 허용하고 해당 collector로 값을 emits 합니다. 이 메소드를 직접 구현하여 사용하면 안됩니다.

Extension Functions : ⚠️ deprecated 된 부분은 제외하였습니다.

  • buffer : 버퍼는 지정된 용량의 채널을 통하여 값을 방출하며 별도의 코루틴에서 collector를 실행합니다.
  • cancellable : 각 방출에 대한 취소 상태를 확인하고 flow 콜렉터가 취소된 경우 해당 취소 원인을 던지는 flow를 반환합니다. flow 빌더 및 SharedFlow의 모든 구현은 기본적으로 cancellable 될 수 있습니다.
  • catch : flow 완료되었을 때 예외를 포착하고 포착한 예외와 함께 인자에 지정된 action을 호출합니다. 이 연산자는 다운스트림 흐름에서 발생하는 예외에 대해 투명하며 flow를 취소하기 위해 던지는 예외는 catch하지 않습니다.
  • collect : 1. 인자가 지정되지 않은 익스텐션은 지정된 flow를 수집하지만 모든 방출된 값을 무시하는 터미널 flow 연산자 입니다. 수집 중 또는 제공된 flow에서 예외가 발생하는 경우 이 예외는 이 메소드에서 다시 발생합니다. 2. 인자가 지정된 익스텐션은 제공된action으로 지정된 flow를 수집하는 터미널 flow 연산자입니다.
  • collectIndexed : 엘리먼트(0 기반)와 인덱스를 취하는 제공된 action으로 지정된 flow를 수집하는 터미널 flow 연산자입니다.
  • collectLatest : 제공된 action을 사용하여 지정된 flow를 수집하는 터미널 flow 연산자입니다. collect와 중요한 차이점은 원래 flow가 새 값을 방출하면 이전 값에 대한 action 블록이 취소된다는 것입니다.
  • combine : 각 flow에서 가장 최근에 방출한 값을 조합하여 인자로 주어진 transform 함수와 함께 값이 생성된 flow를 반환합니다.
  • combineTransform : 각 flow에서 가장 최근에 방출한 값을 처리하는 transform 함수에 의해 값이 생성되는 flow를 반환합니다.
  • conflate : 압축된 채널을 통해 flow 방출값을 압축하여 별도의 코루틴에서 콜렉터를 실행합니다. 이로 인해 콜렉터가 느려서 방출자가 일시 중단되지 않지만 콜렉터는 항상 가장 최근의 값을 방출한다는 것입니다.
  • count : flow의 요소 수를 반환합니다. predicate 인자를 넣어주면 predicate 조건에 해당하는 요소의 수만 반환합니다.
  • debounce : 원래 flow와 동일한 flow를 반환하지만, 지정된 제한 시간(timeout) 안에 새로운 값 뒤에 오는 값은 필터링합니다. 항상 최신 값이 방출됩니다.
  • distinctUntilChanged : 1. 인자가 없는 경우, 동일한 값의 후속 반복이 모두 필터링되는 flow를 반환합니다. 2. 제공된 areEquivalent 인자를 통해 동일한 값의 후속 반복을 비교하며 마찬가지로 필터링하는 flow를 반환합니다.
  • distinctUntilChangedBy : 동일한 키의 후속 반복이 모두 필터링되고 인자로 주어진 keySelector 함수로 키를 추출하는 flow를 반환합니다.
  • drop : 인자로 주어진 count가 음수일 경우 예외를 던지며, 값을 방출하기 시작할 때 주어진 count 값 만큼 방출값을 무시하는 flow를 반환합니다.
  • filter : 지정된 predicate와 일치하는 값만 포함하고 있는 flow를 반환합니다.
  • filterInstance : 지정된 타입 R의 인스턴스 값만 포함하는 flow를 반환합니다.
  • filterNot : 지정된 predicate와 일치하지 않는 값만 포함하고 있는 flow를 반환합니다.
  • filterNotNull : null이 아닌 값만 포함하는 flow를 반환합니다.
  • first : 1. 인자가 없는 경우 첫 번째 요소를 반환한 후 flow의 수집을 취소하는 터미널 연산자입니다. flow가 비어있는 경우 NoSuchElementException을 던집니다. 2. 지정된 predicate와 일치하는 flow에서 방출된 첫 번째 요소를 반환한 후 flow의 수집을 취소하는 터미널 연산자입니다. 마찬가지로 flow가 비어있는 경우 동일한 에러를 던집니다.
  • firstOrNull : first와 비슷하지만 flow가 비어있으면 null을 반환합니다.
  • flatMapConcat : 인자로 주어진 transform을 적용하고 flow를 반환한 다음 이러한 flow들을 연결 및 평탄화하여 원래 flow에서 방출되는 요소를 변환합니다,
  • flatMapLatest : 원래 flow가 값을 방출할 때마다 transform 함수에 의해 생성된 새로운 flow로 전환된 flow를 반환합니다. 원래 flow가 새 값을 방출하면 transform 블록에서 생성된 이전 flow는 취소됩니다.
  • flatMapMerge : transform을 적용하고 다른 flow를 반환한 후 이러한 flow를 병합 및 평탄화하여 원래 flow에서 방출되는 요소를 반환합니다.
  • flattenConcat : 인자로 주어진 flow를 끼우지 않고 순차적인 단일 flow로 지정된 flow로 평탄화합니다. 이 메소드는 개념적으로 flattenMerge(concurrency = 1)과 동일하지만 구현 속도가 더 빠릅니다.
  • flattenMerge : 동시에 수집되는 flow 수에 대한 concurrency 제한이 있는 단일 flow로 지정된 flow를 평탄화합니다.
  • flowOn : 이 flow가 실행되는 context를 지정된 context로 변경합니다. 이 연산자는 합성 가능하며 고유한 context가 없는 이전 연산자에만 영향을 미칩니다. 이 연산자는 context 보존 특성을 가집니다. 또한 context는 다운스트림 flow로 유출되지 않습니다.
  • flowWith : builder 내에서 지정된 flow에 적용된 모든 변환이 실행되는 context를 변경하는 연산자입니다. 이 연산자는 context 보존 특성을 가지며 이전 및 후속 작업의 context에는 영향을 주지 않습니다.
  • fold : initial 값부터 시작하여 operation 인자의 acc 값과 현재 값으로 계산된 값과 각 요소를 적용하여 값을 누적합니다.
  • launchIn : scope에서 지정된 flow의 컬렉션을 시작하는 터미널 flow 연산자입니다. scope.launch { flow.collect() } 의 줄임입니다.
  • map : 지정된 transform 함수를 원래 flow의 각 값에 적용한 결과가 포함된 flow를 반환합니다.
  • mapLatest : transform 함수에 의해 변환된 원래 flow에서 요소를 방출하는 flow를 반환합니다. 원래 flow가 새 값을 방출하면 이전 값의 transform 블록 작업이 취소됩니다.
  • mapNotNull : 지정된 transform 함수를 원래 flow의 각 값에 적용한 null이 아닌 결과만 포함하는 flow를 반환합니다.
  • onCompletion : 취소 예외 또는 실패를 action 매개 변수로 전달하여 flow가 완료되거나 취소된 후 지정된 action을 호출하는 flow를 반환합니다.
  • onEach : 업스트림 flow의 각 값이 다운스트림에서 방출되기 전에 지정된 action을 호출하는 flow를 반환합니다.
  • onEmpty : 이 flow가 완료되었을 때 요소를 방출하지 않고 지정된 action을 호출합니다. action의 수신자는 FlowCollector 이므로 onEmpty는 추가 요소를 방출할 수 있습니다. 예를 들어 다음과 같습니다.
emptyFlow<Int>().onEmpty {
emit(1)
emit(2)
}.collect { println(it) } // prints 1, 2
  • onStart : 이 flow가 수집되기 전에 지정된 action을 호출하는 flow를 반환합니다.
  • produceIn : 지정된 flow를 수집하는 생상적인 코루틴을 만듭니다.
  • reduce : 첫 번째 요소부터 시작하여 현재 계산 값 및 각 요소에 operation을 적용합니다. flow가 비어있는 경우 NoSuchElementException을 던집니다.
  • retry : 지정된 predicate와 일치하는 예외가 업스트림 flow에서 발생할 경우 최대 retries 횟수까지 지정된 flow의 수집을 재시도 합니다. 이 연산자는 다운스트림 flow에서 발생하는 예외에 대해 투명하며 flow를 취소하기 위해 발생하는 예외에 대해서는 재시도하지 않습니다.
  • retryWhen : 업스트림 flow에서 예외가 발생하고 predicate 가 true를 반환할 때 지정된 flow의 수집을 다시 시도합니다. predicate는 또한 초기 호출 시 0부터 시작하는 attempt 숫자를 매개 변수로 수신합니다. 이 연산자는 다운스트림 flow에서 발생하는 예외에 대해 투명하며 flow를 취소하기 위해 발생하는 예외에 대해서는 재시도하지 않습니다.
  • sample : 지정된 샘플링 period 동안 원래 flow에서 방출된 최신 값만 방출되는 flow를 반환합니다.
  • scan : operation 지정된 flow을 접고 initial 값을 포함한 모든 중간 결과를 내보냅니다. 초기 값은 서로 다른 콜렉터 간에 공유되므로 변경할 수 없거나 변경해서는 안 됩니다. 예를 들어 다음과 같습니다.
flowOf(1, 2, 3)
.scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
// [[], [1], [1, 2], [1, 2, 3]]
  • sharedIn : 콜드 flow를 지정된 코루틴 scope에서 시작되는 핫 SharedFlow로 변환하여 업스트림 flow의 단일 실행 인스턴스에서 방출된 방출값을 다운스트림 구독자와 공유하고 replay 수 만큼의 값을 새 구독자에게 발행합니다.
  • single : 하나의 값만 내보내기를 기다리는 터미널 연산자입니다. 둘 이상의 요소가 포함된 flow의 경우 IllgalStateException을 던지고 빈 요소로 구성된 flow의 경우 NoSuchElementException을 던집니다.
  • singleOrNull : single과 유사하지만 에러가 발생하는 상황에서 에러 대신 null을 반환합니다.
  • stateIn : 1. scope, started, initialValue 인자를 모두 지정한 경우, 콜드 Flow를 지정된 코루틴 scope에서 시작되는 핫 StateFlow로 변환하여 업스트림 flow의 단일 실행 인스턴스에서 가장 최근에 내보낸 값을 다운스트림 구독자 여러 명과 공유합니다. 2. scope 인자만 지정한 경우, 지정된 scope에서 업스트림 flow를 시작하고, 첫 번째 값이 방출될 때까지 일시 중단하며, 향후 방출되는 핫 StateFlow를 반환하여 업스트림 flow의 실행 인스턴스에서 가장 최근에 방출된 값을 여러 다운스트림 구독자와 공유합니다.
  • take : 첫 번째 부터 count 수 만큼의 요소를 포함하는 flow를 반환합니다. count 수 만큼 소비되면 원래 flow는 취소됩니다.
  • takeWhile : 지정된 predicate를 만족하는 첫 번째 요소를 포함하는 flow를 반환합니다.
  • transform : 지정된 flow의 각 값에 인자로 주어진 transform 기능을 적용합니다.
  • transformLatest : 원래 flow가 값을 방출할 때마다 transform으로 요소를 생성하는 flow를 반환합니다. 원래 flow가 새 값을 방출하면 이전에 있던 transform 블록은 취소됩니다.
  • transformWhile : 이 함수가 true를 반환하는 동안 지정된 flow의 각 값에 transform 함수를 적용합니다.
  • withIndex : 값과 해당 인덱스(0부터 시작)를 포함하는 각 요소를 IndexedValue로 감싸는 flow를 반환합니다.
  • zip : 각 값 쌍에 적용된 제공된 transform 함수를 사용하여 other(인자로 주어진) flow와 함께 현재 flow(this)의 값을 압축합니다. 결과 flow 중 하나가 완료되는 즉시 flow가 완료되고 나머지 flow는 취소됩니다.

Inheritors

  • AbstractFlow : Flow의 상태 저장 구현을 위한 기본 클래스입니다. 또한 context 보존에 필요한 모든 프로퍼티를 요구하며, 프로퍼티를 위반할 경우 IllegalStateException을 발생시킵니다.
  • SharedFlow : 모든 콜렉터가 내보낸 값을 브로드캐스트 방식으로 공유하여 내보낸 값을 얻는 hot Flow 입니다. 활성화 상태의 인스턴스가 콜렉터의 존재와 독립적으로 존재하기 때문에 hot이라고 합니다. 이것은 flow { .. } 함수에 의해 정의된 것과 같은 일반적인 Flow와 반대 개념입니다. 일반 Flow는 cold에 속하며 각 콜렉터에 대해 개별적으로 시작됩니다.

Conclusion

코틀린의 코루틴 Flow에 대한 블로그 글을 이렇게 살펴보니 지원하는 메소드, 익스텐션이 많아서 비동기 프로그래밍 및 흐름에 대한 구현에 매우 유용하게 사용할 수 있을 것 같다는 느낌을 받았습니다. Rx에 비하면 함수의 개수가 적긴 하지만 웬만한 구현은 충분히 가능하다고 생각합니다.

읽어주셔서 감사합니다 🙌

--

--