Fandom Engineering
Published in

Fandom Engineering

Coroutines Channel

Photo by Rolf Erik Lekang on Unsplash

Transfering values

fun sendAndReceive() = runBlocking {
// create a channel
// Channel class implements
// ReceiveChannel and SendChannel interfaces
val channel = Channel<Int>()
// run a coroutine
launch {
repeat(3) {
// send a value to a channel
channel.send(it) // it's suspend so wait here
}
}
// 3 values have already sent to a channel
// so they can be received
repeat(3) {
// receive a value from a channel
val value = channel.receive() // it's suspend so wait here
}
}
fun iterateChannel() = runBlocking {
val channel = Channel<Int>()
launch {
repeat(3) {
channel.send(it)
}
channel.close()
// stop sending as soon as close token is received
}
// the channel was closed so a loop
// can iterate through finite number of values
for (value in channel) {
// do something with a value - 3 times
}
// without closing the channel, loop will never end
}

Producer-Consumer

fun producerConsumer() = runBlocking {
// produce builder create a ReceiveChannel
val producer: ReceiveChannel<Int> = produce {
repeat(3) {
send(it)
}
}

// consumer
producer.consumeEach {
// do something with a value - 3 times
}
}

Pipeline

// produce infinite (in terms of memory) stream of numbers
fun CoroutineScope.produceNumbers(
startValue: Int = 1
) = produce<Int> {
var counter = startValue
while (true) {
delay(25) // some time consuming work
send(counter)
counter++
}
}
// consume, process infinite stream and resend it
fun CoroutineScope.primeNumbersFilter(
numbers: ReceiveChannel<Int>, prime: Int
) = produce<Int> {
for (number in numbers) {
// filter the whole stream numbers divisible by prime
if (number % prime != 0) {
send(number)
}
}
}
fun pipeline() = runBlocking {
// 1, 2, 3, 4, 5 ... etc to infinity
var numbers = produceNumbers(startValue = 2)
repeat(5) {
val prime = numbers.receive()
numbers = primeNumbersFilter(numbers, prime)
}
// filtered stream result:
// prime = 2, filter: 3, 5, 7, 9, 11, 13 ...
// prime = 3, filter: 5, 7, 11, 13, 17, 19 ...
// prime = 5, filter: 7, 11, 13, 17, 19, 21 ...
// prime = 7, filter: 11, 13, 17, 19, 23, 29 ...
// prime = 11, filter: 13, 17, 19, 23, 29, 31 ...
// cancel the whole coroutine to make finish possible
coroutineContext.cancelChildren()
}

Fan

fun fanOut() = runBlocking<Unit> {
// stream of numbers with 25ms delay
val channel = produceNumbers()
val coroutineA = launch {
for (value in channel) {
// receive and consume a value
// coroutineB received: 1, 3, 5
}
}
val coroutineB = launch {
for (value in channel) {
// receive and consume a value
// coroutineB received: 2, 4
}
}
// allow to receive five values
delay(145)
// cancel a producer coroutine
// close its channel and stop iteration over the channel
channel.cancel()
}
fun fanIn() = runBlocking<Unit> {
val channel = Channel<String>()
val coroutineA = launch {
while (true) {
channel.send("coroutineA")
delay(10)
}
}
val coroutineB = launch {
while (true) {
channel.send("coroutineB")
delay(25)
}
}
// receive some values from any sender
repeat(6) {
val value = channel.receive()
// the result is:
// 10ms - coroutineA
// 20ms - coroutineA
// 25ms - coroutineB
// 30ms - coroutineA
// 40ms - coroutineA
// 50ms - coroutineB
}
coroutineContext.cancelChildren()
}
fun fifoOrder() = runBlocking<Unit> {
val channel = Channel<String>()
val coroutineA = launch {
while (true) {
var value = channel.receive()
value = "$value A"
delay(10)
channel.send(value)
}
}
val coroutineB = launch {
while (true) {
var value = channel.receive()
value = "$value B"
delay(10)
channel.send(value)
}
}
channel.send("start")
delay(50)
val result = channel.receive()
// result: start A B A B A
// coroutineA was called first, so received element first
// coroutineB was waiting, so received the second element first
// even if coroutineA was also to allowed to do that immediately
coroutineContext.cancelChildren()
}

