Kotlin Coroutines Case study — Cleaning up an Async API

Tom Hanley
Nov 7, 2019

Asynchronous code can get really messy really quickly. Callback hell was named callback hell for a reason. There are some good ways to avoid this in your design, but sometimes you have to live with an old, difficult-to-use async API, maybe because it’s provided by a third party, or maybe because it’s just too difficult to rip out and rewrite. What you can do in this case is write a Kotlin extension/wrapper on top of the API.

Kotlin Coroutines are a fantastic way to clean up async APIs. They can make your async code as easy to read and understand as sequential code, while still giving you all the benefits of non-blocking async code.

Note that some familiarity with coroutines is assumed in this post.

At Toast, we’re building an all-in-one restaurant management platform. At its core is the Android point of sale system. The project I was on was tasked with integrating our next-generation card reader into the application.

The card reader was designed and manufactured by an external hardware company, who also provided an Android SDK. All calls to the reader were asynchronous. Essentially, the SDK had two main classes: the Controller and the Listener. The Controller was used to send commands to the card reader. All of its methods were void, so they did not return any information. All information and errors were returned via the Listener. This might sound OK, in that it ticks the box for being asynchronous, but it’s very difficult to use correctly and cleanly. Let’s take a look at a small section of the API to illustrate.

Original Card Reader API data flow

Take the query to get the DeviceInfo from the card reader, which contains information such as battery level, firmware version and serial number. We have a Controller interface that we use to send commands to the card reader, and a Listener interface that we implement to handle events from the card reader.

interface Controller {
    fun getDeviceInfo()
}

interface Listener {
    fun onReturnDeviceInfo(deviceInfo: DeviceInfo)
    fun onError(error: Exception)
}

Just getting the DeviceInfo with these two classes is really hard to do cleanly and safely. We could end up going down the road of a super messy, easily misused, non-thread-safe implementation, where our listener implementation saves each event to some public shared mutable variable as below:

//Naive implementation
class MyListener : Listener {

    var deviceInfo: DeviceInfo? = null
    var error: Exception? = null

    override fun onReturnDeviceInfo(deviceInfo: DeviceInfo) {
        this.deviceInfo = deviceInfo
    }

    override fun onError(error: Exception) {
        this.error = error
    }
}

Then the code to get the device info after querying it could look something like this:

//Naive implementation
fun getDeviceInfo(): DeviceInfo {
    controller.getDeviceInfo() // returns nothing
    while (listener.deviceInfo == null && listener.error == null) {
        // could be an infinite loop if neither deviceInfo nor error is ever populated
        Thread.sleep(100) // blocks the thread
    }
    val returnValue = listener.deviceInfo
    val error = listener.error
    // nulling the values so that it waits for new values next time
    listener.deviceInfo = null
    listener.error = null
    // if the reader reported an error instead of a DeviceInfo, rethrow it
    return returnValue
        ?: throw (error ?: IllegalStateException("No device info or error received"))
}

Not only are we blocking the thread, and potentially getting into an infinite loop if the deviceInfo is never returned, but we also have to null out the listener’s values so that it will wait for a new value on subsequent calls. As this implementation uses shared mutable variables to communicate across threads, making it thread safe would require a whole bunch of synchronisation and knowledge of the subtleties of the JVM memory model.

Kotlin Coroutines to the rescue

Clearly we needed to write some sort of wrapper or extension to clean this API up. There are several ways to do this, but I don’t think any would result in a safer and cleaner API than using Kotlin Coroutines.

API Extension Goals

I wanted to get to a nice clean API where:

  1. The result of the query is clearly returned by the function itself
  2. The caller of the function can control the asynchrony
  3. The implementation doesn’t block the thread
  4. It’s easy to use, and hard to misuse
  5. You know that one call has completed before starting another

// Our target interface
interface ControllerExtension {
    suspend fun getDeviceInfo(): DeviceInfo
}

Notice the suspend modifier on the function, which is a key concept in coroutines. The suspend modifier indicates that the function can suspend the execution of a coroutine and resume it at a later time. Suspend functions can execute a long-running operation and wait for it to complete without blocking any thread. To achieve this, the compiler adds a hidden Continuation parameter to the function when you use the suspend modifier. This continuation parameter is then used to resume execution of the coroutine when the suspend function has completed, using a form of continuation passing style. See a better description of this by Roman Elizarov at KotlinConf here: https://youtu.be/YrrUCSi72E8?t=177

Continuation Passing Style basically means that where a function would normally return, there is instead a function call to a callback that contains the rest of the program execution.
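To make the idea concrete, here is a minimal hand-written sketch of the same computation in direct style and in continuation passing style. The function names addDirect and addCps are made up for illustration. This is not what the compiler generates for suspend functions, only the general shape of the technique.

```kotlin
// Direct style: the result comes back through the return value.
fun addDirect(a: Int, b: Int): Int = a + b

// Continuation passing style: instead of returning, the function hands its
// result to a callback (the "continuation") that holds the rest of the work.
fun addCps(a: Int, b: Int, continuation: (Int) -> Unit) {
    continuation(a + b)
}

fun main() {
    // Direct style reads top to bottom.
    val direct = addDirect(addDirect(1, 2), 3)
    println(direct)

    // In CPS, each following step is nested inside the previous callback.
    addCps(1, 2) { firstSum ->
        addCps(firstSum, 3) { secondSum ->
            println(secondSum)
        }
    }
}
```

Both versions print 6. The nesting in the second version is exactly what the suspend modifier lets you avoid writing by hand.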

But how to implement this interface? At a high level, what we want to do is:

  1. Start listening for the event/error
  2. Trigger the event
  3. Suspend until event/error is returned

To do this we will need some asynchronous communication between coroutines, as the event/error we start listening for in step one won’t be received until we trigger the event in step two.

Communicating Between Coroutines

In our naive implementation, we communicated outwards from our Listener using public, mutable, nullable variables. Of course, this is asking for trouble. To create a thread safe implementation that is easier to write and use correctly, we can take some advice from Go:

Do not communicate by sharing memory; instead, share memory by communicating.

So how can we communicate events asynchronously and safely from our Listener implementation? We can write a Listener implementation that will communicate events outwards via Kotlin Channels:

A Channel is conceptually very similar to BlockingQueue. One key difference is that instead of a blocking put operation it has a suspending send, and instead of a blocking take operation it has a suspending receive.
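As a small illustration of suspending send and receive, the sketch below passes three numbers from a producer coroutine to a consumer over a channel. It assumes the kotlinx.coroutines library is on the classpath, and the function name produceAndCollect is made up for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sends three numbers from one coroutine and collects them in another.
suspend fun produceAndCollect(): List<Int> = coroutineScope {
    // Default (rendezvous) channel: send suspends until a receiver is ready.
    val channel = Channel<Int>()

    // Producer coroutine.
    launch {
        for (i in 1..3) {
            channel.send(i) // suspends instead of blocking the thread
        }
        channel.close() // signals that no more elements will be sent
    }

    // Consumer: iterating suspends on each receive until the channel is closed.
    val received = mutableListOf<Int>()
    for (value in channel) {
        received += value
    }
    received
}

fun main() {
    println(runBlocking { produceAndCollect() })
}
```

Both coroutines here run on the single thread owned by runBlocking, which only works because send and receive suspend rather than block.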

API Extension Implementation

Our ControllerExtension is the only public interface to the rest of our application. It contains the original Controller provided with the hardware, which can be used to send commands to the physical reader. The ControllerExtension also contains the ListenerExtension, which receives events from the physical card reader and sends each one to its relevant Channel. The ControllerExtension can then trigger an event such as getDeviceInfo() and receive the resulting event from the relevant ListenerExtension channel. This is illustrated in the sequence diagram and data flow diagram below.

Sequence diagram showing getDeviceInfo happy path
Data flow diagram showing channels for communication between listener and controller

Our ListenerExtension implementation is shown below. We have a public Channel for each event type, such as the deviceInfoChannel shown below. We also have an exception Channel shared by all event types. After each command, you get either an event or an error.

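import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

class ListenerExtension : Listener {

    // One channel per event type, plus one error channel shared by all commands
    val exceptionChannel = Channel<Exception>(Channel.UNLIMITED)
    val deviceInfoChannel = Channel<DeviceInfo>(Channel.UNLIMITED)

    override fun onReturnDeviceInfo(deviceInfo: DeviceInfo) {
        // The SDK callback is not a suspend function, so we bridge into send();
        // with UNLIMITED capacity, send() never has to wait for a receiver
        runBlocking { deviceInfoChannel.send(deviceInfo) }
    }

    override fun onError(error: Exception) {
        runBlocking { exceptionChannel.send(error) }
    }
}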

Then, in our ControllerExtension implementation of getDeviceInfo() below, we immediately call a function called triggerEventAndGetResult() passing in three parameters:

  1. A lambda function that will trigger the event when invoked.
  2. The channel that the subsequent event should be received on.
  3. The channel that the subsequent error could be received on.

It’s clear what’s triggering the event, where the event is being received from, and where the error is being received from. As receiving from a channel is a suspend operation, it will suspend rather than block when waiting for the event from the channel, therefore never blocking the thread waiting for I/O.

// Implements the target ControllerExtension interface shown earlier
class ControllerExtensionImpl(
    private val controller: Controller, // the original controller from the SDK
    private val listener: ListenerExtension
) : ControllerExtension {
    override suspend fun getDeviceInfo(): DeviceInfo {
        return triggerEventAndGetResult(
            eventTrigger = { controller.getDeviceInfo() },
            eventReceiver = listener.deviceInfoChannel,
            errorReceiver = listener.exceptionChannel
        )
    }
}

If an error occurs, an event will be sent to the error channel instead of the deviceInfo channel. So how do we deal with this?

Kotlin provides a really nice way to select an event from one of multiple channels with the Select expression.

The Select expression makes it possible to await multiple suspending functions simultaneously and select the first one that becomes available.

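import kotlinx.coroutines.async
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.selects.select

suspend fun <T : Any> triggerEventAndGetResult(
    eventTrigger: () -> Unit,
    eventReceiver: ReceiveChannel<T>,
    errorReceiver: ReceiveChannel<Exception>
): T = coroutineScope {
    // Start listening on both channels before the event is triggered
    val deferredEvent = async<T> {
        select<T> {
            errorReceiver.onReceive { error ->
                throw error
            }
            eventReceiver.onReceive { event ->
                event
            }
        }
    }
    eventTrigger()
    // Suspend until either the event or the error has arrived
    return@coroutineScope deferredEvent.await()
}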

Contained in our triggerEventAndGetResult() function, we have the core logic to:

  1. Start listening on the event and error channels asynchronously
  2. Trigger the event passed in as the lambda parameter
  3. Suspend waiting to receive the event/error
  4. Resume execution when the event is available, and return the event

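The coroutineScope block inherits the coroutine context from the caller of the function, but it creates a new child Job for that block. If the async coroutine fails inside this coroutineScope block, any other work in the block is cancelled and the exception is rethrown to the caller as an ordinary exception, which the caller can catch. The failure is not propagated to the caller’s scope through the Job hierarchy, as it would be if async were started directly in that scope.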
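To see the pieces working together, here is a self-contained sketch that runs the pattern against a fake reader. FakeController and the DeviceInfo fields are hypothetical stand-ins for the vendor SDK, which is not shown in this post. The listener and triggerEventAndGetResult() follow the code above, and kotlinx.coroutines is assumed to be on the classpath.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlin.concurrent.thread

// Hypothetical stand-ins for the vendor SDK types.
data class DeviceInfo(val serialNumber: String, val batteryLevel: Int)

interface Controller {
    fun getDeviceInfo()
}

interface Listener {
    fun onReturnDeviceInfo(deviceInfo: DeviceInfo)
    fun onError(error: Exception)
}

class ListenerExtension : Listener {
    val exceptionChannel = Channel<Exception>(Channel.UNLIMITED)
    val deviceInfoChannel = Channel<DeviceInfo>(Channel.UNLIMITED)

    override fun onReturnDeviceInfo(deviceInfo: DeviceInfo) {
        runBlocking { deviceInfoChannel.send(deviceInfo) }
    }

    override fun onError(error: Exception) {
        runBlocking { exceptionChannel.send(error) }
    }
}

// Fake reader (an assumption for this sketch): it answers on a background
// thread after a short delay, the way an SDK callback might.
class FakeController(
    private val listener: Listener,
    private val fail: Boolean = false
) : Controller {
    override fun getDeviceInfo() {
        thread {
            Thread.sleep(50)
            if (fail) listener.onError(IllegalStateException("reader disconnected"))
            else listener.onReturnDeviceInfo(DeviceInfo("SN-0001", 87))
        }
    }
}

suspend fun <T : Any> triggerEventAndGetResult(
    eventTrigger: () -> Unit,
    eventReceiver: ReceiveChannel<T>,
    errorReceiver: ReceiveChannel<Exception>
): T = coroutineScope {
    val deferredEvent = async<T> {
        select<T> {
            errorReceiver.onReceive { error -> throw error }
            eventReceiver.onReceive { event -> event }
        }
    }
    eventTrigger()
    deferredEvent.await()
}

fun main() {
    runBlocking {
        val listener = ListenerExtension()

        // Happy path: the call reads like sequential code.
        val info = triggerEventAndGetResult(
            eventTrigger = { FakeController(listener).getDeviceInfo() },
            eventReceiver = listener.deviceInfoChannel,
            errorReceiver = listener.exceptionChannel
        )
        println(info)

        // Error path: the reader's error surfaces as a normal exception.
        try {
            triggerEventAndGetResult(
                eventTrigger = { FakeController(listener, fail = true).getDeviceInfo() },
                eventReceiver = listener.deviceInfoChannel,
                errorReceiver = listener.exceptionChannel
            )
        } catch (e: IllegalStateException) {
            println("Caught: ${e.message}")
        }
    }
}
```

The second call shows that a failure inside the coroutineScope block can be caught with a plain try/catch at the call site, after which the calling coroutine carries on.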

Moving away from throwing exceptions

While using exceptions can make your calling code look clean, when you actually have to catch and handle exceptions, things get really messy. This is why, after rolling this code out, we moved away from throwing exceptions.

Exceptions should only be used for fatal things that you can’t recover from. Avoid throwing and catching exceptions for control flow, as this is comparable to using goto statements. Instead, return an object that encapsulates the success or failure outcomes.

There were a couple of options we looked at. Kotlin’s Result class has some really nice features, but unfortunately, at the time of writing, it’s not ready to be used as the return type of a function. Arrow’s Either type was another option, but I found the left and right convention a bit jarring in terms of readability. Creating our own generic result object for success/failure was a third option, but I wanted to avoid this as it felt like reinventing the wheel. In the end, the best option was to use domain-specific return objects built with Kotlin sealed classes.

We changed every controller command to return a generic result object:

sealed class ControllerResult<R> {
    data class Success<R>(val result: R) : ControllerResult<R>()
    data class Failure<R>(val error: Error) : ControllerResult<R>()
}

We could then map this to the domain specific result object as needed.

sealed class DeviceInfoResult {
    data class Success(val deviceInfo: DeviceInfo) : DeviceInfoResult()
    data class Failure(val error: Error) : DeviceInfoResult()
    data class Timeout(val error: Error) : DeviceInfoResult()
}

While this leads to a bit more code, and some very similar-looking classes, it doesn’t sacrifice readability, and it allows each result object to evolve independently. There’s an excellent write-up on the Kotlin Result object, its limitations and the alternatives here: https://github.com/Kotlin/KEEP/blob/master/proposals/stdlib/result.md
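The mapping from the generic result to the domain-specific one might look something like the sketch below. ReaderError, its isTimeout flag, the DeviceInfo fields and the helper names toDeviceInfoResult and describe are all hypothetical, introduced only so the example compiles on its own. In particular, how a timeout is actually detected is an assumption, not something described in this post.

```kotlin
// Hypothetical stand-ins so the sketch is self-contained.
data class ReaderError(val message: String, val isTimeout: Boolean = false)
data class DeviceInfo(val serialNumber: String, val batteryLevel: Int)

sealed class ControllerResult<R> {
    data class Success<R>(val result: R) : ControllerResult<R>()
    data class Failure<R>(val error: ReaderError) : ControllerResult<R>()
}

sealed class DeviceInfoResult {
    data class Success(val deviceInfo: DeviceInfo) : DeviceInfoResult()
    data class Failure(val error: ReaderError) : DeviceInfoResult()
    data class Timeout(val error: ReaderError) : DeviceInfoResult()
}

// Maps the generic result onto the domain-specific one.
fun ControllerResult<DeviceInfo>.toDeviceInfoResult(): DeviceInfoResult = when (this) {
    is ControllerResult.Success -> DeviceInfoResult.Success(result)
    is ControllerResult.Failure ->
        if (error.isTimeout) DeviceInfoResult.Timeout(error)
        else DeviceInfoResult.Failure(error)
}

// Calling code handles every outcome explicitly, with no try/catch.
// Because the class is sealed, the compiler checks that no case is missed.
fun describe(result: DeviceInfoResult): String = when (result) {
    is DeviceInfoResult.Success -> "Battery at ${result.deviceInfo.batteryLevel}%"
    is DeviceInfoResult.Failure -> "Reader error: ${result.error.message}"
    is DeviceInfoResult.Timeout -> "Reader did not respond in time"
}

fun main() {
    val ok: ControllerResult<DeviceInfo> =
        ControllerResult.Success(DeviceInfo("SN-0001", 87))
    val timedOut: ControllerResult<DeviceInfo> =
        ControllerResult.Failure(ReaderError("no response", isTimeout = true))

    println(describe(ok.toDeviceInfoResult()))       // prints "Battery at 87%"
    println(describe(timedOut.toDeviceInfoResult())) // prints "Reader did not respond in time"
}
```

Adding a new outcome to DeviceInfoResult later makes every non-exhaustive when expression fail to compile, which is part of what makes this style hard to misuse.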

Conclusion

By applying this same pattern to all the other functions in the API, we end up with an API that’s really clean and easy to use correctly. The code looks the same as sequential code, yet it never blocks a thread while waiting. It suspends instead, as you can see from the suspend point markers in the left gutter in Android Studio. You know that one call has completed before starting the next, and any failures are surfaced to the caller, either as exceptions that can be handled normally or, after our later change, as result objects.

Notice the suspend point markers on the left in the Android Studio gutter.

I hope you found this post useful. In a follow-on post I’ll discuss some of the key lessons we learned along the way!
