From Zero To Hero Series ( Kotlin Sequences, Channels, Flows ) — Part 2— Channels

ömer iyiöz
DigiGeek
Published in
7 min readJun 27, 2021

--

Flows are cold streams, Channels are hot streams.

Flows are cold streams similar to sequences — the code inside a flow builder does not run until the flow is collected(until a terminal operator such as collect() function is called).

Flow can be used in a fully-reactive programming style. If you've used something like RxJava before, Flow provides similar functionality.

In channel, emittor emits items at its own pace, even if there is no collector.

Shared mutable state

Writing code with a shared mutable state is known to be quite difficult and error-prone. Sharing information by communication instead of sharing information using the common mutable state tries to simplify this. Coroutines can communicate with each other via channels.

Channels

https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/08_Channels

There are 4 types of channels :

  • Channel.RENDEZVOUS
  • Channel.BUFFERED
  • Channel.UNLIMITED
  • Channel.CONFLATED

When you create a channel, you specify its type or the buffer size (if you need a buffered one) :

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

The default channel type is Rendezvous. If you don’t specify the channel type explicitly, the channel type is Rendezvous.

Channels are like a pipe. You can send information in to one end and receive information from the other end.

A Channel is conceptually very similar to BlockingQueue. One key difference is that instead of a blocking put operation it has a suspending send, and instead of a blocking takeoperation it has a suspending receive.

Thus you can think a channel as a nonblocking queue.

Channels have 2 methods:

  • send()
  • receive()

Both send() and receive() methods are suspending methods. These methods can be called only inside coroutines.

There are 3 interfaces:

  • Channel (extends both SendChannel and ReceiveChannel)
  • SendChannel
  • ReceiveChannel

If you have a Channel object you can both send and receive data. If you have a SendChannel object you can only send data via this channel. If you have a ReceiveChannel object you can only receive data via this channel.

Rendezvous Channels

Here is the meaning of rendezvous i take from merriam-webster:

A rendezvous channel has no buffer.

The sending coroutine suspends until a receiver coroutine invokes receive on the channel. Similarly, a consuming coroutine suspends until a producer coroutine invokes send on the channel. We create a rendezvous channel using the default Channel constructor with no arguments.

The “Rendezvous” channel is a channel without a buffer; it’s the same as creating a buffered channel with zero size. One of the functions (send or receive) always gets suspended until the other is called. If the send function is called and there's no suspended receive call ready to process the element, then send suspends. Similarly, if the receive function is called and the channel is empty - or in other words, there's no suspended send call ready to send the element - the receive call suspends. The "rendezvous" name ("a meeting at an agreed time and place") refers to the fact that send and receive should "meet on time".

Rendezvous Channel example 1

In this example, we have 2 sender coroutines and 1 receiver coroutine. I explained how this code works.

  • 1st coroutine tries to send “A1” to channel by calling channel.send(“A1”). send() method suspends the coroutine since there is no receiver coroutine yet.
  • 2nd coroutine tries to send “B1” to channel by calling channel.send(“B1”). send() method suspends the coroutine since there is no receiver coroutine yet.
  • 3rd coroutine tries to receive values from channel if any sender coroutine waits. 1st coroutine and 3rd coroutine meets, 1st coroutine sends the value to channel and 3rd coroutine takes the value from channel. 1st coroutine is no longer suspending and waits its turn for working.
  • 3rd coroutine continue to work and meets with 2nd coroutine. Thus, 2nd coroutine sends the value to channel and 3rd coroutine takes the value from channel. 2nd coroutine is no longer suspending and waits its turn for working.
  • 3rd coroutine continue to work. However, this time, when channel.receive() is called, no coroutine waits for sending some value to channel, there is no suspending send() method exists. Thus channel.receive() suspends 3rd coroutine.
  • Now it’s 1st coroutine’s turn. channel.send(“A2”) is called but not suspends since receiver(3rd) coroutine waits for receiving. Since it’s 1st coroutine’s turn, 1st coroutine continues to work, and finishes. 3rd coroutine now is not suspending but waits for its turn.
  • Now it’s 2nd coroutine’s turn. Only something is printed.
  • Then it’s 3rd coroutine’s turn. channel.receive takes the “A2” from channel and ends.

