Mastering Kotlin Channels: From Beginner to Pro - Part 2

Morty
13 min readApr 19, 2023

--

In this series of posts, I’ll explain Koltin channels in detail. You can find the first part here:

There are four types of channels for different purposes:

val rendezvousChannel = Channel<String>(0)
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

Rendezvous channel - Channel with zero capacity

val rendezvousChannel = Channel<String>(0)
Rendezvous channel with zero capacity

Imagine you have two people who want to exchange messages with each other. They agree to meet at a specific location to exchange their messages. However, they can only exchange messages if they are both present at the location. This is similar to how a rendezvous channel works.

A rendezvous channel is a specific type of channel that requires both the sender and the receiver to be ready and waiting before a message can be exchanged. This ensures that both sides are synchronized and ready to exchange data.

For example, imagine one coroutine wants to send a message to another coroutine using a rendezvous channel. The sending coroutine will be suspended (paused) until the receiving coroutine is ready to receive the message. Likewise, the receiving coroutine will be suspended until the sending coroutine is ready to send the message. This ensures that both coroutines are synchronized and ready to exchange data, similar to how the two people in the previous example had to be present at the meeting location before they could exchange messages.

Rendezvous, when send() and receive() should "meet on time".

This is how the Rendezvous channel works:

On the producer side:

  1. A producer coroutine calls the send() function on the rendezvous channel with a value to send.
  2. If there is no coroutine waiting to receive a value, the send() function is suspended until a coroutine calls receive() on the channel.
  3. If there is a coroutine waiting to receive a value from the channel, the value is immediately passed to the waiting coroutine and both coroutines continue executing.

On the consumer side:

  1. A consumer coroutine calls the receive() function on the rendezvous channel to retrieve a value from the channel.
  2. If there is no coroutine waiting to send a value, the receive() function is suspended until a coroutine calls send() on the channel.
  3. If there is a coroutine waiting to send a value to the channel, the value is immediately retrieved from the channel and passed to the waiting coroutine, and both coroutines continue executing.

Because the channel is a rendezvous channel with a capacity of zero, the sender and receiver will rendezvous at the channel, ensuring that both parties are ready to exchange data before the exchange occurs.

When you create a channel, a “Rendezvous” channel is created by default.

Exploring the Pitfalls of Implementing Rendezvous Channels and How to Avoid Them

Now we know how a rendezvous channel is working, let’s discuss a common issue that can arise when using them. Here is a sample code for creating a channel:

In the code above, there is a producer coroutine which sends 5 times messages to the channel with two logs before and after sending messages. On the consumer side, there is a coroutine that receives the messages and prints them. In the log output, each log message is prefixed with the time difference in seconds with the format seconds.milliseconds since the start of the coroutines.

With the code above you expect to see the following result:

[0.002] Producer -> Sending 1
[0.004] Producer -> Sent 1
[0.004] Consumer Received 1
[0.007] Producer -> Sending 2
[0.007] Producer -> Sent 2
[0.008] Consumer Received 2
[0.009] Producer -> Sending 3
[0.009] Producer -> Sent 3
[0.009] Consumer Received 3
[0.011] Producer -> Sending 4
[0.011] Producer -> Sent 4
[0.012] Consumer Received 4
[0.013] Producer -> Sending 5
[0.014] Producer -> Sent 5
[0.014] Consumer Received 5

But if you run the code above you might notice this output:

[0.002] Producer -> Sending 1
[0.004] Producer -> Sent 1
[0.004] Producer -> Sending 2
[0.005] Consumer Received 1
[0.006] Consumer Received 2
[0.007] Producer -> Sent 2
[0.007] Producer -> Sending 3
[0.007] Producer -> Sent 3
[0.007] Producer -> Sending 4
[0.007] Consumer Received 3
[0.008] Consumer Received 4
[0.009] Producer -> Sent 4
[0.009] Producer -> Sending 5
[0.010] Producer -> Sent 5
[0.010] Consumer Received 5

As you see some groups of values are produced and consumed together, while other values are produced before being consumed. But why?

