Coroutine의 핵심 요소 이해하기 — suspend

SmallAcorn
21 min readJul 29, 2023

--

Coroutine System을 이루는 핵심 컴포넌트에는 대략 8가지가 있다. Coroutine을 잘 활용하기 위해서는 이러한 핵심 요소에 대해 충분히 이해해야 한다.

이번 글에서는 suspend에 대해서 정리해본다.

Preemptive Multitasking vs Non-preemptive Multitasking

suspend의 동작에 대한 설명으로 바로 들어가기전에, Concurrency를 구현하는 두 가지 방법에 대해서 이야기해보자.

Preemptive Multitasking(선점형 멀티태스킹)은 운영체제가 각 루틴에 대한 제어권과 우선순위를 통제하며, 루틴 자체는 리소스에 대한 제어권이 없는 형태로 리소스 할당 및 배분을 전적으로 운영체제가 관리하는 형태의 멀티태스킹을 의미한다.

Non-Preemptive Multitasking(비선점형 멀티태스킹)은 Application이 각 루틴의 진행/일시정지에 대한 제어권을 갖고 각각 적절한 시점에 Computing resource를 다른 루틴에 넘겨줄 수 있는 형태의 멀티태스킹이다.

Suspend function is “suspendable” function

Kotlin Coroutine은 Application level에서 비동기처리를 하며 비선점형 멀티태스킹형태로 리소스 배분을 구현하는데, 이때 각각의 Coroutine이 어느 시점에 다른 Coroutine에 Computing resource를 양보할 것인지를 표시해주는 식별자가 `suspend` 이다.

Suspend function is translated to CPS(Continuation Passing Style) callback by compiler

suspend function은 마치 마법과 같이 비동기실행이 완료될때까지 기다렸다가 비동기작업이 완료된 이후 로직이 재개되는 것처럼 보인다. 하지만 이것도 내부 동작원리를 들여다 보면 과거의 비동기 처리 방식과 크게 다르지않은 Callback Style인데, 이를 Compiler가 처리해줘서 마법처럼 보이는 것이다.

suspend function은 Compiler에 의해 각각 Continuation이라는 State Machine으로 변경되며, suspension point가 Continuation의 각 상태를 결정 짓는다( 어느 지점에서 로직 실행을 재개할 것인지를 Continuation 내부에서 상태값으로 판단함)

fun main() {
scope.launch {
startSuspendFun1()
}

}

suspend fun startSuspendFun1() {
delay(100L) // label 0
println("AsyncJob finished") // label 1
}

예를 들어 위의 startSuspendFun1라는 suspend function은 Compiler에 의해 label 0, 1의 상태를 갖는 Continuation으로 변경된다. 그리고 이 startSuspendFun1 Continuation은 내부의 suspend function(delay)로부터 생성된 Continuation에 전달되며, 전달되기전에 label이라는 상태가 +1로 업데이트 된다.

startSuspendFun1 Continuation은 내부 suspend function(delay)에서 필요한 시점에 다시 resume되고, 상태는 업데이트되었으므로 다음 로직으로 비동기적 실행이 가능한 것이다.

      
@Nullable
public static final Object startSuspendFun1(@NotNull Continuation var0) {

Object $continuation = null
// 파라미터로 전달된 var0가 undefinedtype이면 $continuation에 할당.
label20: {
if (var0 instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)var0;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}

// 전달된 continuation이 없으면 새로 생성
$continuation = ...

}



