[Kotlin][Flow] flatMap

equipment_room
6 min readNov 2, 2022

--

Photo by Amber Maxwell Boydell on Unsplash

코틀린 FlowflatMap 연산자들을 알아보자

flatMap 연산자들은 Flow<T> 에 transform 연산자를 적용하여 Flow<R> 로 변환한다. flatMap의 블럭의 반환 값은 Flow<R> 이다.

fun <T, R> Flow<T>.flatMapConcat(
transform: suspend (T) -> Flow<R>
): Flow<R>

fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (T) -> Flow<R>
): Flow<R>

inline fun <T, R> Flow<T>.flatMapLatest(
crossinline transform: suspend (T) -> Flow<R>
): Flow<R>

유한한 item을 발행하는 Flow에서의 flatMap 각각의 특성은 아래와 같다.

  • flatMapConcat발행 순서발행 개수를 보장한다.
  • flatMapMerge발행 개수를 보장하나 발행 순서는 보장 하지 않는다.
  • flatMapLatest발행 개수를 보장 하지 않는다.

flatMapConcat

flatMapConcatmap(transform).flattenConcat() 와 같고 버퍼 정책이 적용되지 않는다.

public fun <T, R> Flow<T>.flatMapConcat(
transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenConcat()

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value -> emitAll(value) }
}

flatMapLatest

flatMapLatest 은 버퍼 정책이 적용된다. 기본 64개의 버퍼 크기와 BufferOverflow.SUSPEND 정책이 적용된다.

새로운 값이 방출될때마다 previousFlow취소하는 것을 볼 수 있다. 그래서 발행 개수를 보장 하지 않는다.

public inline fun <T, R> Flow<T>.flatMapLatest(
@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> = transformLatest { emitAll(transform(it)) }

public fun <T, R> Flow<T>.transformLatest(
@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = ChannelFlowTransformLatest(transform, this)

internal class ChannelFlowTransformLatest<T, R>(
private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {
...
override suspend fun flowCollect(collector: FlowCollector<R>) {
...
var previousFlow: Job? = null
flow.collect { value ->
previousFlow?.apply {
cancel(ChildCancelledException())
join()
}
// Do not pay for dispatch here, it's never necessary
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
collector.transform(value)
}
}
...
}
}

flatMapMerge

flatMapMerge 는 기본 16개의 동시성을 지원하며 기본 64개의 버퍼 크기와 BufferOverflow.SUSPEND 정책이 적용된다. concurrency가 1로 설정되면 flattenConcat와 동일하다.

public val DEFAULT_CONCURRENCY: Int = 
systemProp(DEFAULT_CONCURRENCY_PROPERTY_NAME, 16, 1, Int.MAX_VALUE)

public fun <T> Flow<Flow<T>>.flattenMerge(
concurrency: Int = DEFAULT_CONCURRENCY
): Flow<T> {
require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}
...

internal class ChannelFlowMerge<T>(
private val flow: Flow<Flow<T>>,
private val concurrency: Int,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {

flatMapMergeconcurrency를 조정해 확인해 보자

concurrency를 두배로 하면 완료 속도가 약 2배 빠름을 확인 할 수 있다.

--

--