If you’re a library author, you might want to make your Java-based or callback-based libraries easier to consume from Kotlin using coroutines and Flow. Alternatively, if you’re an API consumer, you may want to adapt a third-party API surface to coroutines to make it more Kotlin friendly.
This article covers how to simplify APIs using coroutines and Flow as well as how to build your own adapter using the suspendCancellableCoroutine and callbackFlow APIs. For the most curious ones, those APIs will be dissected and you’ll see how they work under the hood.
Check existing coroutine adapters
Before writing your own wrappers for existing APIs, check if an adapter or extension function is available for your use case. There are existing libraries with coroutine adapters for common types.
// Awaits completion of CompletionStage without blocking a thread
suspend fun <T> CompletionStage<T>.await(): T

// Awaits completion of ListenableFuture without blocking a thread
suspend fun <T> ListenableFuture<T>.await(): T
With these functions, you can get rid of callbacks and just suspend the coroutine until the future result comes back.
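As a rough illustration of how such an adapter reads at the call site, the sketch below awaits a CompletableFuture (which implements CompletionStage). It assumes the await extension from the kotlinx.coroutines.future package is on the classpath; fetchGreeting and greetingLength are hypothetical names invented for this example.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking

// Hypothetical Java-style API that returns a future instead of taking a callback.
fun fetchGreeting(): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "hello" }

// The suspend function reads sequentially: no callback, no blocked thread.
suspend fun greetingLength(): Int {
    val greeting = fetchGreeting().await() // suspends until the future completes
    return greeting.length
}
```

If the future completes exceptionally, await() rethrows the failure, so a regular try/catch around the call is enough for error handling.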
// Transforms the given reactive Publisher into Flow.
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
These functions convert a reactive stream into Flow.
Android specific APIs
For Jetpack libraries or Android platform APIs, take a look at the Jetpack KTX libraries list. Currently, more than 20 libraries have a KTX version, creating sweet idiomatic versions of Java APIs, ranging from SharedPreferences to ViewModels, SQLite and even Play Core.
Callbacks are a very common solution for asynchronous communication. In fact, we use them for the Java programming language solution in the Running tasks in background thread guide. However, they come with some drawbacks: this design leads to nested callbacks, which end up as incomprehensible code. Also, error handling is more complicated, as there isn’t an easy way to propagate errors.
In Kotlin, you can simplify calling callbacks using coroutines, but for that, you’ll need to build your own adapter.
Build your own adapter
If you don’t find an adapter for your use case, it’s usually quite straightforward to write your own. For one-shot async calls, use the suspendCancellableCoroutine API. For streaming data, use the callbackFlow API.
As an exercise, the following examples will use the Fused Location Provider API from Google Play Services to get location data. The API surface is simple but it uses callbacks to perform async operations. With coroutines, we can get rid of those callbacks that can quickly make our code unreadable when the logic gets complicated.
In case you want to explore other solutions, you can get inspiration from the source code of all the functions linked above.
One-shot async calls
We can have a better API by creating an extension function on FusedLocationProviderClient:
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location
As this is a one-shot async operation, we use the suspendCancellableCoroutine function: a low-level building block for creating suspending functions from the coroutines library.
suspendCancellableCoroutine executes the block of code passed to it as a parameter, then suspends the coroutine execution while waiting for the signal to continue. The coroutine will resume executing when the resume or resumeWithException method is called on the coroutine’s Continuation object. For more information about continuations, check out the suspend modifier under the hood article.
We use the callbacks that can be added to the
getLastLocation method to resume the coroutine appropriately. See the implementation below:
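A minimal, self-contained sketch of this pattern follows. So that it can run outside Android, it does not use the real Play Services types: FakeLocation and FakeLocationClient are hypothetical stand-ins whose getLastLocation takes a success and a failure callback. Only the suspendCancellableCoroutine wiring is meant to carry over.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical stand-ins for the Android types (assumptions for this sketch).
data class FakeLocation(val latitude: Double, val longitude: Double)

class FakeLocationClient(private val last: FakeLocation?) {
    // Callback-style one-shot call: invokes exactly one of the two callbacks.
    fun getLastLocation(
        onSuccess: (FakeLocation) -> Unit,
        onFailure: (Exception) -> Unit
    ) {
        if (last != null) {
            onSuccess(last)
        } else {
            onFailure(IllegalStateException("No location available"))
        }
    }
}

// The adapter: resume the coroutine from whichever callback fires.
suspend fun FakeLocationClient.awaitLastLocation(): FakeLocation =
    suspendCancellableCoroutine { continuation ->
        getLastLocation(
            onSuccess = { location -> continuation.resume(location) },
            onFailure = { error -> continuation.resumeWithException(error) }
        )
    }
```

Callers can then write `val location = client.awaitLastLocation()` inside any coroutine and handle failures with an ordinary try/catch.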
Note: Although you will also find a non-cancellable version of this coroutine builder in the coroutines library (i.e.
suspendCoroutine), it is preferable to always choose
suspendCancellableCoroutine to handle cancellation of the coroutine scope, or to propagate cancellation from the underlying API.
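To illustrate propagating cancellation to the underlying API, here is a small sketch that uses invokeOnCancellation on the CancellableContinuation. FakeCall is a hypothetical request handle invented for this example; a real API would expose its own way of cancelling in-flight work.

```kotlin
import kotlin.coroutines.resume
import kotlinx.coroutines.*

// Hypothetical cancellable request handle (an assumption for this sketch).
class FakeCall {
    var cancelled = false
        private set
    private var callback: ((String) -> Unit)? = null

    fun enqueue(callback: (String) -> Unit) { this.callback = callback }
    fun cancel() { cancelled = true; callback = null }
    fun complete(value: String) { callback?.invoke(value) }
}

suspend fun FakeCall.await(): String =
    suspendCancellableCoroutine { continuation ->
        enqueue { value -> continuation.resume(value) }
        // If the coroutine is cancelled while suspended, cancel the underlying call too.
        continuation.invokeOnCancellation { cancel() }
    }
```

With suspendCoroutine there would be no hook like this, so the underlying work would keep running after the caller has lost interest in the result.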
suspendCancellableCoroutine under the hood
Internally, suspendCancellableCoroutine uses suspendCoroutineUninterceptedOrReturn to get the Continuation of the coroutine inside a suspend function. That Continuation object is intercepted by a CancellableContinuation that will control the lifecycle of that coroutine from that point (its implementation has the functionality of a Job with some restrictions).
After that, the lambda passed to
suspendCancellableCoroutine will be executed and the coroutine will either resume immediately if the lambda returns a result or will be suspended until the
CancellableContinuation is resumed manually from the lambda.
See my own comments in the following code snippet (following the original implementation) to understand what’s happening:
To know more about how suspend functions work under the hood, check out the suspend modifier under the hood article.
If instead we wanted to receive periodic location updates (using the
requestLocationUpdates function) whenever the user’s device moves in the real world, we’d need to create a stream of data using Flow. The ideal API would look like this:
fun FusedLocationProviderClient.locationFlow(): Flow<Location>
To convert streaming callback-based APIs to Flow, use the callbackFlow flow builder that creates a new flow. In the callbackFlow lambda, we’re in the context of a coroutine, therefore, suspend functions can be called. Unlike the flow flow builder, callbackFlow (like the channelFlow builder it is built on) allows values to be emitted from a different CoroutineContext or outside a coroutine, with the offer method.
Normally, flow adapters using
callbackFlow follow these three generic steps:
- Create the callback that adds elements into the flow using offer.
- Register the callback.
- Wait for the consumer to cancel the coroutine and unregister the callback.
Applying this recipe to this use case, we get the following implementation:
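A minimal, self-contained sketch of the three steps follows. It does not use the real Play Services types: FakeLocation, FakeLocationCallback and FakeLocationClient are hypothetical in-memory stand-ins so the example can run anywhere. It also uses trySend, which replaces offer in recent kotlinx.coroutines releases.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the Android types (assumptions for this sketch).
data class FakeLocation(val latitude: Double, val longitude: Double)

fun interface FakeLocationCallback {
    fun onLocation(location: FakeLocation)
}

class FakeLocationClient {
    private val callbacks = mutableListOf<FakeLocationCallback>()
    val registeredCallbacks: Int get() = callbacks.size

    fun requestLocationUpdates(callback: FakeLocationCallback) { callbacks += callback }
    fun removeLocationUpdates(callback: FakeLocationCallback) { callbacks -= callback }

    // Simulates the device reporting a new location to every listener.
    fun push(location: FakeLocation) = callbacks.toList().forEach { it.onLocation(location) }
}

fun FakeLocationClient.locationFlow(): Flow<FakeLocation> = callbackFlow {
    // 1. Create the callback that adds elements into the flow.
    val callback = FakeLocationCallback { location ->
        trySend(location) // non-suspending, safe to call from a plain callback
    }
    // 2. Register the callback.
    requestLocationUpdates(callback)
    // 3. Suspend until the consumer cancels, then unregister the callback.
    awaitClose { removeLocationUpdates(callback) }
}
```

Because registration happens inside the builder, nothing is registered until the flow is collected, and the listener is removed as soon as collection stops.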
callbackFlow under the hood
callbackFlow uses a channel, which is conceptually very similar to a blocking queue. A channel is configured with a capacity: the number of elements that can be buffered. The channel created in callbackFlow has the default capacity of 64 elements. When adding a new element to an already full channel, send will suspend the producer until there’s space for the new element in the channel, whereas offer won’t add the element to the channel and will return false immediately. In kotlinx.coroutines 1.5 and later, offer is deprecated in favor of trySend, which reports the same failure through its ChannelResult.
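The effect of a full buffer can be seen with a standalone channel. This sketch uses a deliberately small capacity and trySend (the non-suspending call in current kotlinx.coroutines versions); fillChannel is a hypothetical helper written for this example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Tries to add `attempts` elements to a channel with the given buffer capacity,
// with no receiver draining it, and reports which attempts succeeded.
fun fillChannel(capacity: Int, attempts: Int): List<Boolean> {
    val channel = Channel<Int>(capacity)
    return (1..attempts).map { value ->
        // trySend never suspends: it fails immediately once the buffer is full.
        channel.trySend(value).isSuccess
    }
}
```

With a capacity of 2 and no consumer, the first two attempts succeed and the third fails, which is why a slow collector can cause dropped elements when the callback does not check the result.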
awaitClose under the hood
Interestingly, awaitClose uses suspendCancellableCoroutine under the hood. See my own comments in the following code snippet (following the original implementation) to understand what’s happening:
Reusing the Flow
Flows are cold and lazy unless specified otherwise with intermediate operators such as shareIn. This means that the builder block will be executed each time a terminal operator is called on the flow. This might not be a huge problem in our case, as adding new location listeners is cheap; however, it might make a difference in other implementations.
Use the shareIn intermediate operator to reuse the same flow across multiple collectors and make the cold flow hot.
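The difference can be observed by counting how often the builder block runs. In this sketch, countingFlow is a hypothetical callbackFlow whose counter stands in for registering a listener, and the scope created in sharedRegistrations plays the role of an application-wide scope. SharingStarted.Lazily is chosen here only to keep the example deterministic; other start policies may suit a real app better.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical flow: incrementing the counter stands in for registering a listener.
fun countingFlow(registrations: AtomicInteger): Flow<Int> = callbackFlow {
    registrations.incrementAndGet()
    trySend(42)
    awaitClose { /* a real adapter would unregister its listener here */ }
}

// Cold flow: every terminal operator re-runs the builder block.
fun coldRegistrations(): Int = runBlocking {
    val registrations = AtomicInteger()
    val cold = countingFlow(registrations)
    cold.first()
    cold.first()
    registrations.get()
}

// Shared flow: the builder block runs once and later collectors get the replayed value.
fun sharedRegistrations(): Int = runBlocking {
    val registrations = AtomicInteger()
    val scope = CoroutineScope(Dispatchers.Default) // stand-in for an applicationScope
    val shared = countingFlow(registrations)
        .shareIn(scope, started = SharingStarted.Lazily, replay = 1)
    shared.first()
    shared.first()
    scope.cancel() // stop the upstream collection
    registrations.get()
}
```

The shared version keeps the upstream alive for as long as its scope is active, so the scope’s lifetime determines when the underlying listener is finally released.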
To learn more about best practices for adding an
applicationScope to your app, check out this article.
Consider creating coroutine adapters to make your APIs or existing APIs concise, readable and Kotlin idiomatic. First check if the adapter is already available and if not, create your own using
suspendCancellableCoroutine for one-shot calls and
callbackFlow for streaming data.
To get hands-on with this topic, check out the Building a Kotlin extensions library codelab.