In a rendezvous channel, the send() function should suspend until there is a consumer coroutine ready to receive the value. Similarly, the receive() function should suspend until a producer coroutine is ready to send the value. Look at the time of [0.007]

The behaviour of the send() function may be affected by a number of factors, such as the size of the buffer used by the channel implementation, the scheduling behaviour of the underlying thread pool, and the speed of the coroutines themselves.

The behaviour you are observing in the second output suggests that the producer coroutine may be able to send multiple items to the channel without being suspended. If the buffer size is set to 0, as in the case of a rendezvous channel, then the send() function should suspend until there is a coroutine ready to receive the value. However, if the consumer coroutine is slow or is blocked by other operations, then the send() function may be able to send multiple values to the channel before being suspended.

How to solve this problem?

  • delay()

To ensure that the producer coroutine is correctly suspended until the consumer coroutine is ready to receive the value, you can add a delay between sending each value to the channel, like so:

fun rendezvousChannel(
coroutineScope: CoroutineScope
) {
// .....
// launch the producer coroutine
coroutineScope.launch {
for (i in 1..5) {
log("Producer -> Sending $i")
channel.send(i) // send data to the channel
log("Producer -> Sent $i", startTime)
delay(1) // wait for a short time before sending the next value
}
channel.close() // close the channel after sending all data
}
// .....
}

By adding a delay between sending values, you ensure that the consumer coroutine has a chance to process each value before the producer coroutine sends the next value. This helps to ensure that the producer coroutine is correctly suspended until the consumer coroutine is ready to receive the next value, and can prevent issues such as buffer overflow or race conditions.

  • yield()

You also can use yield() instead of delay() to achieve similar results. In Kotlin, yield() is a suspending function that allows other coroutines to run before resuming the current coroutine. When you call yield(), the current coroutine is suspended and other coroutines in the same context are given a chance to run. When the coroutine is resumed, it picks up where it left off, but the other coroutines may have had a chance to catch up and process some of the values from the channel.

fun rendezvousChannel(
coroutineScope: CoroutineScope
) {
// .....
// launch the producer coroutine
coroutineScope.launch {
for (i in 1..5) {
log("Producer -> Sending $i")
channel.send(i) // send data to the channel
log("Producer -> Sent $i", startTime)
yield() // yield to other coroutines
}
channel.close() // close the channel after sending all data
}
// .....
}

Note that yield() does not guarantee that the other coroutines will have a chance to run before the current coroutine is resumed. The actual behaviour may depend on the underlying thread pool and the scheduling behaviour of the coroutines. If you need more precise control over the timing of your coroutines, you may want to use delay().

Buffered channel - Channel with a capacity

// This creates a buffered channel that can hold up to 10 elements.
val bufferedChannel = Channel<String>(10)
Buffered channel that can hold elements.

As I mentioned before, In programming, a channel is a way for different parts of a program to communicate with each other. Think of it like a messenger between two people.

Now, a buffered channel is a type of channel that has a temporary storage area (or buffer) for messages. This buffer can hold a certain number of messages before it becomes full.

When you send a message through a buffered channel, the message is added to the buffer if there is space available. If the buffer is full, the sender is blocked (or paused) until there is space available to add the message.

Similarly, when you receive a message from a buffered channel, the message is removed from the buffer if there are any messages available. If the buffer is empty, the receiver is blocked (or paused) until a message becomes available.

So, buffered channels can be useful in situations where there is a delay between the sender and receiver, as it allows the sender to keep sending messages even if the receiver is slower to receive them. It can also help reduce the number of pauses between different parts of a program.

Here is a sample code of how to create a buffered channel:

And the output is:

[0.001] Producer Sent -> 1
[0.002] Producer Sent -> 2
[0.002] Consumer Received 1
[0.003] Producer Sent -> 3
[2.006] Consumer Received 2
[2.006] Producer Sent -> 4
[4.010] Consumer Received 3
[4.010] Producer Sent -> 5
[4.011] All Sent!
[6.014] Consumer Received 4
[8.018] Consumer Received 5
[8.019] Receiving Done!