Rendezvous Channel example 2

If you call send() and receive() in the same Coroutine, this coroutine never terminates. If you understand the channels correctly, doing this already illogical because the purpose of channels is sending values between different coroutines.

// The following coroutine never terminates.
val channel1 = Channel<String>()
launch {
channel1.send("value")
channel1.receive()
}
// The following coroutine never terminates.
val channel1 = Channel<String>()
launch {
channel2.send("value")
channel2.receive()
}

Rendezvous Channel example 3

  • Coroutine2 tries to send the value 1 and immediately suspends it as there are no receivers.
  • receive() is called in Coroutine1, thus Coroutine1 receives the value 1, and Coroutine2 now un-suspends but waits its turn to work.
  • Then receive() is called again but this time receive() suspends the Coroutine1 as there is no more values to be received from the channel.
  • Then it’s coroutine2’s turn to work. Thus coroutine2 works and sends the value 2 to the channel, since a receiver waits at the other end of the channel. Coroutine1 un-suspends but waits its turn to work.
  • Coroutine2 continue to iterate and try to send 3 immediately suspends it as there are no receivers. And so on..

Rendezvous Channel example 4 — Closing the channel on sender coroutine, and, receiving data from channel using for loop in receiver coroutine

In receiver coroutine, rather than iterating 5 times using repeat(5) and calling channel.receive(), we can iterate on channel using for loop and we can access the received data directly without need to call channel.receive().

Closing a channel conceptually works by sending a special “close token” over this channel. You close a channel when you have a finite sequence of elements to be processed by consumers and you must signal to the consumers that this sequence is over. You don’t have to close a channel otherwise.

Channels are not tied to any native resource and they don’t have to be closed to release their memory. Simply dropping all the references to a channel is fine. GC will come to do clean it up.

Channel example 5 — coroutine builder function produce(), and, consumeEach extension function

We can update the above examples by using Channel idiomatic keywords, produce() and consumeEach().

The produce() extension-function is a beauty. What it does is, it creates a new Channel for us and returns it as a ReceiverChannel, which encapsulates the sending functionality within the produce-function. Outside of the closure, we can only receive from the Channel. The produce-function automatically closes the Channel when the closure is done. So you won’t accidentally end up with a coroutine and Channel that never terminates.

Launches a new coroutine to produce a stream of values by sending them to a channel and returns a reference to the coroutine as a ReceiveChannel. This resulting object can be used to receive elements produced by this coroutine.

consumeEach is an extension function, which replaces a for loop on the consumer side. consumeEach will become obsolete in future so you shouldn’t use it.

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking { // 1st way of sending integers to the channel.
/*val squares = Channel<Int>()
launch {
for (x in 1..5)
squares.send(x * x)
squares.close() // we're done sending
}*/
// 2nd way of sending integers to the channel.
val squares = produceSquares()
// 1st way of receiving integers from the channel.
squares.consumeEach {
delay(3000)
println(it)
}
// 2nd way of receiving integers from the channel.
/*repeat(5) {
delay(3000)
println(squares.receive())
}*/
println("Done!")
}

In the above example, i showed the two different ways for producing and consuming integers in channel. 1st way and 2nd way is equivalent and does the same job, so you can choose one of these 2 ways.

Output :

(3 second delay)
1
(3 second delay)
4
(3 second delay)
9
(3 second delay)
16
(3 second delay)
25
Done!

In my article, i got help from the following links. Check these links for more information.

That’s all for now.

See you in the 3rd part of this article for Flows.

Happy Channel Coding.

--

--