Mastering Kotlin Coroutines with Practical Examples

Hiten Pratap Singh (hprog99)
May 22, 2023

Before we dive into examples, it's crucial to grasp the concept of coroutines. In the simplest terms, a coroutine lets you write asynchronous code in a sequential style. Coroutines are often described as lightweight threads: many of them can share a small pool of threads, and suspending one is far cheaper than blocking or switching an operating-system thread. In Kotlin, a coroutine is a piece of code that can be suspended and resumed without blocking the thread it runs on.
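To make the "lightweight" claim concrete, here is a minimal sketch that starts ten thousand coroutines at once. The helper name runMany is an assumption made up for this illustration, and the builders it uses (coroutineScope, launch) are explained later in the article.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: starts `count` coroutines and returns how many finished.
suspend fun runMany(count: Int): Int {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(count) {
            launch {
                delay(100L) // suspends the coroutine without blocking a thread
                finished.incrementAndGet()
            }
        }
    } // coroutineScope returns only after every launched coroutine has completed
    return finished.get()
}

fun main() = runBlocking {
    println("Finished coroutines: ${runMany(10_000)}")
}
```

Starting the same number of threads that each sleep for 100 ms would be far more expensive, which is the point of the comparison.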

Coroutine Basics: Launch, Async and RunBlocking

To work with coroutines, Kotlin provides three basic building blocks: launch, async, and runBlocking.

Launch

launch starts a fire-and-forget coroutine. It's perfect for cases where you don't need to compute any result. Here's a simple example:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch {
        delay(1000L)
        println("Hello from Coroutine!")
    }
    println("Hello from Main Thread!")
    Thread.sleep(2000L)
}

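In this code, we're launching a new coroutine using GlobalScope.launch. Inside this coroutine, we're delaying for one second (1000 milliseconds), and then printing a message. Because launch returns immediately, "Hello from Main Thread!" is printed first; the Thread.sleep(2000L) call only keeps the JVM alive long enough for the coroutine to finish. GlobalScope is marked as a delicate API and is used here just to keep the example short.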

Async

async is used when you need a result computed in a coroutine. It starts a new coroutine and returns a Deferred<T>, which is a non-blocking future that represents a promise to provide a result later. Here's an example:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch {
        val result = async {
            computeResult()
        }
        println("Computed result: ${result.await()}")
    }
    Thread.sleep(2000L)
}

suspend fun computeResult(): Int {
    delay(1000L)
    return 42
}

In this code, we’re launching a new coroutine and starting a computation inside it using async. This computation is a suspend function computeResult, which delays for one second and then returns the number 42. After the computation, we print the result using await.
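async pays off when several computations can run at the same time. The following sketch, which uses the hypothetical helpers slowValue and sumConcurrently, starts two computations concurrently and then awaits both, so the total time is roughly that of the slower one rather than the sum of the two.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical slow computation used only for this illustration.
suspend fun slowValue(value: Int): Int {
    delay(500L)
    return value
}

// Both async blocks start immediately and run concurrently.
suspend fun sumConcurrently(): Int = coroutineScope {
    val first = async { slowValue(40) }
    val second = async { slowValue(2) }
    first.await() + second.await()
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        println("Sum: ${sumConcurrently()}")
    }
    println("Took about $elapsed ms")
}
```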

RunBlocking

runBlocking is a bridge between the non-coroutine world and the coroutine world. It starts a top-level main coroutine and blocks the current thread until that coroutine completes. Here's how to use it:

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch {
        delay(1000L)
        println("Hello from Coroutine!")
    }
    println("Hello from Main Thread!")
}

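In this code, we're starting a main coroutine using runBlocking, and inside this coroutine, we're launching a new coroutine. runBlocking blocks the main thread until its body and all launched children have completed, so no Thread.sleep is needed.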

Coroutine Context and Dispatchers

Every coroutine in Kotlin has a context associated with it, which is a set of elements. The key elements in this set are the coroutine's Job and its dispatcher.

Dispatchers

In simple words, coroutine dispatchers determine what thread or threads the corresponding coroutine uses for its execution. Kotlin provides three main dispatchers:

  • Dispatchers.Main — for UI-related tasks.
  • Dispatchers.IO — for input/output tasks, like reading or writing from/to a database, making network calls, or reading/writing files.
  • Dispatchers.Default — for CPU-intensive tasks, like sorting large lists or doing complex computations.

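Here's an example of using dispatchers. Dispatchers.Main is only available when a platform-specific main dispatcher (for example on Android, JavaFX, or Swing) is on the classpath, so that launch is commented out to keep the example runnable on a plain JVM:

import kotlinx.coroutines.*

// runBlocking<Unit> keeps main's return type Unit even though the last expression is a launch.
fun main() = runBlocking<Unit> {
    launch(Dispatchers.IO) {
        println("IO: ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) {
        println("Default: ${Thread.currentThread().name}")
    }
    // Without a main-dispatcher module this launch fails with IllegalStateException.
    // launch(Dispatchers.Main) {
    //     println("Main: ${Thread.currentThread().name}")
    // }
}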
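A dispatcher can also be changed in the middle of a coroutine with withContext, which runs a block on the given dispatcher and returns its result. The sketch below assumes two hypothetical helpers, loadText and countChars, that stand in for I/O-bound and CPU-bound work.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: pretends to read a value on the IO dispatcher.
suspend fun loadText(): String = withContext(Dispatchers.IO) {
    delay(100L) // stands in for a file or network read
    "payload"
}

// Hypothetical helper: does the CPU-bound part on the Default dispatcher.
suspend fun countChars(text: String): Int = withContext(Dispatchers.Default) {
    text.length
}

fun main() = runBlocking {
    val text = loadText()
    println("Loaded ${countChars(text)} characters")
}
```

Because each helper picks its own dispatcher, callers can invoke them from any coroutine without worrying about which thread they are on.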

Exception Handling in Coroutines

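Exception propagation in coroutines depends on the builder. launch treats an uncaught exception like an uncaught exception in a thread and reports it right away, while async stores the exception in the returned Deferred and rethrows it when await is called. Here's a simple example with root coroutines: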

import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class) // GlobalScope is a delicate API
fun main() = runBlocking {
    val job = GlobalScope.launch {
        println("Throwing exception from coroutine")
        throw IllegalArgumentException() // reported by the default uncaught exception handler
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async {
        println("Throwing exception from async")
        throw ArithmeticException() // stored in the Deferred until await() is called
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}

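This code will output the following. The uncaught IllegalArgumentException from launch is additionally reported on the error stream, typically as an "Exception in thread ..." stack trace: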

Throwing exception from coroutine
Joined failed job
Throwing exception from async
Caught ArithmeticException

Coroutine Exception Handling Best Practices

When dealing with exceptions in Kotlin Coroutines, there are some best practices you should follow:

  1. Prefer using structured concurrency: When a coroutine is launched in a CoroutineScope, it's automatically cancelled when the scope is cancelled. This makes cleanup easier and helps avoid leaking resources.
  2. Use CoroutineExceptionHandler sparingly: An uncaught exception in a child coroutine is propagated to its parent, so a handler installed on a child has no effect. A CoroutineExceptionHandler is mostly useful as a last resort for top-level coroutines that you start with launch and that have no parent coroutine.
  3. Use the catch operator with Flows: When working with Flows, exceptions can occur in the operators applied to the Flow. The catch operator handles exceptions thrown upstream of it; it does not catch exceptions thrown inside collect.
  4. Don't swallow CancellationException: When a coroutine is cancelled, its suspending calls throw a CancellationException. This is a normal operation and should not be treated as an error, so if a broad catch block does catch it, rethrow it.
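Items 2, 3 and 4 of the list above can be sketched in a few lines. The function names collectSafely and guarded are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Item 3: recover from an upstream failure with the catch operator.
suspend fun collectSafely(): List<Int> =
    flow<Int> {
        emit(1)
        emit(2)
        throw IllegalStateException("upstream failed")
    }
        .catch { emit(-1) } // emit a fallback value instead of failing the collector
        .toList()

// Item 4: a broad catch block that still lets cancellation pass through.
suspend fun guarded(block: suspend () -> Unit): Boolean =
    try {
        block()
        true
    } catch (e: CancellationException) {
        throw e // never swallow cancellation
    } catch (e: Exception) {
        false
    }

fun main() = runBlocking {
    // Item 2: a handler installed on a top-level scope that has no parent coroutine.
    val handler = CoroutineExceptionHandler { _, e -> println("Handled: ${e.message}") }
    val scope = CoroutineScope(SupervisorJob() + handler)
    scope.launch { throw IllegalStateException("boom") }.join()

    println(collectSafely())
    println(guarded { error("failed") })
}
```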

Structured Concurrency

Structured concurrency is one of the main benefits of using Kotlin Coroutines. In traditional programming, when we start a thread, it operates independently of the code that started it. With structured concurrency, the newly created coroutine is bound within a specific scope and gets cancelled when the scope is cancelled. Here’s a simple example:

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch {
        delay(1000L)
        println("Task from runBlocking")
    }

    coroutineScope {
        launch {
            delay(2000L)
            println("Task from nested launch")
        }

        delay(500L)
        println("Task from coroutine scope")
    }

    println("Coroutine scope is over")
}

In this code, runBlocking creates a new coroutine scope, and within that scope, we launch a new coroutine and create a new coroutineScope. coroutineScope suspends the current coroutine (without blocking the thread) until all of its child coroutines are completed. So the messages appear in this order: "Task from coroutine scope", "Task from runBlocking", "Task from nested launch", and finally "Coroutine scope is over", which is printed only after the nested launch completes its execution.

Suspending Functions

Suspending functions are a cornerstone of Kotlin Coroutines. They are the functions that can be paused and resumed at a later time. To define a suspending function, you use the suspend modifier. Here's a simple example:

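import kotlinx.coroutines.*

suspend fun doSomething() {
    delay(1000L)
    println("Doing something")
}

// runBlocking<Unit> keeps main's return type Unit even though the last expression is a launch.
fun main() = runBlocking<Unit> {
    launch {
        doSomething()
    }
}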

In this code, doSomething is a suspending function. Inside doSomething, we're delaying for one second and then printing a message. We're calling doSomething from a coroutine, because suspending functions can only be called from another suspending function or a coroutine.

Non-blocking Delays

In traditional programming, we use Thread.sleep() to delay the execution of a program. However, this blocks the thread and can be inefficient. Kotlin Coroutines offer the delay function, a non-blocking counterpart of Thread.sleep() that suspends the coroutine and leaves the thread free for other work. Here's a simple example:

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch {
        delay(1000L)
        println("Hello from Coroutine!")
    }
    println("Hello from Main Thread!")
}

In this code, we’re delaying the execution of the coroutine for one second using delay. Despite the delay, the main thread continues its execution, demonstrating the non-blocking nature of delay.

Job Hierarchy

In Kotlin Coroutines, launch returns a Job, and async returns a Deferred, which is itself a Job. A Job can be used to control the lifecycle of the coroutine. Coroutines started inside another coroutine become its children: if a parent job is cancelled, all its children are cancelled too. This forms a hierarchy of jobs. Here's a simple example:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val parentJob = launch {
        val childJob = launch {
            while (true) {
                println("Child is running")
                delay(500L)
            }
        }
        delay(2000L)
        println("Cancelling child job")
        childJob.cancel()
    }
    parentJob.join()
}

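In this code, we're launching a parent job, and inside the parent job, we're launching a child job. The child runs an infinite loop, so it keeps printing until it is cancelled. After two seconds the parent cancels the child explicitly and then completes, which lets parentJob.join() return. Cancelling parentJob instead would have cancelled the child as well.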
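The parent-to-child direction of cancellation can be sketched as follows. The function childCancelledWithParent is a hypothetical helper that cancels only the parent and then reports whether the child was cancelled along with it.

```kotlin
import kotlinx.coroutines.*

// Returns true if the child ended up cancelled after only the parent was cancelled.
suspend fun childCancelledWithParent(): Boolean = coroutineScope {
    var child: Job? = null
    val parent = launch {
        child = launch {
            while (true) delay(100L) // runs until cancelled
        }
        awaitCancellation() // keep the parent active until someone cancels it
    }
    delay(300L)
    parent.cancelAndJoin() // cancels the parent and waits for it and its children
    child?.isCancelled == true
}

fun main() = runBlocking {
    println("Child cancelled with parent: ${childCancelledWithParent()}")
}
```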

Channels

Channels in Kotlin Coroutines provide a way to transfer a stream of values between coroutines. They are similar to BlockingQueue, with one key difference: instead of a blocking put, a sender uses the suspending send, and instead of a blocking take, a receiver uses the suspending receive. Here's a simple example:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close()
    }
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

In this code, we’re creating a channel of Integers. Inside a coroutine, we’re sending squares of numbers from 1 to 5 to the channel, and then closing it. Outside the coroutine, we’re receiving the values from the channel and printing them.
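The receiver above has to know that exactly five values will arrive. A variation that avoids this, sketched below, uses the produce builder (marked experimental in some library versions), which closes the channel when its block finishes, together with a for loop that ends when the channel is closed. The names squares and collectSquares are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// produce returns a ReceiveChannel and closes it when the block finishes.
fun CoroutineScope.squares(limit: Int): ReceiveChannel<Int> = produce {
    for (x in 1..limit) send(x * x)
}

suspend fun collectSquares(limit: Int): List<Int> = coroutineScope {
    val received = mutableListOf<Int>()
    for (value in squares(limit)) received += value // loop ends when the channel closes
    received
}

fun main() = runBlocking {
    println(collectSquares(5))
}
```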

Flow

Flow is a type in Kotlin that can emit multiple values sequentially, as opposed to suspend functions which return only a single value. It’s a cold stream, meaning the code inside a flow builder does not run until the flow is collected. Here’s a simple example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(1000L)
        emit(i)
    }
}

fun main() = runBlocking {
    launch {
        for (k in 1..5) {
            println("I'm not blocked $k")
            delay(1000L)
        }
    }
    numbers().collect { value -> println(value) }
}

In this code, numbers is a Flow that emits numbers from 1 to 5, with a delay of one second between each number. Inside main, we're launching a new coroutine that prints messages without being blocked by the Flow, demonstrating the non-blocking nature of Flows. After that, we're collecting the values emitted by the Flow and printing them.

Advanced Flow Operators

Kotlin's Flow API provides a variety of operators that you can use to transform and manipulate streams of data. These operators include common ones like map and filter, as well as more advanced ones like debounce, buffer, combine, and more. Here's a simple example of using debounce:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000L)
    return "response $request"
}

fun main() = runBlocking {
    val flow = (1..5).asFlow().onEach { delay(300L) }
    flow.debounce(500L)
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

In this code, we're creating a Flow of numbers from 1 to 5, with a delay of 300 milliseconds between each number. Then, we're applying the debounce operator, which passes a value on only if 500 milliseconds go by without a newer value arriving. Because the values arrive 300 milliseconds apart, 1 through 4 are dropped, and only 5, the last value, gets through when the flow completes. After that, we're mapping it to its response using performRequest and collecting the result, so the program prints "response 5".

Combining Multiple Coroutines

In real-world applications, you often need to run multiple coroutines simultaneously and combine their results. Kotlin Coroutines provide several ways to achieve this, like zip, combine, select, and more. Here's a simple example of using zip:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000L)
    return "response $request"
}

fun main() = runBlocking {
    val nums = (1..5).asFlow()
    val strs = nums.map { performRequest(it) }
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

In this code, performRequest is a suspending function that delays for one second and then returns a string. Inside main, we're creating a Flow of numbers from 1 to 5, and then mapping these numbers to their responses using performRequest. After that, we're using zip to pair each original number with the response at the same position, and then collecting the result, which prints lines such as "1 -> response 1".
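While zip pairs elements by position, combine, also mentioned above, recomputes its result from the latest value of each flow whenever either one emits. The following is a minimal sketch with a hypothetical helper, combinedValues; the delays are arbitrary values chosen so that the two flows interleave.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Combines the latest number with the latest letter each time either flow emits.
suspend fun combinedValues(): List<String> {
    val numbers = flowOf(1, 2, 3).onEach { delay(100L) }
    val letters = flowOf("a", "b").onEach { delay(130L) }
    return numbers.combine(letters) { n, s -> "$n$s" }.toList()
}

fun main() = runBlocking {
    val values = combinedValues()
    println(values)
    println("Last pair: ${values.last()}") // the latest value of each flow: 3 and b
}
```

The intermediate pairs depend on how the emissions interleave, but the final pair always reflects the last value of both flows.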

Callbacks and Coroutines

Sometimes, you may need to interact with APIs or libraries that use callbacks. Converting these callbacks to Coroutines can make your code cleaner and easier to understand. Kotlin Coroutines provides suspendCancellableCoroutine for this purpose. Here's a simple example:

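import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Assumption: api, User, Callback, Call and Response come from a Retrofit-style
// callback API that is not shown in this article.
suspend fun fetchUser(id: String): User = suspendCancellableCoroutine { continuation ->
    api.getUser(id, object : Callback<User> {
        override fun onResponse(call: Call<User>, response: Response<User>) {
            val user = response.body() // assumed nullable, e.g. for an error response
            if (user != null) {
                continuation.resume(user)
            } else {
                continuation.resumeWithException(IllegalStateException("Empty response body"))
            }
        }

        override fun onFailure(call: Call<User>, t: Throwable) {
            continuation.resumeWithException(t)
        }
    })
}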

In this code, fetchUser is a suspending function that fetches a user with a specific ID using a callback-based API. Inside fetchUser, we're using suspendCancellableCoroutine to convert the callback to a Coroutine.
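Since the snippet above depends on an external API, here is a self-contained sketch of the same technique. ResultCallback, LegacyClient and loadSuspending are hypothetical names defined only for this illustration; the sketch also shows invokeOnCancellation, which lets the callback-based work be abandoned when the coroutine is cancelled.

```kotlin
import kotlinx.coroutines.*
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical callback-style API, defined here only for illustration.
interface ResultCallback {
    fun onSuccess(value: String)
    fun onError(error: Throwable)
}

class LegacyClient {
    // Calls back from a background thread and returns that thread so it can be interrupted.
    fun load(id: Int, callback: ResultCallback): Thread = thread {
        try {
            Thread.sleep(100L) // stands in for slow work
            if (id > 0) callback.onSuccess("user-$id")
            else callback.onError(IllegalArgumentException("bad id: $id"))
        } catch (e: InterruptedException) {
            // The request was abandoned; nothing to report.
        }
    }
}

// Wraps the callback API in a suspending function.
suspend fun LegacyClient.loadSuspending(id: Int): String =
    suspendCancellableCoroutine { continuation ->
        val worker = load(id, object : ResultCallback {
            override fun onSuccess(value: String) = continuation.resume(value)
            override fun onError(error: Throwable) = continuation.resumeWithException(error)
        })
        // If the coroutine is cancelled while waiting, stop the background work.
        continuation.invokeOnCancellation { worker.interrupt() }
    }

fun main() = runBlocking {
    println(LegacyClient().loadSuspending(7))
}
```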

Coroutine Timeouts

Occasionally, it’s necessary to set a timeout for a particular operation or task performed by a coroutine. Kotlin Coroutines provides the withTimeout function for this purpose. When the timeout is reached, withTimeout throws a TimeoutCancellationException and the corresponding coroutine is cancelled. Here's a simple example:

import kotlinx.coroutines.*

suspend fun doSomething() {
    delay(3000L) // simulate a long-running task
}

fun main() = runBlocking {
    try {
        withTimeout(2000L) {
            doSomething()
        }
    } catch (e: TimeoutCancellationException) {
        println("The task exceeded the timeout limit.")
    }
}

In this code, doSomething is a suspending function that simulates a long-running task with a delay. Inside main, we're using withTimeout to set a timeout of 2000 milliseconds for doSomething. If doSomething doesn't complete within the timeout, withTimeout throws a TimeoutCancellationException which we catch and handle.
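When a timeout is an expected outcome rather than an error, withTimeoutOrNull is a convenient alternative: it returns null instead of throwing. A minimal sketch, using a hypothetical helper named workOrNull:

```kotlin
import kotlinx.coroutines.*

// Returns "done" if the simulated work finishes in time, or null on timeout.
suspend fun workOrNull(workMillis: Long, timeoutMillis: Long): String? =
    withTimeoutOrNull(timeoutMillis) {
        delay(workMillis) // simulate the task
        "done"
    }

fun main() = runBlocking {
    println(workOrNull(workMillis = 100L, timeoutMillis = 1000L)) // finishes in time
    println(workOrNull(workMillis = 1000L, timeoutMillis = 100L)) // times out, yields null
}
```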

Coroutine Scopes and Supervision

In structured concurrency, when a parent coroutine is cancelled, all its child coroutines are also cancelled, and by default a failing child cancels its parent and, through it, its siblings. However, sometimes you want child coroutines to fail independently of each other. For this purpose, Kotlin provides the concept of coroutine supervision. You can create a SupervisorJob or use supervisorScope. Here's a simple example:

import kotlinx.coroutines.*

suspend fun doSomething() {
    delay(1000L)
    throw Exception("Something went wrong.")
}

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        val child1 = launch {
            doSomething()
        }
        val child2 = launch {
            delay(2000L)
            println("Coroutine 2 completed.")
        }
    }
    delay(3000L)
}

In this code, doSomething is a suspending function that simulates a task and throws an exception. Inside main, we're creating a SupervisorJob and two child coroutines: child1 and child2. child1 calls doSomething, which throws after one second; nothing catches the exception, so it is reported as uncaught (on the JVM, typically as a stack trace on the error stream). child2 simply delays for 2000 milliseconds and then prints a message. Because we're using a supervisor job, child2 isn't cancelled when child1 fails.
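The same behaviour can be sketched with supervisorScope, the other option mentioned above, which avoids creating a scope by hand. The helper name runSupervised is hypothetical; async is used so that the failure surfaces at await and can be caught there.

```kotlin
import kotlinx.coroutines.*

// One child fails, the other still completes because failures are not propagated to siblings.
suspend fun runSupervised(): String = supervisorScope {
    val failing = async<String> {
        delay(100L)
        throw IllegalStateException("child failed")
    }
    val healthy = async {
        delay(300L)
        "sibling finished"
    }
    try {
        failing.await() // the stored exception is rethrown here
    } catch (e: IllegalStateException) {
        println("First child failed: ${e.message}")
    }
    healthy.await()
}

fun main() = runBlocking {
    println(runSupervised())
}
```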

Shared Mutable State and Concurrency

In concurrent programming, shared mutable state is a common source of bugs and problems. Kotlin Coroutines provide several tools to handle shared mutable state safely. One of these tools is Mutex, which is a mutual exclusion lock that can be used to protect shared state. Here's a simple example:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

var counter = 0
val mutex = Mutex()

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            mutex.withLock {
                counter++
            }
        }
    }
}

fun main() = runBlocking {
    val job1 = launch { increment() }
    val job2 = launch { increment() }
    job1.join()
    job2.join()
    println(counter)
}

In this code, counter is a shared variable that's incremented by two concurrent coroutines. We're using mutex to ensure that only one coroutine can increment the counter at a time, avoiding race conditions.

Another tool provided by Kotlin Coroutines for sharing updates between coroutines is SharedFlow. A SharedFlow is a hot Flow that emits updates to all of its current collectors. It's useful when you have state that's updated by one coroutine and read by one or more other coroutines. Here's a simple example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val sharedFlow = MutableSharedFlow<Int>()

suspend fun producer() {
    var counter = 0
    while (true) {
        delay(1000L)
        counter++
        sharedFlow.emit(counter)
    }
}

suspend fun consumer(id: Int) {
    sharedFlow.collect { value ->
        println("Consumer $id received $value")
    }
}

fun main() = runBlocking {
    val job1 = launch { producer() }
    val job2 = launch { consumer(1) }
    val job3 = launch { consumer(2) }
    delay(5000L)
    job1.cancel()
    job2.cancel()
    job3.cancel()
}

In this code, producer is a suspending function that emits increasing numbers to sharedFlow every second. consumer is another suspending function that collects these numbers from sharedFlow. Inside main, we're launching one producer coroutine and two consumer coroutines. After 5 seconds, we're cancelling all the coroutines.

Kotlin Coroutines provide a vast toolkit for efficient asynchronous and concurrent programming in Kotlin. From basics like launching and cancelling coroutines to more advanced topics such as channels, flows, supervision, and handling shared mutable state, Kotlin Coroutines cover a broad spectrum of use cases and scenarios. As you continue to explore and practice, you will find your Kotlin skills advancing, and you will be able to write cleaner, more effective, and more reliable code!