The logs show that the first two messages [0.001] Sent -> 1 and [0.002] Sent -> 2 are sent immediately — with only a 1 millisecond time difference — because the channel has a capacity of 2 and can hold two elements without suspending the sender. When the third message is sent, the sender is suspended because the channel is full. When the second coroutine consumes the first element from the channel [0.002] Received 1, the sender is resumed and the third message is sent [0.003] Sent -> 3Then the receiver is busy doing some process for two seconds. At this moment, sending messages is suspended until the consumer coroutine receives the next message [2.006] Received 2. So producer waits until the consumer can receive another message and then immediately sends the next message [2.006] Sent -> 4 . This pattern repeats until all 5 messages are sent and received. And at the end, when the producer sends all the messages and the channel is closed consumer no longer does any process and [8.019] Receiving Done! will be printed right after consuming the last message by the consumer [8.018] Received 5 with only a 1 millisecond time difference.

Conflated channel - Keeping Only the Latest Element

val conflatedChannel = Channel<String>(CONFLATED)
A conflated channel will overwrite the previously sent element

Conflated Channel is a type of channel that can hold only one element at a time, and if a new element is sent before the previous one is consumed, the channel will overwrite the previous element with the new one, effectively “conflating” or merging the two.

This behaviour can be useful in some scenarios where the most recent value is more important than previous values. For example, in an application that displays real-time stock prices, the most recent price is typically the most relevant, and previous prices can be discarded.

It’s important to note that since a conflated channel can hold only one element, sending a new element before the previous one is consumed will result in the previous element being lost. Therefore, conflated channels are not suitable for scenarios where every element must be processed.

Here’s an example of how a conflated channel could be used for GPS tracking updates in a mobile application. Suppose we have an Android app that displays the real-time location of a user on a text view. The app needs to periodically receive GPS updates from the phone’s hardware, but it’s only interested in the most recent location, and previous locations can be discarded.

Here’s the sample code:

As you can see in the code above I created a conflated channel for location updates. This channel is used to receive location updates from the phone’s GPS hardware.

val locationChannel = remember { Channel<Location>(Channel.CONFLATED) }

The LocationDisplay composable uses a coroutine to receive location updates from the channel, like this:

LaunchedEffect(locationChannel) {
val scope = CoroutineScope(Dispatchers.Default)
scope.launch {
locationChannel.receiveAsFlow().collect { location ->
locationState.value = "Lat: ${location.latitude}, Lng: ${location.longitude}"
}
}
}

This code sets up a coroutine that listens for new values in the location channel using receiveAsFlow(). Whenever a new location is received, it updates the locationState mutable state variable, which triggers a recomposition of the composable. receiveAsFlow() actually represents a channel as a hot flow, which means that every time the flow is collected, each collector will receive the same element. By using a hot flow, multiple collectors can listen to the same channel simultaneously, without blocking the UI thread.

I’ll explain receiveAsFlow , consumeAsFlow , Fan-out, and Fan-in in the next post of this series.

The LocationDisplay composable requests location updates from the phone's GPS hardware using a DisposableEffect, like this:

DisposableEffect(activity) {
val locationManager = activity.getSystemService(Context.LOCATION_SERVICE) as LocationManager
val locationListener = LocationListener { location ->
locationChannel.trySend(location)
log("Location updated")
}

requestLocationUpdatesIfNeeded(locationManager, locationListener)

onDispose {
locationManager.removeUpdates(locationListener)
}
}

This code creates a DisposableEffect that requests location updates from the phone's GPS hardware if the required permissions are granted. Whenever a new location is received, it sends it to the location channel using trySend(). The DisposableEffect is also responsible for removing the location updates when the composable is removed from the UI hierarchy.

If you’re not familiar with side effects the post below might help you

Unlimited channel - Channel with no capacity restrictions

val unlimitedChannel = Channel<String>(UNLIMITED)
An unlimited channel can hold elements until getting an OutOfMemeryException

An unlimited channel in Kotlin has no predefined buffer size and allows producers to send elements to the channel without any limit on its size. This means it can hold an unlimited number of elements. The send() operation on an unlimited channel does not block and will always succeed. If there is no element in the channel when a consumer tries to receive it, it will be suspended until a new element is sent to the channel.