Buffer

fun differentChannels() = runBlocking<Unit> {
// capacity = 1, RENDEZVOUS set as default, so they are the same
val unbuffered = Channel<Unit>()
val rendezvous = Channel<Unit>(RENDEZVOUS)
// capacity = 2
val explicitCapacity = Channel<Unit>(2)
// buffers the most recent element
val conflated = Channel<Unit>(CONFLATED)
// unlimited buffer, in practice buffer as many as possible
val unlimited = Channel<Unit>(UNLIMITED)
// buffers with default capacity = 64
val buffered = Channel<Unit>(BUFFERED)
}
fun unbufferedChannel() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
repeat(4) {
delay(10)
channel.send(it)
// a sender is faster than a receiver
// so wait for the receiver to send new one
}
}
repeat(4) {
delay(25)
val value = channel.receive()
}
// 10ms - send 0, 25ms receive 0
// 35ms - send 1, 50ms receive 1
// 60ms - send 2, 75ms receive 2
// 85ms - send 3, 100ms receive 3
coroutineContext.cancelChildren()
}
fun bufferedChannel() = runBlocking<Unit> {
val channel = Channel<Int>(3)
launch {
repeat(4) {
delay(10)
channel.send(it)
// a sender is faster than a receiver
// so wait for the receiver to send new one
}
}
repeat(4) {
delay(25)
val value = channel.receive()
}
// 10ms - send 0, 20ms - send 1
// 25ms - receive 0
// 30ms - send 2, 40ms - send 3
// 50ms - receive 1
// 75ms - receive 2
// 100ms - receive 3
coroutineContext.cancelChildren()
}

Hot and cold

fun consumeAsFlow() = runBlocking<Unit> {
val channel = Channel<Int>(3)

repeat(3) {
channel.send(it)
}

channel
.consumeAsFlow()
.onEach {
// consume a value from channel like you do from a flow
}
.launchIn(this)
delay(10) // allow to start
coroutineContext.cancelChildren()
}

Broadcast

fun broadcastChannel() = runBlocking {
val broadcast = BroadcastChannel<Int>(3)
val subscriberA = broadcast.openSubscription()
val subscriberB = broadcast.openSubscription()

repeat(3) {
broadcast.send(it)
}

repeat(3) {
subscriberA.receive()
// gets: 0, 1, 2
}
repeat(3) {
subscriberB.receive()
// gets: 0, 1, 2
}

delay(100) // allow to start
// cancel all subscribers - ReceiveChannel
broadcast.cancel()
}

Hot Flow

fun sharedFlow() = runBlocking {
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 2)
// replay - kind of buffer
val subscriberA = mutableSharedFlow.asSharedFlow() // SharedFlow
val subscriberB = mutableSharedFlow.asSharedFlow()
val subscriberC = mutableSharedFlow.asSharedFlow()
subscriberA
// never completes so cancel it at some point
.takeWhile { it != 100 }
.onEach {
// gets: 1, 2
}
.launchIn(this)
subscriberB
.takeWhile { it != 200 }
.onEach {
// gets: 1, 2, 100, 3
}
.launchIn(this)
mutableSharedFlow.tryEmit(1) // not suspending variant of emit
mutableSharedFlow.emit(2)
mutableSharedFlow.emit(100)
mutableSharedFlow.emit(3)
mutableSharedFlow.emit(200)
subscriberC
.takeWhile { it != 400 }
.onEach {
// gets: 3, 200, 4
// 3 and 200 because of replay number
}
.launchIn(this)
mutableSharedFlow.emit(4)
mutableSharedFlow.emit(300)
}
fun stateFlow() = runBlocking {
val mutableStateFlow = MutableStateFlow(0) // initial value
val subscriberA = mutableStateFlow.asStateFlow() // StateFlow
val subscriberB = mutableStateFlow.asStateFlow()
subscriberA
.takeWhile { it != 100 }
.onEach {
// gets 0, 1
}
.launchIn(this)
subscriberB
.takeWhile { it != 100 }
.onEach {
// gets 0, 1
}
.launchIn(this)
mutableStateFlow.emit(1)
mutableStateFlow.value = 100
}

Conclusion

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store