코틀린의 코루틴 — 6. Channels

hongbeom
hongbeomi dev
Published in
10 min readJun 27, 2020

코루틴의 Channels에 대해 알아봅니다.

이 글은 공식 코루틴 가이드 링크의 내용을 기반으로 하여 작성하였습니다. 이 글을 작성할 당시 버전은 1.3.71입니다.

1. Coroutine Basic2. Cancellation and Timeouts3. Composing Suspending Functions4. Coroutine Context and Dispatchers5. Asynchronous Flow 1부6. Asynchronous Flow 2부7. Channels

Channels

Deffered 값은 코루틴 간에 단일 값을 편하게 전달할 수 있는 방법을 제공하지만 Channels은 stream의 값을 전달하는 방법을 제공합니다.

Channel basics

채널은 개념적으로 BlockingQueue와 매우 흡사합니다. 한 가지 중요한 차이점은 blocking의 put 작업 대신 suspending send가 존재하고, blocking의 take 작업 대신 suspending receive가 존재한다는 것입니다.

Closing and iteration over channels

큐와는 다르게 채널은 닫음으로써 더 이상 요소가 오지 않음을 표현할 수 있습니다. 수신하는 부분에선 일반 for 루프를 사용하여 채널로부터 요소를 수신하는 것이 편리합니다.

개념적으로 close는 채널에 특별한 close 토큰을 보내는 것과 같습니다. 이 close 토큰을 받는 즉시 반복이 중지되므로 close 되기 전에 이전에 보낸 모든 요소가 수신된다는 것을 보장합니다.

Building channel producers

코루틴에서 일련의 원소를 생성하는 패턴은 꽤 흔합니다. 이것은 concurrent 코드에서 흔히 발견할 수 있는 producer-consumer 패턴의 한 부분입니다. 우리는 이 producer를 추상화해서 채널을 매개 변수로 가지는 함수로 만들 수도 있겠지만, 이는 함수에서 결과가 반드시 되돌아와야 한다는 상식에 어긋납니다.

채널에선 생산자 측에서 바로 produce 할 수 있도록 하는 편리한 코루틴 빌더와 producer 측에서 for loop를 대체하여 사용할 수 있는 consumeEach라는 익스텐션 함수가 있습니다.

Pipelines

파이프라인은 한 코루틴이 무한대의 stream 값을 생성하는 패턴입니다. 예를 들면 다음과 같습니다.

fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 1부터 무한대로 값을 생성하는 코루틴
}

아래 코드에서 또 다른 코루틴은 그 stream을 소비하며 약간의 처리를 한 후 다른 결과를 만들어내고 있습니다.

fun CoroutineScope.square(
numbers: ReceiveChannel<Int>
): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}

main 코드에서 시작되어 전체 파이프라인을 연결하는 예제입니다.

CoroutineScope를 생성하는 모든 함수는 CoroutineScope의 확장 함수로 정의되므로, 우리는 애플리케이션에 global 코루틴이 남아있지 않도록 구조화된 동시성에 의존할 수 있습니다.

Prime numbers with pipeline

코루틴의 파이프라인을 사용하여 소수를 생성하는 예시를 보겠습니다.

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++)
}

다음 파이프라인에서 들어오는 숫자의 stream을 필터링하여 주어진 소수값으로 나누어 떨어지는 모든 숫자를 제거합니다.

fun CoroutineScope.filter(
numbers: ReceiveChannel<Int>, prime: Int
) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}

이제 우리는 2부터 일련의 숫자들로 시작하여 현재 채널에서 소수를 가져오고, 각각의 소수에 대해 새로운 파이프라인 단계를 시작함으로써 파이프라인을 구축할 수 있습니다.

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

아래 예제는 처음 10개의 소수를 출력하여 전체 파이프라인을 main 스레드 context에서 실행합니다. 모든 코루틴이 mainrunBlocking 코루틴에서 launch되기 때문에, 우리는 우리가 시작한 코루틴의 구체적인 리스트를 유지할 필요가 없습니다. 또한 cancelChildren 확장 함수를 사용하여 처음 10개의 소수값 출력 후 모든 하위 코루틴을 취소합니다.

표준 라이브러리의 iterator 코루틴 builder를 사용하여 동일한 파이프라인을 작성할 수 있다는 점에 유의합시다. produceiterator로 교체하고, yield와 함께 전달하여 next로 수신한 후 iteratorReceiveChannel 및 코루틴 스코프를 제거하면 됩니다. runBlocking도 필요하지 않을 것입니다. 하지만 위의 코드와 같이 채널을 사용하는 파이프라인의 장점은 Dispathers.Default context에서 실행하면 실제로 여러 CPU 코어를 사용할 수 있다는 것입니다.

어쨌든, 이 방법은 소수를 찾는 비효율적인 방법입니다. 실제로 파이프라인은 완전히 비동기인 produce와는 달리 임의의 정지를 허용하지 않기 때문에(remote 서비스에 대한 비동기 호출 같은 방법) 다른 일시 중단 호출이 포함되며 이러한 파이프라인은 sequence/iterator를 사용하여 구축할 수 없습니다.

Fan-out

