Coroutine의 핵심 요소 이해하기 - CoroutineBuilder, CoroutineScope, CoroutineContext

SmallAcorn
19 min readMar 26, 2023

--

Coroutine System을 이루는 핵심 컴포넌트에는 대략 8가지가 있다. Coroutine을 잘 활용하기 위해서는 이러한 핵심 요소에 대해 충분히 이해해야 한다.

  • CoroutineBuilder
  • CoroutineScope
  • CoroutineContext
  • suspend function
  • Job
  • Dispatchers
  • Flow
  • Exception in Coroutine System

이번 글에서는 CoroutineBuilder , CoroutineScope , CoroutineContext 에 대해서 정리해본다.

CoroutineBuilder

기본적으로 Coroutine는 CoroutineBuilder를 통해 생성된다. launch 혹은 async 처럼 특정 CoroutineScope의 interface를 통해 경우도 있고, runBlocking 를 이용해 builder 내부에서 CoroutineScope를 생성하여 Coroutine을 만드는 경우도 있으나 모든 경우 “CoroutineBuilder”를 이용하여 Coroutine을 생성한다고 보면 된다.

GlobalScope.launch {
// this is code block that the Coroutine represents
}

CoroutineBuilder를 통해 Coroutine을 생성하는 이유는 Coroutine은 프로그램 관점에서 보면, “상태를 가진 Call Tree”라 볼 수 있는데 그 Call Tree를 Coroutine Lifecycle 내에서 적절하기 관리하기 위함이다. Coroutine Lifecycle은 CoroutineScope와 관련있는 개념인데 여기서는 일단 넘어가자.

launch

`launch` 는 fire and forget용 coroutine builder이다. 특정 로직을 실행시키되 그 결과값을 받을 필요가 없는 경우 사용한다. 아래 코드를 보면 launch는 Job 이라는 객체를 반환한다. ( Job은 Coroutine State과 관련 있는데 별도로 설명)

public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

async

async 는 Coroutine으로 전달된 lambda의 결과값을 caller에 반환할 수 있다. `async`의 반환값을 보면 Deffered 인데 이 객체의 await() 함수를 호출함으로써 lambda의 반환값을 받을 수 있다.

public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}


// usecase
val resultJob = GlobalScope.async {
// some delay job
return 5
}

// 5
resultJob.await()

runBlocking

Coroutine 외부에서 suspend function을 사용하고 싶을 경우 쓸 수 있는 Builder이다. runBlocking을 호출시 eventLoop을 생성하여 lambda 실행이 완료될 때까지 현재의 thread를 block하는 Coroutine을 내부적으로 생성하고 실행한다.


@Throws(InterruptedException::class)
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
// 중략
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}

private class BlockingCoroutine<T>(
parentContext: CoroutineContext,
private val blockedThread: Thread,
private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {

@Suppress("UNCHECKED_CAST")
fun joinBlocking(): T {
registerTimeLoopThread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
@Suppress("DEPRECATION")
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
parkNanos(this, parkNanos)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
// now return result
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
}

CoroutineScope

Lifecycle에 따라 Coroutine의 상태를 관리하기 위한 객체이다. 아래와 같은 interface를 가지는데, `CoroutineContext`라는 Coroutine의 각 구성 요소들을 저장하는 Map을 속성으로 가지며, 이를 이용하여 Coroutine의 Lifecycle을 관리하게 된다.

public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}

Coroutine은 CoroutineBuilder를 통해 생성해야 하며, 이 CoroutineBuilder는 CoroutineScope의 extension으로 제공되므로, CoroutineScope가 Coroutine 실행환경(Dispatchers 등)을 전반적으로 정의하고 실행된 Coroutine들을 Lifecycle에 맞춰 관리해준다고 보면 된다.

CoroutineScope는 미리 구현되어있는 객체도 있고, Coroutine Builder를 통해 생성되는 객체도 있다.

GlobalScope

process lifecycle에 대응되는 CoroutineScope

Coroutine Builder

launchasync 같은 coroutine builder 호출시에도 CoroutineScope가 생성된다. 기본적으로 Job이 생성될 때, 그에 대응되는 CoroutineScope가 생성된다고 보면 된다.

public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

CoroutineScope.launch를 보면 내부적으로 StandaloneCoroutine 객체를 만들고 이를 start 시켜주는데, 이 StandaloneCoroutine의 정의를 보면 AbstractCoroutine이라는 CoroutineScope임을 알 수 있다.

private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}