Unlimited Kotlin channels can be useful in scenarios where you don’t want producers (senders) to be suspended due to a full buffer, and you’re confident that consumers (receivers) can keep up with the rate of incoming data. However, keep in mind that using unlimited channels can lead to memory issues if the channel’s buffer grows too large.

Use cases for an unlimited Kotlin channel

Here are a couple of use cases where an unlimited Kotlin channel might be appropriate:

First use case: An event logging system

Suppose you have an application that generates logs for various events, such as user actions, system events, or network activity. These logs need to be processed and stored in a database, file, or external service. In this scenario, an unlimited channel can be used to send logs from different parts of the application to a coroutine responsible for processing and storing them. Since logs are generated at irregular intervals, you don’t want to limit the buffer size and potentially miss important log entries. Using an unlimited channel ensures that no logs are lost and that the processing coroutine can handle the incoming log data at its own pace.

Let’s see this example:

I created a simple application that simulates user actions and system events. Each event will be logged and sent to an unlimited channel. A dedicated logging coroutine will consume these events and process them.

  • Define an Event data class to represent different types of events:
data class Event(val timestamp: Long, val message: String)
  • Create an EventProducer function to simulate the generation of events:
suspend fun eventProducer(eventChannel: Channel<Event>) {
while (true) {
delay(Random.nextLong(500, 2000)) // Simulate irregular intervals between events
val eventMessage = when (Random.nextInt(1, 4)) {
1 -> "User clicked on button XYZ"
2 -> "System error occurred"
3 -> "Network activity detected"
else -> "Unknown event"
}
val event = Event(System.currentTimeMillis(), eventMessage)
eventChannel.send(event)
println("Event sent: $event")
}
}
  • Create a LoggingCoroutine function to consume and process events from the channel:
suspend fun loggingCoroutine(eventChannel: Channel<Event>) {
eventChannel.consumeEach { event ->
// Process the event (e.g., store it in a database, write to a file, or send to an external service)
println("Processing event: $event")
}
}
  • In the main function, launch the event producers and the logging coroutine:
fun main() = runBlocking {
val eventChannel = Channel<Event>(Channel.UNLIMITED)

// Launch multiple event producers
repeat(5) {
launch { eventProducer(eventChannel) }
}

// Launch the logging coroutine
launch { loggingCoroutine(eventChannel) }

// Keep the application running
delay(30000)
eventChannel.close()
}

Second use case: A task queue for background tasks

Imagine an application that generates tasks to be executed in the background, such as resizing images, generating reports, or sending notifications. These tasks can be sent to an unlimited channel, which is then consumed by a pool of worker coroutines. By using an unlimited channel, you ensure that tasks are not lost and can be executed as resources are available.

val taskChannel = Channel<() -> Unit>(Channel.UNLIMITED)

suspend fun taskProducer(taskChannel: Channel<() -> Unit>) {
// Generate a task and send it to the channel
val task = { println("Executing task") }
taskChannel.send(task)
}

// receives tasks from the producer and executes them
suspend fun taskWorker(taskChannel: Channel<() -> Unit>) {
taskChannel.consumeEach { task ->
// Execute the task
task()
}
}

fun main() = runBlocking {
repeat(5) { launch { taskProducer(taskChannel) } }
launch { taskWorker(taskChannel) }
}

In the main() function, we launch five instances of the taskProducer() coroutine using the repeat() function and one instance of the taskWorker() coroutine. The runBlocking() function is used to start the main coroutine and wait for all other coroutines to complete.

When a producer coroutine generates a task and sends it to the channel, the task is added to the end of the task queue. The worker coroutine, which is waiting for new tasks to be available in the channel, will receive the tasks from the channel in the order they were added and execute them.

What’s Next?

In the next post, I’ll share more practical examples so you will see how to use Koltin Channels following best practices in a real scenario.

Stay tuned!

--

--

Morty

Senior Mobile Developer@ABNAMRO busy creating top-notch mobile apps. https://morti.tech