여러 코루틴이 동일한 채널에서 수신되어 서로 간에 작업을 분배할 수 있습니다. 정수를 주기적으로 생산(초당 10개)하는 producer 코루틴부터 알아봅시다.

fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}

그럼 우리는 여러 개의 프로세서 코루틴을 얻을 수 있습니다. 이 예제에서는 id와 수신 번호만 출력합니다.

fun CoroutineScope.launchProcessor(
id: Int,
channel: ReceiveChannel<Int>
) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}

이제 5개의 프로세서를 launch하고 1초 동안 작동하도록 해보겠습니다. 어떤 일이 발생하는지 확인해보세요.

각 특정 정수를 수신하는 프로세서 id는 다를 수 있지만 출력은 전부 비슷할 것입니다.

producer 코루틴을 취소하면 채널이 닫히므로 결국 코루틴이 수행하는 채널에 대한 반복이 종료된다는 점에 유의하세요.

또한 launchProcessor 코드에서 fan-out을 수행하기 위해 for loop로 채널을 통해 명시적으로 반복하는 방법 유의해야합니다. consumeEach와는 달리 루프 패턴은 여러 코루틴에서 사용할 때 완벽하게 안전합니다. 프로세서 코루틴 중 하나가 고장나더라도 다른 프로세서는 채널을 처리하며, 각 프로세서는 항상 정상적으로 또는 비정상적으로 완료된 기본 채널을 소비(취소)합니다.

Fan-in

여러 코루틴이 동일한 채널로 전송될 수 있습니다. 예를 들어, string 타입의 채널에 delay을 사용하여 문자열을 이 채널에 반복적으로 전송하는 suspend 함수를 사용해보겠습니다.

suspend fun sendString(
channel: SendChannel<String>,
s: String,
time: Long
) {
while (true) {
delay(time)
channel.send(s)
}
}

만약 우리가 코루틴을 보내는 몇 개의 코루틴을 launch 한다면 어떤 일이 일어날까요?

Buffered channels

지금까지 살펴본 채널은 buffer가 없었습니다. 버퍼링되지 않은 채널은 송신자와 수신자가 서로 만날 때 요소들을 전송합니다. 전송이 먼저 호출되면 수신은 호출될 때까지 일시 중단되고, 수신 호출이 먼저 호출될 경우 전송이 호출될 때까지 일시 중단됩니다.

Channel() factory 함수와 produce builder 모두 버퍼 크기를 지정하기 위해 선택적으로 capacity라는 매개 변수를 사용합니다. 버퍼는 송신자가 일시 중단하기 전까지 여러 요소를 전송할 수 있도록 하는데, 이는 버퍼가 가득 차면 차단되는 지정된 용량의 BlockingQueue와 유사합니다.

다음 코드를 살펴보겠습니다.

4개의 크기로 버퍼링된 채널을 사용하였기 때문에 “sending”을 다섯 번 출력합니다. 처음 네 개의 요소는 버퍼에 추가되고 다섯 번째 요소를 전송하려고 할 때 송신이 중단됩니다.

Channels are fair

채널로의 송수신 작업은 여러 코루틴에서 호출하는 순서에 따라 공정하게 이루어집니다. 즉, 선착순으로 이루어지는데 예를 들자면 수신 작업을 호출하는 첫 번째 코루틴이 그 요소를 얻게 되는 것입니다. 다음 예제에서는 두 개의 코루틴의 “ping”과 “pong”이 공유 “table” 채널로부터 ball을 수신하고 있습니다.

"ping” 코루틴이 먼저 시작되어 공을 받는 첫 번째 코루틴이 됩니다. “ping” 코루틴은 ball을 테이블로 돌려보낸 후 즉시 다시 받기 시작하지만, ball“pong” 코루틴에 의해 수신됩니다.

Ticker channels

Ticker 채널은 이 채널의 마지막 consume 이후 주어진 delay를 통과할 때마다 Unit을 생산하는 특별한 약속같은 채널입니다. 이것은 쓸모없는 독립형 채널로 보일수도 있겠지만, 복잡한 시간 기반 produce 파이프라인과 윈도우 설정 및 기타 시간 단축 처리를 수행하는 연산자를 만드는 데 유용한 빌딩 block입니다. select를 이용하여 Ticker 채널을 사용했을 때 “on tick” 작업을 수행할 수 있다.

이런 채널을 생성하려면 팩토리 메서드 ticker를 사용하면 됩니다. 더 이상의 요소가 필요하지 않음을 표현하려면 해당 요소에서 ReceiveChannel.cancel 메소드를 사용하면 됩니다.

이제 예제를 살펴보겠습니다.

ticker는 consumer의 일시 중지를 인지하고 있으며, 기본적으로 정지 작업 발생 시 다음 생산 요소 지연을 조정하여 생산된 요소의 고정 속도를 유지하려고 시도합니다.

옵션으로 TickerMode.FIXED_DELAY와 동일한 mode 매개변수를 지정하여 요소들 사이의 고정된 지연을 유지할 수 있습니다.

읽어주셔서 감사합니다!🙌

다음 글에선 Exception Handling이라는 주제로 이어서 작성하겠습니다.

--

--