Coroutine의 핵심요소 이해하기 — Dispatchers
Coroutine System을 이루는 핵심 컴포넌트에는 대략 8가지가 있다. Coroutine을 잘 활용하기 위해서는 이러한 핵심 요소에 대해 충분히 이해해야 한다.
- CoroutineBuilder, CoroutineScope, CoroutineContext
- suspend function
- Dispatchers
- Job
- Flow
- Exception in Coroutine System
이번 글에서는 CoroutineDispatcher
에 대해서 정리해본다.
기본 개념
CoroutineDispatcher
는 Coroutine들을 적절하게 Thread에 할당하여 실행시키는 Scheduler 역할을 한다. Dispatcher 객체들은 각각의 특성에 맞게 Single Thread 혹은 Thread Pool을 소유하며, suspending Coroutine
들을 잠시 저장해두었다가 적절한 시점에 Thread에 dispatch하며 실행시켜준다.
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = foldCopies(coroutineContext, context, true)
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
launch
나 async
를 실행시킬 때 CoroutineDispatcher
를 지정하지않으면 기본적으로 Dispatchers.Default를 할당해준다.
내부 동작 원리
CoroutineScope.launch
부터 시작해서 CoroutineDispatcher가 어떻게 Coroutine의 실행에 관여하는지 알아보자.
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
Coroutine.launch
가 실행되면 lazy하게 실행한 케이스가 아니면 StandaloneCoroutine
이 생성된다
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
// ⬇
@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
// "start" 인자로 넘어온 CoroutineStart가 Function Type임
start(block, receiver, this)
}
}
public enum class CoroutineStart {
DEFAULT,
LAZY,
ATOMIC,
UNDISPATCHED;
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
}
StandaloneCoroutine
의 부모 객체 AbstractCoroutine
를 따라가보면 start 함수 구현을 확인할 수 있는데, 여기에서는 처음 StandaloneCoroutine
을 생성할 때 전달받은 CoroutineStart
함수를 호출한다.
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
@kotlin.SinceKotlin public fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(receiver: R, completion: kotlin.coroutines.Continuation<T>): kotlin.coroutines.Continuation<kotlin.Unit> { /* compiled code */ }
CoroutineStart
는 createCoroutineUnintercepted
함수를 이용하여 Continuation
을 생성하고, 이를 intercept 시킨다. createCoroutineUnintercepted
는 intrinsic function이다.
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation // just in case
}
}
intercepted() 구현을 보면 ContinuationImpl
의 intercepted 함수를 다시 호출해주고 있다. launch
나 async
와 같은 coroutine builder를 호출하면, createCoroutineUnintercepted
intrinsic function을 통해 ContinuationImpl
객체를 생성한다. 이 ContinuationImpl
객체는 launch block
을 “completion” 이라는 이름의 Continuation으로 내부 멤버변수로 들고 있으면서, 하위 suspend function Continuation의 최종 root 역할을 한다.
// ContinuationImpl
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
ContinuationImpl
에서 context[ContinuationInterceptor]?.interceptContinuation(this)
를 보면, Coroutine을 실행시키면 CoroutineContext에서 ContinuationInterceptor를 구현하는 ContextElement
를 찾아서 interceptContinuation()을 실행시키는 것을 알 수 있다.
ContinuationInterceptor
의 대표적인 구현체는 CoroutineDispatcher
이며, 일반적으로 많이 쓰는 Dispatchers.IO
, Dispatchers.Default
모두 CoroutineDispatcher를 상속하고 있으므로 이 코드를 좀더 살펴본다.
CoroutineDispatcher
는 스레드를 이용하는 실행기 역할을 하므로 플랫폼에 종속적이다. 따라서 ContinuationInterceptor
는 kotlin-stdlib-common
에 정의되어있지만 이를 구현하는 구현체들을 플랫폼마다 다르며, 아래에서 살펴볼 코드들은 kotlinx-coroutine-core-jvm
의 코드들이다.
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
/** @suppress */
@ExperimentalStdlibApi
public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
ContinuationInterceptor,
{ it as? CoroutineDispatcher })
/**
* Returns a continuation that wraps the provided [continuation], thus intercepting all resumptions.
*
* This method should generally be exception-safe. An exception thrown from this method
* may leave the coroutines that use this dispatcher in the inconsistent and hard to debug state.
*/
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
위 코드를 보면 CoroutineDispatcher.interceptContinuation()
는 DispatchedContinuation
을 반환하며 원래의 Continuation을 wrapping해주고 있음을 알 수 있다.
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
...
}
// ⬇
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
}
// ⬇
internal actual typealias SchedulerTask = Task
// ⬇
internal abstract class Task(
@JvmField var submissionTime: Long,
@JvmField var taskContext: TaskContext
) : Runnable {
constructor() : this(0, NonBlockingContext)
inline val mode: Int get() = taskContext.taskMode // TASK_XXX
}
DispatchedContinuation
이 무엇인지 부모를 따라가보면 Runnable
임을 알 수 있다! 즉, ContinuationImpl.intercepted()
호출을 통해 Coroutine을 안드로이드 Thread 실행 단위인 Runnable
로 변환한다.
// We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
// It is used only in Continuation<T>.resumeCancellableWith
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
그런 이후 DispatchedContinuation
의 resumeCancellableWith
이 호출된다(아까 CoroutineStart의 스택 프레임으로 되돌아감). 여기서 CoroutineDispatcher
를 통해 dispatch가 필요한지 아닌지 여부를 결정하고, dispatch가 필요없으면 현재의 event loop에 실행시키고 아니면 dispatcher에 dispatch 시킨다.
// CoroutineDispatcher
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
일반적으로 isDispatchNeeded는 true다. 의도적으로 dispatch를 거치지 않고 바로 Coroutine을 실행시키는 Unconfined나 Dispatchers.Main.immediate는 이를 false로 반환한다.
/**
* Dispatches execution of a runnable [block] onto another thread in the given [context].
* This method should guarantee that the given [block] will be eventually invoked,
* otherwise the system may reach a deadlock state and never leave it.
* Cancellation mechanism is transparent for [CoroutineDispatcher] and is managed by [block] internals.
*
* This method should generally be exception-safe. An exception thrown from this method
* may leave the coroutines that use this dispatcher in the inconsistent and hard to debug state.
*
* This method must not immediately call [block]. Doing so would result in [StackOverflowError]
* when [yield] is repeatedly called from a loop. However, an implementation that returns `false` from
* [isDispatchNeeded] can delegate this function to `dispatch` method of [Dispatchers.Unconfined], which is
* integrated with [yield] to avoid this problem.
*/
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
CoroutineDispatcher.dispatch
로직을 보려면 상속 객체들을 살펴봐야한다. CoroutineDispatcher의 기본값으로 쓰이는 Dispatchers.Default 구현을 따라가보자.
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
}
// ⬇
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
...
}
// ⬇
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = coroutineScheduler
// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
// dispatch의 동작을 CoroutineScheduler에 위임하고 있다.
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
}
// ⬇
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
...
}
// ⬇
public interface Executor {
void execute(Runnable command);
}
Dispatchers.Default
의 구현을 쭉 따라가보면 최종적으로 Executor
interface를 구현하는 Runnable
실행기임을 알 수 있다. CoroutineScheduler
의 구현 코드를 좀더 살펴보자.
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
@JvmField
val globalCpuQueue = GlobalQueue()
@JvmField
val globalBlockingQueue = GlobalQueue()
@JvmField
val workers = ResizableAtomicArray<Worker>(corePoolSize + 1)
override fun execute(command: Runnable) = dispatch(command)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
internal inner class Worker private constructor() : Thread() {
init {
isDaemon = true
}
override fun run() = runWorker()
@JvmField
var mayHaveLocalTasks = false
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask(mayHaveLocalTasks)
// Task found. Execute and repeat
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
/*
* No tasks were found:
* 1) Either at least one of the workers has stealable task in its FIFO-buffer with a stealing deadline.
* Then its deadline is stored in [minDelayUntilStealableTask]
*
* Then just park for that duration (ditto re-scanning).
* While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
* excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve
* it with "spinning via scans" mechanism.
* NB: this short potential parking does not interfere with `tryUnpark`
*/
if (minDelayUntilStealableTaskNs != 0L) {
if (!rescanned) {
rescanned = true
} else {
rescanned = false
tryReleaseCpu(WorkerState.PARKING)
interrupted()
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
minDelayUntilStealableTaskNs = 0L
}
continue
}
/*
* 2) Or no tasks available, time to park and, potentially, shut down the thread.
* Add itself to the stack of parked workers, re-scans all the queues
* to avoid missing wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting for new tasks.
*/
tryPark()
}
tryReleaseCpu(WorkerState.TERMINATED)
}
}
- 기본적으로 CoroutineScheduler는 1)
GlobalQueue
2)workers
를 멤버변수로 가지고 있다.GlobalQueue
는 worker에 배정되지않은 Task들을 임시 저장해두는 저장소이며, worker는CoroutineScheduler
가 생성 및 관리하는 스레드를 의미한다.Worker
도 내부에LocalQueue
라는 이름으로 Task를 저장하고 관리한다. CoroutineScheduler
의 dispatch가 실행되면 CurrentThread가 CoroutineScheduler에 속하는지 확인하고, 속하면 해당 Thread(Worker 내부의 Queue에 Task를 밀어넣고, 아니면 CoroutineScheduler의 GlobalQueue에 Task를 넣는다.
fun findTask(scanLocalQueue: Boolean): Task? {
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
// If we can't acquire a CPU permit -- attempt to find blocking task
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
return task ?: trySteal(blockingOnly = true)
}
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
/*
* Anti-starvation mechanism: probabilistically poll either local
* or global queue to ensure progress for both external and internal tasks.
*/
if (scanLocalQueue) {
val globalFirst = nextInt(2 * corePoolSize) == 0
if (globalFirst) pollGlobalQueues()?.let { return it }
localQueue.poll()?.let { return it }
if (!globalFirst) pollGlobalQueues()?.let { return it }
} else {
pollGlobalQueues()?.let { return it }
}
return trySteal(blockingOnly = false)
}
3. CoroutineScheduler의 Worker는 내부적으로 무한루프를 돌리면서 Task를 하나씩 실행한다. 실행할 Task는 LocalQueue 혹은 GlobalQueue에서 하나씩 polling해오는데 어느 곳에서 가져올지는 내부 알고리즘이 결정한다.
결론
CoroutineDispatcher
는 Coroutine을 실행하는 실행기 역할을 한다. jvm
에서 CoroutineDispatcher
는 Executor
구현체이며, Runnable
을 Task
타입으로 변환하여 GlobalQueue
혹은 Worker.LocalQueue
에 저장해둔다. 이 내부 Queue에 저장된 Task들은 CoroutineDispatcher가 관리하는 스레드들이 주기적으로 polling 하면서 실행이 필요한 작업들을 처리하면서 동시처리가 구현되게 된다.
이것은 전형적인 Worker Thread Pattern 이다. Coroutine을 생성하는 Thread가 Cliend Thread로써 Coroutine을 생성하면 그 Coroutine들은 CoroutineDispatcher
라는 실행기에 전달되며, CoroutineDispatcher가 관리하는 Worker들이 Queue에 적재된 요청들을 하나씩 꺼내면서 처리하는 구조이다.