Structured Concurrency for Coroutines: Unraveling the Fundamentals

Rohit Singh
6 min read · Dec 26, 2023


Concurrency

Concurrency is the ability of a program to work on multiple tasks at once, either by interleaving them or by running them in parallel. It enables efficient use of system resources and can improve the overall performance and responsiveness of an application.

Concurrent code has traditionally been written with threads. A common approach is to hand the thread a callback, which the thread invokes to deliver the result once it has completed its task.

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity

class MainActivity : AppCompatActivity(), Callback {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // Create and start a worker thread with the activity as the callback
        val workerThread = WorkerThread(this)
        workerThread.start()
    }

    // Callback implemented by the activity; it is invoked on the worker thread
    override fun onComplete(result: String) {
        // Views may only be touched from the main thread.
        // textView is assumed to be a TextView from the activity's layout.
        runOnUiThread { textView.text = result }
    }
}

// Worker thread class
class WorkerThread(private val callback: Callback) : Thread() {
    override fun run() {
        // Simulate a time-consuming task
        try {
            sleep(2000)
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }

        // Notify the callback with the result
        callback.onComplete("Task completed by WorkerThread")
    }
}

// Callback interface
interface Callback {
    fun onComplete(result: String)
}

Common pitfalls using the above approach:

  1. Memory Leaks: A memory leak happens when the application fails to release memory that is no longer needed. A worker thread holds a reference to the callback it was given, which here is the activity itself. If the worker thread is still running when the activity is closed, that reference prevents the activity instance from being garbage collected, so the activity stays in memory even after it has been destroyed.
  2. Cancellation: When the component that created a thread reaches the end of its lifecycle, we need to cancel its active threads and free their resources. Such cancellation is not straightforward and often requires complex logic to stop each thread properly.
  3. Nested Callbacks: Asynchronous code may lead to callback hell, where nested callbacks become difficult to read and maintain, reducing code clarity.

Structured Concurrency

Structured concurrency is an approach to concurrent programming that preserves the natural relationship between tasks and subtasks, which leads to more readable, maintainable, and reliable concurrent code. Every task is tied to a specific scope. A parent task waits for the outcomes of its subtasks and monitors them for failures, so no subtask is lost or left running unnoticed. This makes the relationships between the pieces of concurrent code explicit and the code easier to reason about.

In Kotlin, coroutines are started within a specific scope using builders such as launch or async. A coroutine started this way becomes a child that is linked to the parent scope.

Child coroutines can run concurrently with each other and with the parent coroutine. If the parent coroutine fails or is canceled, all of its child coroutines are canceled as well.
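The propagation of cancellation from parent to children can be observed directly. The following is a minimal sketch, assuming kotlinx.coroutines is on the classpath; the helper name cancelParentAndInspectChildren is hypothetical and exists only for this illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts a parent with two children, cancels the parent,
// and reports whether each child ended up cancelled.
fun cancelParentAndInspectChildren(): List<Boolean> = runBlocking {
    val childJobs = mutableListOf<Job>()
    val parent = launch {
        // Both coroutines are launched in the parent's scope, so they become its children.
        childJobs += launch { delay(10_000) }
        childJobs += launch { delay(10_000) }
    }
    delay(100)              // let the parent and its children start
    parent.cancelAndJoin()  // cancelling the parent also cancels both children
    childJobs.map { it.isCancelled }
}

fun main() {
    println(cancelParentAndInspectChildren()) // [true, true]
}
```

Neither child is cancelled explicitly; cancelling the parent is enough, which is the property that makes cleanup simple under structured concurrency.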

Structured concurrency is maintained in Kotlin coroutines through the use of CoroutineScope and CoroutineContext.

A CoroutineContext represents the environment in which a coroutine runs. It includes elements such as the coroutine dispatcher, which defines the thread or pool of threads the coroutine should use, as well as other elements like the coroutine name and the exception handler.

CoroutineScope itself doesn't contain data; it is essentially a wrapper around a coroutine context. The actual information and configuration are stored in the CoroutineContext.

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

CoroutineScope serves as the parent in structured concurrency: it manages and monitors the coroutines created within its boundaries.
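To see that a scope is only a holder for a context, one can build a scope from individual elements and read them back. This is a small sketch under the assumption that kotlinx.coroutines is available; describeScope and the name "demo-scope" are made up for the example.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlinx.coroutines.*

// Hypothetical helper: creates a scope from three context elements and
// looks each of them up again through scope.coroutineContext.
fun describeScope(): Triple<String?, Boolean, Boolean> {
    val job = Job()
    val scope = CoroutineScope(job + Dispatchers.Default + CoroutineName("demo-scope"))
    val context = scope.coroutineContext
    val result = Triple(
        context[CoroutineName]?.name,                              // the name we passed in
        context[Job] === job,                                      // the same Job instance
        context[ContinuationInterceptor] === Dispatchers.Default   // dispatchers are stored under this key
    )
    scope.cancel() // cancel the scope's Job once the scope is no longer needed
    return result
}

fun main() {
    println(describeScope()) // (demo-scope, true, true)
}
```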

Coroutine Builders: The launch coroutine builder starts a new coroutine that runs a block of code asynchronously. It is particularly useful for fire-and-forget scenarios where the caller does not need a result from the coroutine.

Job: A coroutine builder returns a handle to the coroutine it starts. launch returns a Job, and async returns a Deferred, which is a Job that also carries a result. A Job can be used to cancel the coroutine, wait for its completion with join, or act as the parent of other jobs to form a hierarchy.
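The difference between the two handles can be sketched as follows. This is an illustrative example rather than code from a real project; the function name launchVersusAsync is hypothetical.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts one coroutine with launch and one with async,
// waits for both, and returns what each handle exposes.
fun launchVersusAsync(): Pair<Boolean, Int> = runBlocking {
    // launch returns a plain Job: it can be joined or cancelled, but carries no result.
    val job: Job = launch { delay(50) }
    // async returns a Deferred<Int>, which is a Job that also delivers a value.
    val deferred: Deferred<Int> = async {
        delay(50)
        21 * 2
    }

    job.join()                    // suspends until the launched coroutine finishes
    val value = deferred.await()  // suspends until the result is available
    Pair(job.isCompleted, value)
}

fun main() {
    println(launchVersusAsync()) // (true, 42)
}
```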

launch is an extension function on CoroutineScope.

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

Parent-Context Element Job

  • Because launch is an extension function on CoroutineScope, it implicitly receives the scope's CoroutineContext through its receiver.
  • The parent CoroutineContext includes a Job element, which represents the coroutine's lifecycle and allows for cancellation.
  • If the coroutine is launched within a scope, the parent context’s Job serves as the parent job.
  • The launch function creates a new Job for the coroutine it's launching.
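  • This new job becomes a child of the parent job, forming a parent-child relationship.

import kotlinx.coroutines.*

fun main() = runBlocking {
    // `this` is the CoroutineScope created by runBlocking
    val job: Job? = this.coroutineContext[Job]
    val jobChild = launch {}
    // Compare the first child of the runBlocking job with the launched job
    println(job?.children?.first() == jobChild)
}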

The code snippet above demonstrates the parent-child relationship between the runBlocking coroutine and a coroutine started with launch. It prints true, indicating that the job returned by launch is a child of the runBlocking job.

A CoroutineContext is an indexed set of elements, so the Job can be looked up by its key with the get operator, as in coroutineContext[Job]. The scope created by runBlocking enforces structured concurrency: any coroutine launched within this scope becomes a child of the scope.

The launch function lets you pass a dispatcher as part of its context parameter to control the thread or thread pool on which the coroutine runs. This is particularly useful in mobile programming, where you may need to offload work to a background thread to keep the UI responsive.

import kotlinx.coroutines.*

fun main() {
    runBlocking {
        // `this` is the CoroutineScope
        launch(Dispatchers.IO) {
            println("Child is executing in ${Thread.currentThread().name}")
        }
        println("Parent is executing in ${Thread.currentThread().name}")
    }
}
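
Running the snippet prints the thread name for each coroutine: the parent runs on the main thread, while the child runs on a worker thread from the pool backing Dispatchers.IO. This demonstrates the use of different thread pools.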

When you use launch, it receives the coroutine context from the scope it is called on (the parent context) and lets you provide additional context elements through its context parameter. CoroutineContext supports merging two contexts with the plus operator.

When merging, elements with different keys coexist. If an element with the same key is present in both the parent context and the context passed to the builder, the element passed to the builder overrides the one from the parent.
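The merge rule can be illustrated with CoroutineName, since its value is easy to read back. The sketch below assumes kotlinx.coroutines; the function names mergeContexts and namesSeenByChildren and the names "parent" and "child" are hypothetical.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*

// Merging by hand: the right-hand element wins for a shared key,
// while elements with other keys are kept.
fun mergeContexts(): Pair<String?, Boolean> {
    val parent: CoroutineContext = Dispatchers.Default + CoroutineName("parent")
    val merged = parent + CoroutineName("child")
    return Pair(
        merged[CoroutineName]?.name,                            // overridden by the right-hand side
        merged[ContinuationInterceptor] === Dispatchers.Default // dispatcher kept from the left-hand side
    )
}

// The same rule applied by a builder: one child inherits the name, one overrides it.
fun namesSeenByChildren(): List<String?> = runBlocking(CoroutineName("parent")) {
    val inherited = async { coroutineContext[CoroutineName]?.name }
    val overridden = async(CoroutineName("child")) { coroutineContext[CoroutineName]?.name }
    listOf(inherited.await(), overridden.await())
}

fun main() {
    println(mergeContexts())       // (child, true)
    println(namesSeenByChildren()) // [parent, child]
}
```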

Cancellation Mechanism in Coroutines

In coroutines, canceling a parent job cancels all of its children. The cancellation process is cooperative, however, meaning a coroutine is not forcibly halted. Canceling a job sets its isActive flag to false, and cancellable suspending functions such as delay throw a CancellationException once the job is canceled. Code that does not suspend has to check isActive itself, which lets it finish its current step and conclude gracefully.

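import kotlinx.coroutines.*

fun main() = runBlocking {
    val parentJob = launch {
        repeat(10) { index ->
            if (isActive) {
                println("Working on task $index")
                // delay is cancellable: if the job is cancelled while suspended here,
                // delay throws CancellationException and the coroutine ends
                delay(1000)
            } else {
                println("Parent job is cancelled. Exiting.")
                return@launch
            }
        }
    }
    delay(1500) // Suspend the runBlocking coroutine for 1.5 seconds
    // Cancel the parent job and wait for it to finish
    parentJob.cancelAndJoin()
}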

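In the snippet above, the launched coroutine checks the isActive flag before each iteration. While the job is active, it prints a message and suspends for 1000 milliseconds. Here the cancellation arrives after 1.5 seconds, while the coroutine is suspended in delay, so delay throws a CancellationException and the coroutine ends at that point after printing two messages; the else branch is not reached. The isActive check matters for work that does not call a cancellable suspending function, because checking the flag is then the only way for the coroutine to notice the cancellation and exit gracefully.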
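A loop with no suspension points shows where the isActive check is actually needed. The following is a minimal sketch, assuming kotlinx.coroutines; the helper busyLoopUntilCancelled is hypothetical, and the busy loop stands in for real CPU-bound work.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs a busy loop on a background thread and cancels it.
// Returns whether the loop noticed the cancellation and whether the job is cancelled.
fun busyLoopUntilCancelled(): Pair<Boolean, Boolean> = runBlocking {
    val started = CompletableDeferred<Unit>()
    var observedCancellation = false
    val worker = launch(Dispatchers.Default) {
        started.complete(Unit)
        var iterations = 0L
        // The loop never suspends, so it only learns about cancellation through isActive.
        while (isActive) {
            iterations++
        }
        // Reached only because the loop saw isActive == false and stopped on its own.
        observedCancellation = true
    }
    started.await()         // make sure the worker is running before cancelling it
    worker.cancelAndJoin()  // sets isActive to false, then waits for the worker to finish
    Pair(observedCancellation, worker.isCancelled)
}

fun main() {
    println(busyLoopUntilCancelled()) // (true, true)
}
```

Without the isActive condition, for example with while (true), the loop would never observe the cancellation and cancelAndJoin would not return.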

I’m confident you’ve acquired insight into how structured concurrency functions in Kotlin coroutines.

Thanks for reading and if you like the article, remember to 👏.

Please feel free to ask any questions; you can reach me on LinkedIn or Twitter.

Thank you AndroidWeekly for selecting this article
