Understanding the Core Components of Coroutines: Dispatchers

SmallAcorn
30 min read · Aug 6, 2023

There are roughly eight core components that make up the coroutine system. To use coroutines well, you need a solid understanding of each of them.

This post covers CoroutineDispatcher.

Basic concepts

A CoroutineDispatcher acts as a scheduler that assigns coroutines to threads and runs them. Each dispatcher owns either a single thread or a thread pool, depending on its purpose; it holds coroutines that are waiting to run or resume and dispatches them to a thread at the appropriate time.
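To make the "dispatcher owns threads" idea concrete, here is a minimal sketch that assumes kotlinx-coroutines-core is on the classpath. The function name and the thread name "demo-worker" are made up for this example. It builds a dispatcher on top of a single-thread executor and records which thread each coroutine ran on.

```kotlin
import java.util.Collections
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Runs two coroutines on a dispatcher that owns exactly one thread and
// returns the names of the threads they ran on.
fun threadsUsedBySingleThreadDispatcher(): Set<String> {
    // "demo-worker" is a hypothetical name chosen for this sketch.
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "demo-worker") }
    val dispatcher = executor.asCoroutineDispatcher()
    val names = Collections.synchronizedSet(mutableSetOf<String>())
    try {
        runBlocking {
            repeat(2) {
                launch(dispatcher) {
                    // In debug mode the library may append " @coroutine#N" to the
                    // thread name, so keep only the part before it.
                    names += Thread.currentThread().name.substringBefore(" @")
                }
            }
        }
    } finally {
        dispatcher.close() // also shuts down the underlying executor
    }
    return names.toSet()
}

fun main() {
    println(threadsUsedBySingleThreadDispatcher())
}
```

Both coroutines end up on the one thread the dispatcher owns, even though they were launched from the thread blocked in runBlocking.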

@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = foldCopies(coroutineContext, context, true)
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

When launch or async is called and the resulting context contains no CoroutineDispatcher (more precisely, no ContinuationInterceptor), Dispatchers.Default is added by default.
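This fallback can be observed from the outside. The sketch below assumes kotlinx-coroutines-core, and the function name is hypothetical. It starts a coroutine in a scope that has no dispatcher and checks which ContinuationInterceptor ends up in the coroutine's context.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Returns true if a coroutine started without any dispatcher runs with Dispatchers.Default.
fun usesDefaultDispatcherWhenNoneGiven(): Boolean = runBlocking {
    // The scope's context carries no dispatcher (CoroutineScope() only adds a Job).
    val scope = CoroutineScope(EmptyCoroutineContext)
    scope.async {
        coroutineContext[ContinuationInterceptor] === Dispatchers.Default
    }.await()
}

fun main() {
    println(usesDefaultDispatcherWhenNoneGiven())
}
```

Note that a launch called directly inside runBlocking would not get Dispatchers.Default, because it inherits the dispatcher that runBlocking already put into the context.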

How it works internally

Starting from CoroutineScope.launch, let's see how a CoroutineDispatcher takes part in running a coroutine.

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

When CoroutineScope.launch is called, a StandaloneCoroutine is created unless the coroutine is started lazily (in which case a LazyStandaloneCoroutine is created).


private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

// ⬇
@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        // The "start" argument is a CoroutineStart; it is callable because the enum declares operator fun invoke
        start(block, receiver, this)
    }
}

public enum class CoroutineStart {
    DEFAULT,
    LAZY,
    ATOMIC,
    UNDISPATCHED;

    @InternalCoroutinesApi
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }
}

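Following StandaloneCoroutine up to its parent class AbstractCoroutine, we find the implementation of start. It invokes the CoroutineStart value that was passed to launch; because CoroutineStart declares operator fun invoke, the enum constant can be called like a function.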
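The "enum constant called like a function" trick is plain Kotlin and can be tried without any coroutine machinery. The following is a minimal sketch using a hypothetical StartMode enum. It is not the library's CoroutineStart and only mirrors its shape.

```kotlin
// Hypothetical enum for illustration; it only imitates the shape of CoroutineStart.
enum class StartMode {
    EAGER, LAZY;

    // `operator fun invoke` lets an enum constant be called like a function.
    operator fun <T> invoke(block: () -> T): T? = when (this) {
        EAGER -> block() // run the block right away
        LAZY -> null     // do nothing now; a caller would start it later
    }
}

// `mode { ... }` is shorthand for `mode.invoke { ... }`.
fun runWith(mode: StartMode): String? = mode { "started" }

fun main() {
    println(runWith(StartMode.EAGER)) // prints "started"
    println(runWith(StartMode.LAZY))  // prints "null"
}
```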

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

@kotlin.SinceKotlin public fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(receiver: R, completion: kotlin.coroutines.Continuation<T>): kotlin.coroutines.Continuation<kotlin.Unit> { /* compiled code */ }

For the DEFAULT start mode, CoroutineStart uses createCoroutineUnintercepted to create a Continuation and then intercepts it. createCoroutineUnintercepted is an intrinsic function.

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}

The extension function intercepted() simply delegates to ContinuationImpl.intercepted(). When a coroutine builder such as launch or async is called, the createCoroutineUnintercepted intrinsic creates a ContinuationImpl object for the builder's block. That object keeps the Continuation passed in as "completion" (the coroutine itself, e.g. StandaloneCoroutine) in a member field, and it serves as the root of the continuations of the suspend functions called inside the block.

// ContinuationImpl
public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

The expression context[ContinuationInterceptor]?.interceptContinuation(this) in ContinuationImpl shows that when a coroutine starts, the element implementing ContinuationInterceptor is looked up in the CoroutineContext and its interceptContinuation() is called.
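The lookup can be reproduced with the standard library alone. The sketch below uses a hypothetical LoggingInterceptor and a hand-written completion, so it is an illustration rather than what kotlinx.coroutines does internally. It creates a coroutine with createCoroutineUnintercepted, calls intercepted(), and records the order in which things happen.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.resume

// Hypothetical interceptor: records every resumption, then passes it on unchanged.
class LoggingInterceptor(private val log: MutableList<String>) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                log += "intercepted resume"
                continuation.resumeWith(result)
            }
        }
}

fun runIntercepted(): List<String> {
    val log = mutableListOf<String>()
    val block: suspend () -> Unit = { log += "block body" }
    // The completion's context is where intercepted() looks for the interceptor.
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = LoggingInterceptor(log)
        override fun resumeWith(result: Result<Unit>) { log += "completed" }
    }
    block.createCoroutineUnintercepted(completion).intercepted().resume(Unit)
    return log
}

fun main() {
    println(runIntercepted()) // [intercepted resume, block body, completed]
}
```

A dispatcher plays the role of LoggingInterceptor here, except that instead of resuming immediately it schedules the resumption on one of its threads.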

The most prominent implementation of ContinuationInterceptor is CoroutineDispatcher. The commonly used Dispatchers.IO and Dispatchers.Default both extend CoroutineDispatcher, so let's look at its code more closely.

Because a CoroutineDispatcher is an executor that works with threads, it is platform-dependent. ContinuationInterceptor is defined in kotlin-stdlib-common, but its implementations differ per platform; the code shown below comes from kotlinx-coroutines-core-jvm.

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        { it as? CoroutineDispatcher })

    /**
     * Returns a continuation that wraps the provided [continuation], thus intercepting all resumptions.
     *
     * This method should generally be exception-safe. An exception thrown from this method
     * may leave the coroutines that use this dispatcher in the inconsistent and hard to debug state.
     */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

    ...
}

This code shows that CoroutineDispatcher.interceptContinuation() returns a DispatchedContinuation that wraps the original Continuation.

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {

    ...
}

// ⬇

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
}

// ⬇

internal actual typealias SchedulerTask = Task

// ⬇

internal abstract class Task(
    @JvmField var submissionTime: Long,
    @JvmField var taskContext: TaskContext
) : Runnable {
    constructor() : this(0, NonBlockingContext)
    inline val mode: Int get() = taskContext.taskMode // TASK_XXX
}

Following DispatchedContinuation up its parent chain shows that it is a Runnable! In other words, the call to ContinuationImpl.intercepted() turns the coroutine into a Runnable, the unit of work that JVM threads and executors run.
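The idea of "a Continuation that is also a Runnable" can be sketched with the standard library and a plain Java executor. RunnableContinuation and the thread name "runnable-demo" below are hypothetical, and the sketch leaves out everything the real DispatchedContinuation does for cancellation and resume modes.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine

// Hypothetical, much-simplified stand-in for DispatchedContinuation:
// a Continuation that is also a Runnable, so any Executor can run it.
class RunnableContinuation(private val delegate: Continuation<Unit>) :
    Continuation<Unit> by delegate, Runnable {
    // Running the task means resuming the wrapped coroutine.
    override fun run() = delegate.resumeWith(Result.success(Unit))
}

// Starts a coroutine by submitting it to an executor and returns the thread it ran on.
fun runOnExecutor(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "runnable-demo") }
    var ranOn = ""
    val block: suspend () -> Unit = { ranOn = Thread.currentThread().name }
    val completion = Continuation<Unit>(EmptyCoroutineContext) { /* nothing to do on completion */ }
    executor.execute(RunnableContinuation(block.createCoroutine(completion)))
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
    return ranOn
}

fun main() {
    println(runOnExecutor()) // prints "runnable-demo"
}
```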

// We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
// It is used only in Continuation<T>.resumeCancellableWith
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled(state)) {
                resumeUndispatchedWith(result)
            }
        }
    }
}

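After that, resumeCancellableWith of DispatchedContinuation is called (we are back in the startCoroutineCancellable frame reached from CoroutineStart). Here the CoroutineDispatcher is asked whether a dispatch is needed: if not, the coroutine is run right away on the current thread through the unconfined event loop; otherwise it is handed to the dispatcher via dispatch.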

// CoroutineDispatcher
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

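isDispatchNeeded returns true by default. Dispatchers that deliberately skip the dispatch and run the coroutine in place return false: Dispatchers.Unconfined always does, and Dispatchers.Main.immediate does when the caller is already on the main thread.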
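To see the two branches in action, here is a minimal sketch of a custom dispatcher, assuming kotlinx-coroutines-core. OwnThreadDispatcher and its dispatchCount counter are invented for this example. Much like an "immediate" dispatcher, it returns false from isDispatchNeeded when the caller is already on its thread.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical dispatcher: skips the dispatch when already running on its own thread.
class OwnThreadDispatcher : CoroutineDispatcher() {
    val dispatchCount = AtomicInteger()
    @Volatile private var thread: Thread? = null
    private val executor = Executors.newSingleThreadExecutor { r ->
        Thread(r, "own-thread").also { thread = it; it.isDaemon = true }
    }

    override fun isDispatchNeeded(context: CoroutineContext): Boolean =
        Thread.currentThread() !== thread

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchCount.incrementAndGet()
        executor.execute(block) // never runs the block inline
    }
}

fun countDispatches(): Int {
    val dispatcher = OwnThreadDispatcher()
    runBlocking {
        launch(dispatcher) {       // dispatched: the caller is the runBlocking thread
            launch(dispatcher) { } // not dispatched: already on "own-thread"
        }
    }
    return dispatcher.dispatchCount.get()
}

fun main() {
    println(countDispatches())
}
```

Only the outer launch goes through dispatch; the inner one takes the else branch of resumeCancellableWith and starts in place.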

/**
 * Dispatches execution of a runnable [block] onto another thread in the given [context].
 * This method should guarantee that the given [block] will be eventually invoked,
 * otherwise the system may reach a deadlock state and never leave it.
 * Cancellation mechanism is transparent for [CoroutineDispatcher] and is managed by [block] internals.
 *
 * This method should generally be exception-safe. An exception thrown from this method
 * may leave the coroutines that use this dispatcher in the inconsistent and hard to debug state.
 *
 * This method must not immediately call [block]. Doing so would result in [StackOverflowError]
 * when [yield] is repeatedly called from a loop. However, an implementation that returns `false` from
 * [isDispatchNeeded] can delegate this function to `dispatch` method of [Dispatchers.Unconfined], which is
 * integrated with [yield] to avoid this problem.
 */
public abstract fun dispatch(context: CoroutineContext, block: Runnable)

To see the actual dispatch logic, we have to look at the subclasses. Let's follow the implementation of Dispatchers.Default, the default CoroutineDispatcher.

public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = DefaultScheduler
}

// ⬇

internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    ...
}

// ⬇

internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    override val executor: Executor
        get() = coroutineScheduler

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()

    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    // dispatch is delegated to CoroutineScheduler.
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

}

// ⬇

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    ...
}

// ⬇

public interface Executor {
    void execute(Runnable command);
}

Following the implementation of Dispatchers.Default all the way down, we end up at CoroutineScheduler, a Runnable executor that implements the Executor interface. Let's look at the CoroutineScheduler code in more detail.

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {

    @JvmField
    val globalCpuQueue = GlobalQueue()
    @JvmField
    val globalBlockingQueue = GlobalQueue()

    @JvmField
    val workers = ResizableAtomicArray<Worker>(corePoolSize + 1)

    override fun execute(command: Runnable) = dispatch(command)

    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }

    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

    internal inner class Worker private constructor() : Thread() {
        init {
            isDaemon = true
        }

        override fun run() = runWorker()

        @JvmField
        var mayHaveLocalTasks = false

        private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                val task = findTask(mayHaveLocalTasks)
                // Task found. Execute and repeat
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                /*
                 * No tasks were found:
                 * 1) Either at least one of the workers has stealable task in its FIFO-buffer with a stealing deadline.
                 *    Then its deadline is stored in [minDelayUntilStealableTask]
                 *
                 * Then just park for that duration (ditto re-scanning).
                 * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
                 * excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve
                 * it with "spinning via scans" mechanism.
                 * NB: this short potential parking does not interfere with `tryUnpark`
                 */
                if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }
                /*
                 * 2) Or no tasks available, time to park and, potentially, shut down the thread.
                 * Add itself to the stack of parked workers, re-scans all the queues
                 * to avoid missing wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting for new tasks.
                 */
                tryPark()
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }

    }
}
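
1. At its core, CoroutineScheduler holds (1) global queues (globalCpuQueue and globalBlockingQueue, both of type GlobalQueue) and (2) workers as members. A GlobalQueue temporarily stores tasks that have not been assigned to a worker, and a worker is a thread created and managed by the CoroutineScheduler. Each Worker also stores and manages tasks in its own queue, named localQueue.
2. When CoroutineScheduler.dispatch runs, it checks whether the current thread belongs to this CoroutineScheduler. If it does, the task is pushed onto the queue inside that thread (Worker); otherwise, or if the worker hands the task back, it is added to one of the CoroutineScheduler's global queues.
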
// CoroutineScheduler.Worker
fun findTask(scanLocalQueue: Boolean): Task? {
    if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
    // If we can't acquire a CPU permit -- attempt to find blocking task
    val task = if (scanLocalQueue) {
        localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
    } else {
        globalBlockingQueue.removeFirstOrNull()
    }
    return task ?: trySteal(blockingOnly = true)
}

private fun findAnyTask(scanLocalQueue: Boolean): Task? {
    /*
     * Anti-starvation mechanism: probabilistically poll either local
     * or global queue to ensure progress for both external and internal tasks.
     */
    if (scanLocalQueue) {
        val globalFirst = nextInt(2 * corePoolSize) == 0
        if (globalFirst) pollGlobalQueues()?.let { return it }
        localQueue.poll()?.let { return it }
        if (!globalFirst) pollGlobalQueues()?.let { return it }
    } else {
        pollGlobalQueues()?.let { return it }
    }
    return trySteal(blockingOnly = false)
}

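3. Each Worker of the CoroutineScheduler runs a loop that executes tasks one at a time until the scheduler or the worker terminates. The next task is polled from the local queue or from a global queue; findTask and findAnyTask decide which one to try first, and if nothing is found the worker tries to steal a task from another worker.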
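The three points above can be condensed into a toy scheduler. This is a deliberately small sketch with made-up names (MiniScheduler, routing). It leaves out work stealing, parking, CPU permits and the blocking queue, so it only illustrates how tasks are routed to the local or the global queue and how workers poll them.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical miniature of the routing idea, not the real CoroutineScheduler.
class MiniScheduler(workerCount: Int) {
    val routing = CopyOnWriteArrayList<String>() // records where each task was queued
    private val globalQueue = ConcurrentLinkedQueue<Runnable>()
    @Volatile private var running = true

    inner class Worker(index: Int) : Thread("mini-worker-$index") {
        val localQueue = ConcurrentLinkedQueue<Runnable>()
        val owner: MiniScheduler get() = this@MiniScheduler

        init { isDaemon = true }

        override fun run() {
            while (running) {
                // Local queue first, then the shared one; back off briefly if both are empty.
                val task = localQueue.poll() ?: globalQueue.poll()
                if (task != null) task.run() else Thread.sleep(1)
            }
        }
    }

    private val workers = List(workerCount) { Worker(it).apply { start() } }

    fun dispatch(task: Runnable) {
        // Is the caller one of this scheduler's own worker threads?
        val current = (Thread.currentThread() as? Worker)?.takeIf { it.owner === this }
        if (current != null) {
            routing += "local"
            current.localQueue.add(task)
        } else {
            routing += "global"
            globalQueue.add(task)
        }
    }

    fun shutdown() { running = false }
}

fun routeTwoTasks(): List<String> {
    val scheduler = MiniScheduler(workerCount = 2)
    val done = CountDownLatch(1)
    // The outer task is submitted from a non-worker thread, the inner one from a worker.
    scheduler.dispatch { scheduler.dispatch { done.countDown() } }
    done.await(5, TimeUnit.SECONDS)
    scheduler.shutdown()
    return scheduler.routing.toList()
}

fun main() {
    println(routeTwoTasks()) // [global, local]
}
```

The first task arrives from outside and lands in the global queue; the task it submits while running on a worker goes to that worker's local queue.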

Conclusion

A CoroutineDispatcher is the executor that runs coroutines. On the JVM, Dispatchers.Default is backed by CoroutineScheduler, an Executor implementation that converts each Runnable into a Task and stores it in a GlobalQueue or in a Worker's local queue. The threads managed by the dispatcher keep polling these queues and run the pending tasks, which is how concurrent execution is achieved.

This is a classic Worker Thread pattern. The thread that creates a coroutine acts as the client thread; the coroutines it creates are handed to the CoroutineDispatcher, and the workers managed by the dispatcher take the queued requests out one by one and process them.