Object $result = ((<undefinedtype>)$continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (((<undefinedtype>)$continuation).label) {
case 0:
((<undefinedtype>)$continuation).label = 1;
// delay 함수를 실행하며, continuation 전달
if (DelayKt.delay(100L, (Continuation)$continuation) == var4) {
return var4;
}
break;
case 1:
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}

// 중략
// println 로직

suspend function을 호출할다고 해서 무조건 로직이 suspend되진 않는다. 컴파일된 bytecode를 보면 suspend function의 반환값이 COROUTINE_SUSPENDED 이면 현재의 실행흐름을 종료(suspend)하고, 추후 Continuation resume을 통해서 실행을 재개하는데, 반환값이 COROUTINE_SUSPENDED가 아닌 경우는 실행흐름의 종료업 없이 일반 함수처럼 로직 실행이 연속적으로 진행된다.

COROUTINE_SUSPENDED가 어떻게 반환되는지 내부코드를 보면 좋을 것 같은데, Intrinsic이라 상세한 코드를 확인하기 한계가 있다(컴파일러가 직접 코드를 생성함: 참고).

/**
* This value is used as a return value of [suspendCoroutineUninterceptedOrReturn] `block` argument to state that
* the execution was suspended and will not return any result immediately.
*
* **Note: this value should not be used in general code.** Using it outside of the context of
* `suspendCoroutineUninterceptedOrReturn` function return value (including, but not limited to,
* storing this value in other properties, returning it from other functions, etc)
* can lead to unspecified behavior of the code.
*/
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED

@SinceKotlin("1.3")
@InlineOnly
@Suppress("UNUSED_PARAMETER", "RedundantSuspendModifier")
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}

그렇다면 suspendCoroutineUninterceptedOrReturn 함수 내에서 다른 스레드 전환이 있다면 continuation.resume 호출 시 본 스레드로 어떻게 돌아오는 걸까?

// Decompiled code of byte code of suspendCoroutineUninterceptedOrReturn 
SafeContinuation var2 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
Continuation cont = (Continuation)var2;
int var4 = false;

suspendCoroutineUnInterceptedOrReturn 함수를 decompile해보면 IntrinsicsKt.intercepted 를 호출하며 Continuation을 한번더 감싸주고 있음을 수 있다. 이 인터셉트는 이후 Dispatcher 편에서 좀더 알아볼텐데, 위 함수를 통해 resume 시점에 본래 코루틴의 Dispatcher에 밀어넣어진다.

결론

suspend function은 Coroutine에서 비동기적으로 실행되어야하는 함수들을 Compiler에게 알려주는 식별자이며, Compiler는 suspend function들을 Continuation이라는 State machine으로 변환한 후, 내부 suspend function의 호출 흐름에 Continuation을 전달하고 필요한 시점에 resume시켜서 실행흐름을 제어하는 원리이다.

여기서 Coroutine을 “일시정지”했다가 다시 실행시키려면 그 Coroutine의 Call Stack 상태들을 어딘가에 잠시 저장되었다가 필요한 시점에 다시 실행이 트리거되어야하는데, 그 Call Stack들은 모두 Continuation 내에 저장되며 다른 스레드에서 적절한 시점에 continuation.resume() 을 적절하게 호출해주는 형태로 필요한 시점에 실행이 재개된다.

번외: 위의 예제 코드에서 나온 Delay는 어떤 방식으로 suspend를 하고 있을까?

위에서 살펴보았듯, 실행흐름을 실제로 정지하기 위해서는 suspend function의 반환값이 COROUTINE_SUSPENDED 이어야 한다. delay 함수는 어느시점에 COROUTINE_SUSPENDED 가 반환되고 지연연산이 스케쥴링되는걸까?

public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}

public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
/*
* For non-atomic cancellation we setup parent-child relationship immediately
* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
* properly supports cancellation.
*/
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}


delay 내부로직을 보면 내부적으로 suspendCancellableCoroutine 을 호출하고 있는데, suspendCancellableCoroutine 는 내부적으로 CancellableContinuationImpl 를 생성하고, CancellableContinuationImpl.getResult() 를 호출하여 결과값을 계산한다.

@PublishedApi
internal open class CancellableContinuationImpl<in T>(
final override val delegate: Continuation<T>,
resumeMode: Int
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {

@PublishedApi
internal fun getResult(): Any? {
val isReusable = isReusable()
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
// or we got async cancellation from parent.
if (trySuspend()) {

if (parentHandle == null) {
installParentHandle()
}

if (isReusable) {
releaseClaimedReusableContinuation()
}
return COROUTINE_SUSPENDED
}

// 중략
}

getResult() 함수를 보면 trySuspend() 의 결과값에 따라서 COROUTINE_SUSPENDED 를 반환해주고 있다. 이러한 cancellable.getResult()호출은 suspendCoroutineUninterceptedOrReturn 에 전달된 lambda를 실행한 이후 호출되며, 이 lambda에서 Delay.scheduleResumeAfterDelay 호출을 통해 해당 Coroutine의 스케쥴링 및 실행 재개를 처리해주고 있다.

Delay.scheduleResumeAfterDelay 의 동작을 좀더 살펴보면,

// Delay.kt
public interface Delay {

/** @suppress **/
@Deprecated(
message = "Deprecated without replacement as an internal method never intended for public use",
level = DeprecationLevel.ERROR
) // Error since 1.6.0
public suspend fun delay(time: Long) {
if (time <= 0) return // don't delay
return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) }
}

public fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>)