public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

/**
* The context of this coroutine that includes this coroutine as a [Job].
*/
@Suppress("LeakingThis")
public final override val context: CoroutineContext = parentContext + this

/**
* The context of this scope which is the same as the [context] of this coroutine.
*/
public override val coroutineContext: CoroutineContext get() = context

...

}

CoroutineBuilder를 통해 생성된 Coroutine들은 부모 CoroutineScope의 Job을 자기 자신으로 대체하고, 그 외 CoroutineContext Element들은 복사한 자식 CoroutineScope를 생성하고, 그 CoroutineScope를 Job이라는 interface로 caller에 반환함을 알 수 있다.

Lifecycle도 결국 “상태"와 관련있는데, 그 상태를 Job이 관리하며 CoroutineScope은 Lifecycle에 따라 Job의 상태를 업데이트해주는 역할을 한다. 나중에 보겠지만 Job 이 child job들을 직접 관리하며 CoroutineScope의 Cancellation 전파에 핵심적인 역할을 한다.

withContext, coroutineScope

withContext()coroutineScope() 도 호출 시 새로운 CoroutineScope를 생성한다. 기본적으로는 부모 CoroutineScope의 CoroutineContext를 상속하는 CoroutineScope를 생성한다. coroutineScope은 lambda에서 새로운 동시성 처리를 구조화하고 싶을 때 활용하고, withContext()CoroutineDispatcher 를 오버라이드할 때 자주 사용함.

withContext 내부 로직을 보면 첫번 째 인자로 전달된 CoroutineContext 가 무엇이냐에 따라 다른 Coroutine을 생성한다.

  1. CoroutineContext에 변경이 없는 경우 ScopeCoroutine
  2. CoroutineDispatcher 가 아닌 다른 CoroutineContext가 전달된 경우 UndispatchedCoroutine
  3. CoroutineDispatcher 가 바뀐 경우 DispatchedCoroutine

각각의 Coroutine들이 무엇인지는 별도 딥다이브 글에서 정리해본다.

public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
// Copy CopyableThreadContextElement if necessary
val newContext = oldContext.newCoroutineContext(context)
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}

supervisorScope

기본적으로 Coroutine은 자식 Coroutine에서 Exception이 발생한 경우 상위 CoroutineScope에 속하는 모든 Coroutine들을 취소시킨다. supervisorScope은 자식 Coroutine에서 Exception이 발생하더라도 Cancellation을 다른 Coroutine에 전파하지 않는다.

CoroutineContext

Coroutine Component들을 관리하기 위한 Map인데 LinkedList로 구현되어있다. Coroutine System을 구성하는 각 Component들은 CoroutineContext.Element 를 구현하게 되며, Element의 key를 지정하는데 이 key가 각 요소들의 식별자 역할을 하게 된다. 이 CoroutineContext.Element의 구현체에는 Dispatcher, Job 등이 있음.

public interface CoroutineContext {
public operator fun <E : Element> get(key: Key<E>): E?

public fun <R> fold(initial: R, operation: (R, Element) -> R): R

public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}

public fun minusKey(key: Key<*>): CoroutineContext

public interface Key<E : Element>

public interface Element : CoroutineContext {
public val key: Key<*>

public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null

public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)

public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
}

CoroutineContext 들은 CoroutineContext 사이의 합(plus) 연산을 할 수 있는데, Key 값이 동일한 Element는 제거되어 하나의 유일한 Key만이 CoroutineContext에 존재함을 보장한다. 합연산을 통한 여러 CoroutineContext들은 CombinedContext라는 객체를 통해 마치 LinkedList처럼 구현된다.

internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable {

override fun <E : Element> get(key: Key<E>): E? {
var cur = this
while (true) {
cur.element[key]?.let { return it }
val next = cur.left
if (next is CombinedContext) {
cur = next
} else {
return next[key]
}
}
}

CombinedContext 구현 로직을 보면 내부적인 동작 원리가 LinkedList와 매우 유사함을 알 수 있다. 예를 들어, 특정 key를 가진 Element를 조회하는 get 의 경우, 자기자신이 들고 있는 Element가 그 Key와 동일한지 체크하고, 동일하면 자신의 Element를 반환하고, 아니면 다음 CoroutineContext가 CombinedContext가 아닐때까지 동일한 연산을 반복한다.

--

--