Mastering Kotlin Channels: From Beginner to Pro - Part 1

Morty
Apr 1, 2023

Kotlin Channels are a powerful concurrency construct that enables communication between two or more coroutines. They provide a way to coordinate between different parts of an application, allowing them to share data and work together without interfering with each other. In this series of posts, we will dive deeper into Kotlin Channels and explore tips, tricks, and best practices for mastering this powerful tool.

What are Kotlin Channels?

Kotlin Channels can be thought of as a pipeline through which data can flow from one coroutine to another. A channel is essentially a buffer or a queue where coroutines can send and receive messages. One coroutine can put data (send) into the channel while another coroutine can retrieve that data from the channel.

Figure: one coroutine sends data to a channel while another receives it.

Channels are an implementation of the Producer-Consumer pattern, built especially for coroutines. A coroutine that sends (produces) information is often called a producer, and a coroutine that receives (consumes) information is called a consumer. One or multiple coroutines can send information to the same channel, and one or multiple coroutines can receive data from it.

When many coroutines receive information from the same channel, each element is handled only once by one of the consumers. Once an element is handled, it is immediately removed from the channel.
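To make the "each element is handled only once" rule concrete, here is a minimal sketch with one producer and several consumers sharing a channel. The fanOut helper and its parameters are hypothetical names made up for this illustration, and it uses runBlocking and println so it can run as a plain JVM program rather than inside an Android app.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends 1..itemCount into one channel and lets
// consumerCount coroutines compete for the elements.
// Returns the elements each consumer received, keyed by consumer id.
suspend fun fanOut(itemCount: Int, consumerCount: Int): Map<Int, List<Int>> = coroutineScope {
    val channel = Channel<Int>()

    // Single producer
    launch {
        for (i in 1..itemCount) channel.send(i)
        channel.close()
    }

    // Several consumers reading from the same channel
    val consumers = (1..consumerCount).map { id ->
        async {
            val received = mutableListOf<Int>()
            for (item in channel) received += item
            id to received
        }
    }
    consumers.awaitAll().toMap()
}

fun main(): Unit = runBlocking {
    val result = fanOut(itemCount = 6, consumerCount = 2)
    result.forEach { (id, items) -> println("Consumer $id received $items") }
    // Across all consumers, every element shows up exactly once
    println(result.values.flatten().sorted()) // prints [1, 2, 3, 4, 5, 6]
}
```

Which consumer gets which element is not guaranteed and can change between runs; only the combined result is stable.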

Here is an example of how to work with a Kotlin Channel:

import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch

fun channel(
    coroutineScope: CoroutineScope,
) {
    val channel = Channel<String>() // Channel exchanges data of type String

    // Producer starts sending data inside a coroutine
    coroutineScope.launch {
        Log.d("Channel", "Sent data 1 to channel")
        channel.send("Data 1")
        Log.d("Channel", "Sent data 2 to channel")
        channel.send("Data 2")
        channel.close() // we're done sending, so the channel should be closed
    }

    // Consumer starts receiving data inside another coroutine
    coroutineScope.launch {
        channel.consumeEach {
            Log.d("Channel", "Received: $it")
        }
        Log.d("Channel", "Done!") // This line is reached once the channel is closed
    }
}

In this example, two coroutines are launched. The first sends two strings with channel.send() and then calls channel.close(). The second consumes them with channel.consumeEach(), a convenient way to iterate over every value sent to the channel until it is closed.

The close() function signals the end of data transmission. Conceptually, it's like sending a special token that marks the end of the stream. Every element sent before close() is still delivered; once the consumer reaches the token, it knows no more data will arrive and stops iterating. This makes it easy to handle the end of a communication session and ensures that all data is processed before the consumer exits.

On a single-threaded dispatcher such as Dispatchers.Main, the output for the code above is:

Sent data 1 to channel
Received: Data 1
Sent data 2 to channel
Received: Data 2
Done!
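The close() behaviour described earlier can be checked in isolation. The sketch below is only an illustration: sendThenClose and CloseDemoResult are hypothetical names, the channel is given a buffer of 2 (an assumption, so that send() returns without a waiting receiver), and println replaces Log.d so it runs on a plain JVM.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical result holder for this demo
data class CloseDemoResult(val received: List<String>, val sendAfterCloseRejected: Boolean)

suspend fun sendThenClose(): CloseDemoResult {
    // Assumption: a buffer of 2 so both send() calls return without a waiting receiver
    val channel = Channel<String>(capacity = 2)
    channel.send("Data 1")
    channel.send("Data 2")
    channel.close()

    // A closed channel rejects new elements...
    val rejected = channel.trySend("Data 3").isClosed

    // ...but everything sent before close() is still delivered
    val received = mutableListOf<String>()
    for (item in channel) received += item // loop ends once the buffer is drained
    return CloseDemoResult(received, rejected)
}

fun main(): Unit = runBlocking {
    println(sendThenClose())
    // prints CloseDemoResult(received=[Data 1, Data 2], sendAfterCloseRejected=true)
}
```

trySend is used here instead of send because calling send on a closed channel throws, while trySend reports the failure as a result value.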

ReceiveChannel

In Kotlin Coroutines, ReceiveChannel is the receiving side of a channel: an interface that exposes only the operations for taking data out. It is used when you want to consume data from a channel without being able to send anything into it.

By using a ReceiveChannel for the consumer coroutine, we ensure that it can only receive data and cannot accidentally send data back. This separation of concerns helps make the code more maintainable and less error-prone.

Here is an example of how to use ReceiveChannel:

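import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch

private const val TAG = "ReceiveChannel" // log tag used by this example

@OptIn(ExperimentalCoroutinesApi::class)
fun receiveChannel(
    coroutineScope: CoroutineScope
) {
    // Producer Coroutine: produce launches it and returns the receiving side.
    // Assigning the result directly avoids a race where the consumer could
    // start reading a placeholder channel that never receives anything.
    val channel: ReceiveChannel<String> = coroutineScope.produce {
        send("A")
        send("B")
        send("C")
        send("D")
        // we don't have to close the channel explicitly
    }

    // Consumer Coroutine
    coroutineScope.launch {
        channel.consumeEach {
            Log.d(TAG, "Received $it")
        }
        // sending back data to channel inside consumer coroutine is not possible
        // because it is a ReceiveChannel
        // channel.send("E")

        // channel is automatically closed
        Log.d(TAG, "Is producer closed: ${channel.isClosedForReceive}")
    }
}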

In the code above, we use the produce function, a coroutine builder that launches a producer coroutine and gives its block a ProducerScope. Inside that block you can call send(), and produce returns a ReceiveChannel that another coroutine can consume.

The channel.send("E") line is commented out because it would result in a compilation error. Since channel is a ReceiveChannel, it doesn't have the send() method. This demonstrates that the consumer coroutine cannot send data back to the producer when using a ReceiveChannel.

When the produce block finishes, produce closes the channel automatically, which is what lets consumeEach return. At the end, the channel.isClosedForReceive property is checked; the result is true, which means the channel is closed and all its elements have been received.

The output for the code above is:

Received A
Received B
Received C
Received D
Is producer closed: true

Difference between Channel and ReceiveChannel

The main difference between ReceiveChannel and a regular Channel in Kotlin Coroutines is that a ReceiveChannel can only be used to consume data from the channel, while a regular Channel can be used to both send and receive data.

When you create a Channel, you get a reference that can be used to both send and receive data between coroutines. The Channel interface provides functions like send() and receive() for this.

On the other hand, a ReceiveChannel reference exposes only the receiving side. The ReceiveChannel interface provides functions like receive() and tryReceive(), but no send(). Every Channel is also a ReceiveChannel, so the same channel can be handed to a consumer through this narrower type.

In other words, a regular Channel is a bidirectional channel that can be used to send and receive data between coroutines, while a ReceiveChannel is a unidirectional channel that can only be used to receive data from the channel.
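The same idea can be applied without produce by passing one Channel around under narrower types. The sketch below assumes three hypothetical functions (produceLetters, collectLetters, exchangeLetters) and relies on the fact that Channel extends both SendChannel and ReceiveChannel; println and runBlocking are used so it runs on a plain JVM.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// The producer only sees the sending side of the channel.
suspend fun produceLetters(output: SendChannel<String>) {
    for (letter in listOf("A", "B", "C")) output.send(letter)
    output.close()
}

// The consumer only sees the receiving side; input.send(...) would not compile.
suspend fun collectLetters(input: ReceiveChannel<String>): List<String> {
    val letters = mutableListOf<String>()
    for (letter in input) letters += letter
    return letters
}

suspend fun exchangeLetters(): List<String> = coroutineScope {
    val channel = Channel<String>() // usable as SendChannel and as ReceiveChannel
    launch { produceLetters(channel) }
    collectLetters(channel)
}

fun main(): Unit = runBlocking {
    println(exchangeLetters()) // prints [A, B, C]
}
```

Only exchangeLetters holds the full Channel type, so the compiler enforces who may send and who may receive.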

Pipeline

Receive channels can be used to implement pipelines. A pipeline is a set of stages that are connected by channels and that work together to transform input data into output data.

Each stage in the pipeline is a coroutine that consumes data from an input channel, performs some computation on the data, and then sends the transformed data to an output channel, which is consumed by the next stage in the pipeline.

The input and output channels between stages decouple them, so each stage can process data asynchronously and independently. This allows the pipeline to handle large volumes of data efficiently and, when the stages run on a multi-threaded dispatcher such as Dispatchers.Default, to spread the computation across multiple threads.

Pipelines are useful in scenarios where you need to process data in stages, where each stage performs a specific computation on the data. For example, you can use a pipeline to process a stream of data from a sensor, where each stage performs a specific transformation on the data, such as filtering, smoothing, or averaging.
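As a rough sketch of the sensor scenario, the stages below filter out implausible readings and then average what remains in fixed-size windows. Everything here is an assumption made for illustration: the function names (sensorReadings, dropOutliers, windowAverage), the hard-coded readings, and the valid range are not taken from any real sensor API.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.runBlocking

// Hypothetical sensor source: emits a fixed list of readings.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.sensorReadings(readings: List<Double>): ReceiveChannel<Double> = produce {
    for (reading in readings) send(reading)
}

// Filtering stage: drops readings outside the given range.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.dropOutliers(
    input: ReceiveChannel<Double>,
    range: ClosedFloatingPointRange<Double>
): ReceiveChannel<Double> = produce {
    for (reading in input) if (reading in range) send(reading)
}

// Averaging stage: emits the mean of each full window of `size` readings.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.windowAverage(
    input: ReceiveChannel<Double>,
    size: Int
): ReceiveChannel<Double> = produce {
    val window = ArrayList<Double>(size)
    for (reading in input) {
        window += reading
        if (window.size == size) {
            send(window.average())
            window.clear()
        }
    }
}

fun main(): Unit = runBlocking {
    val raw = sensorReadings(listOf(20.0, 22.0, 500.0, 24.0, 26.0))
    val averaged = windowAverage(dropOutliers(raw, 0.0..100.0), size = 2)
    println(averaged.toList()) // prints [21.0, 25.0]
}
```

The reading 500.0 is dropped by the filter, so the averaging stage sees 20.0, 22.0, 24.0, and 26.0. A trailing partial window would be discarded in this sketch.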

Pipeline Example 1:

Here is an example of a pipeline that processes a stream of integers by filtering out even numbers, squaring the remaining odd numbers, and then summing them up:

import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch

// TAG: a String log tag constant defined elsewhere in the file
fun streamingNumbers(scope: CoroutineScope) {
    scope.launch {
        val numbers = produceNumbers(10)
        val result = pipeline(numbers)

        Log.d(TAG, result.receive().toString())
    }
}

// Producing numbers, each number being sent to the pipeline
fun CoroutineScope.produceNumbers(count: Int): ReceiveChannel<Int> = produce {
    for (i in 1..count) send(i)
}

// Pipeline which processes the numbers
fun CoroutineScope.pipeline(
    numbers: ReceiveChannel<Int>
): ReceiveChannel<Int> = produce {
    // Filtering out even numbers
    val filtered = filter(numbers) { it % 2 != 0 }

    // Squaring the remaining odd numbers
    val squared = map(filtered) { it * it }

    // Summing them up
    val sum = reduce(squared) { acc, x -> acc + x }

    send(sum)
}

fun CoroutineScope.filter(
    numbers: ReceiveChannel<Int>,
    predicate: (Int) -> Boolean
): ReceiveChannel<Int> = produce {
    numbers.consumeEach { number ->
        if (predicate(number)) send(number)
    }
}

fun CoroutineScope.map(
    numbers: ReceiveChannel<Int>,
    mapper: (Int) -> Int
): ReceiveChannel<Int> = produce {
    numbers.consumeEach { number ->
        send(mapper(number))
    }
}

// A suspending function instead of runBlocking: blocking the thread here
// could starve the filter and map coroutines on a single-threaded dispatcher
suspend fun reduce(
    numbers: ReceiveChannel<Int>,
    accumulator: (Int, Int) -> Int
): Int {
    var result = 0
    for (number in numbers) {
        result = accumulator(result, number)
    }
    return result
}

In this example, the pipeline function chains together three steps: filter, map, and reduce. The filter stage filters out even numbers, the map stage squares the remaining odd numbers, and the reduce step sums up the squared odd numbers. With produceNumbers(10), the logged result is 165 (1 + 9 + 25 + 49 + 81).

The filter and map stages each run as a separate coroutine that consumes data from an input channel and sends results to an output channel. The reduce step is not a stage of its own: it runs inside the coroutine started by pipeline, reads the squared values until the channel closes, and returns the sum. The pipeline function returns a new ReceiveChannel that represents the output channel of the pipeline.

Pipeline Example 2 — Image Processing:

Here’s an example of a pipeline that processes a stream of images by resizing, compressing, and storing them:

import java.net.URL
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

fun processImages(
    coroutineScope: CoroutineScope
) {
    coroutineScope.launch {
        val images = produceImages(listOf(
            "https://via.placeholder.com/300x300.png",
            "https://via.placeholder.com/500x500.png",
            "https://via.placeholder.com/800x800.png"
        ))
        val resized = resizeImages(images, 400)
        val compressed = compressImages(resized, 80)
        storeImages(compressed, Paths.get("output/"))
    }
}

// Downloads run on Dispatchers.IO because URL.readBytes() blocks the thread
fun CoroutineScope.produceImages(urls: List<String>): ReceiveChannel<ByteArray> =
    produce(Dispatchers.IO) {
        for (url in urls) {
            val bytes = URL(url).readBytes()
            send(bytes)
        }
    }

fun CoroutineScope.resizeImages(
    images: ReceiveChannel<ByteArray>, size: Int
): ReceiveChannel<ByteArray> = produce {
    images.consumeEach { image ->
        // ImageResizer could be a util class that resizes the image
        val resizedImage = ImageResizer.resize(image, size)
        send(resizedImage)
    }
}

fun CoroutineScope.compressImages(
    images: ReceiveChannel<ByteArray>, quality: Int
): ReceiveChannel<ByteArray> = produce {
    images.consumeEach { image ->
        // ImageCompressor could be a util class that compresses the image
        val compressedImage = ImageCompressor.compress(image, quality)
        send(compressedImage)
    }
}

suspend fun storeImages(images: ReceiveChannel<ByteArray>, directory: Path) {
    // File I/O is blocking as well, so it also runs on Dispatchers.IO
    withContext(Dispatchers.IO) {
        Files.createDirectories(directory)
        var index = 1
        for (image in images) {
            val file = directory.resolve("image${index++}.jpg")
            Files.write(file, image)
        }
    }
}

In this example, the processImages function creates a ReceiveChannel that produces a stream of image data from a list of URLs using the produceImages function. It then passes this channel to the resizeImages function, which resizes the images to a specified size, and then passes the output channel to the compressImages function, which compresses the images to a specified quality level. Finally, the output channel of the compressImages function is passed to the storeImages function, which stores the compressed images to disk.

The producing, resizing, and compressing stages each run in a separate coroutine that consumes data from an input channel and sends results to an output channel. storeImages is a suspending function that runs in the coroutine launched by processImages and drains the final channel.

The ImageResizer and ImageCompressor classes used in the resizeImages and compressImages functions are just examples of hypothetical classes that could perform these operations on the image data.

Overall, this pipeline provides a convenient and efficient way to process a stream of images by resizing, compressing, and storing them. This pipeline can be easily extended to include additional stages or to handle different types of image-processing tasks.
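One way to keep such a pipeline easy to extend is a small generic helper that turns any transformation into a stage. This is only a sketch: stage is a hypothetical function, not part of kotlinx.coroutines, and strings stand in for image data so the example runs on a plain JVM without any image library.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.runBlocking

// Hypothetical generic stage: applies `transform` to every element of `input`
// in its own coroutine and exposes the results as a new ReceiveChannel.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, R> CoroutineScope.stage(
    input: ReceiveChannel<T>,
    transform: suspend (T) -> R
): ReceiveChannel<R> = produce {
    for (item in input) send(transform(item))
}

@OptIn(ExperimentalCoroutinesApi::class)
fun main(): Unit = runBlocking {
    // Source channel with two sample elements
    val words = produce<String> {
        for (word in listOf("kotlin", "channels")) send(word)
    }

    // Adding a stage is one more call in the chain
    val upper = stage(words) { it.uppercase() }
    val labelled = stage(upper) { "$it (${it.length})" }

    println(labelled.toList()) // prints [KOTLIN (6), CHANNELS (8)]
}
```

With a helper like this, the resize and compress steps could each be expressed as a call to stage with the corresponding transformation passed in as the lambda.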

What’s Next?

In the next part of this series, we’ll dive deeper into the world of Kotlin Channels and explore different types of channels and their real-world applications. By the end of the second article, you’ll have a comprehensive understanding of how to use Kotlin Channels to build efficient and scalable concurrent applications. Stay tuned!

You can find the next part here

Morty

Senior Mobile Developer@ABNAMRO busy creating top-notch mobile apps. https://morti.tech