public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
}


internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

Delay는 interface인데, 이를 CoroutineElement가 적절하게 구현하여 scheduling을 하고 있다. delay 함수에서 참조되는 CoroutineContext.delay extension은 CoroutineContext에 ContinuationInterceptor가 있는 경우 그것을 반환하고, 아니면 DefaultDelay 객체를 반환. 즉, delay 기능의 구현 책임은 ContinuationInterceptor가 가지고 있을 것임을 예상해볼 수 있음.

public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

그리고 CoroutineDispatcher는 ContinuationInterceptor를 구현하고 있음. 즉, Scheduling을 담당하는 Dispatcher의 구현을 보면 delay가 어떤 방식으로 구현되는지 상세히 알 수 있다.

internal actual val DefaultDelay: Delay = initializeDefaultDelay()

private fun initializeDefaultDelay(): Delay {
// Opt-out flag
if (!defaultMainDelayOptIn) return DefaultExecutor
val main = Dispatchers.Main
/*
* When we already are working with UI and Main threads, it makes
* no sense to create a separate thread with timer that cannot be controller
* by the UI runtime.
*/
return if (main.isMissing() || main !is Delay) DefaultExecutor else main
}

현재 실행되는 CoroutineContext에 ContinuationInterceptor가 없는 경우, DefaultDelay를 이용하는데, DefaultDelay 는 1) Delay interface를 구현한 MainDispatcher가 있는 경우 그것을 이용하고 2) 없는 경우 DefaultExecutor를 이용한다.

internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"

init {
incrementUseCount() // this event loop is never completed
}

@Suppress("ObjectPropertyName")
@Volatile
private var _thread: Thread? = null

override val thread: Thread
get() = _thread ?: createThreadSync()

override fun run() {
ThreadLocalEventLoop.setEventLoop(this)
registerTimeLoopThread()
try {
var shutdownNanos = Long.MAX_VALUE
if (!notifyStartup()) return
while (true) {
Thread.interrupted() // just reset interruption flag

// queue에 넣은 다양한 Task들을 확인하며 처리
// processNextEvent는 EventLoopImplBase에 구현되어있음.
var parkNanos = processNextEvent()
// 중략
}
} finally {
// 중략
}
}

// 중략

}
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// null | CLOSED_EMPTY | task | Queue<Runnable>
private val _queue = atomic<Any?>(null)

// Allocated only only once
private val _delayed = atomic<DelayedTaskQueue?>(null)

// DelayedResumeTask를 생성하여 DelayedTaskQueue에 적재
public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
continuation.disposeOnCancellation(task)
schedule(now, task)
}
}
}

override fun processNextEvent(): Long {

val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = nanoTime()
while (true) {
// 각 DelayedResumeTask의 target time과 비교하여, 목표한 시간에 도달했으면 queue에서 제거하고 실행
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
enqueueImpl(it)
} else
false
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
}
}
// then process one event from queue
val task = dequeue()
if (task != null) {
task.run()
return 0
}
return nextTime
}

public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

internal class DelayedTaskQueue(
@JvmField var timeNow: Long
) : ThreadSafeHeap<DelayedTask>()
}

DefaultExecutor 는 하나의 Thread를 실행하고, 내부 DelayedTaskQueue 에 적재된 모든 Task들을 나노초단위로 모두 확인하고, 실행시간이 다된 Task들을 실행하는 형태다. 이 Task의 실행 block에 Continuation.resume이 호출되어 지연 — 재실행이 되는 구조. CoroutineDispatcher들도 비슷한 원리로 구현되었지않을까 싶다.

--

--