Illustration by Virginia Poltrak

Lessons learnt using Coroutines Flow in the Android Dev Summit 2019 app

Manuel Vivo
Nov 26 · 8 min read

This article is about the best practices we found when using Flow in the Android Dev Summit (ADS) 2019 app; which has just been open sourced. Keep reading to find out how each layer of our app handles data streams.

The ADS app architecture follows the recommended app architecture guide, with the addition of a domain layer (of UseCases) which help separate concerns, keeping classes small, focused, reusable and testable:

Architecture of the ADS 2019 app

Like many Android apps the ADS app lazily loads data from the network or a cache; we found this to be a perfect use case for Flow. For one shot operations, suspend functions were a better fit. There are two main commits that refactor the app to use Coroutines. The first commit migrates one-shot operations, and the second one migrates to data streams.

In this article, you can find the principles we followed to refactor the app from using LiveData in all the layers of the architecture to just use LiveData for communication between View and ViewModel, and Coroutines for the UseCase and lower layers of our architecture.

1. Prefer exposing streams as Flows (not Channels)

There are two ways you can deal with streams of data in coroutines: the Flow API and the Channel API. Channels are a synchronisation primitive whereas Flow is built to model streams of data: it’s a factory for subscriptions to streams of data. Channels can however be used to back a Flow, as we’ll see later.

Flows automatically close the stream of data due to the nature of the terminal operators which trigger the execution of the stream of data and complete successfully or exceptionally depending on all the flow operations in the producer side. Therefore, you can’t (nearly as easily) leak resources on the producer side. This is easier to do with Channels: the producer might not clean up heavy resources if the Channel is not closed properly.

The data layer of an app is responsible for providing data usually by reading from a database or fetching from the Internet. For example here’s a DataSource interface that exposes a stream of user event data:

interface UserEventDataSource {
fun getObservableUserEvent(userId: String): Flow<UserEventResult>
}

2. How to use Flow in your Android app architecture

UseCase and Repository

The layers in-between View/ViewModel and the DataSource (i.e. UseCase and Repository in our case) often need to combine data from multiple queries or transform the data before it can be used by the ViewModel layer. Just like Kotlin sequences, Flow supports a large set of operators to transform your data. There are a wealth of operators already available, or you can create your own transformation (e.g. using the transform operator). However, Flow exposes suspend lambdas on many of the operators, there’s often no need to make a custom transform to accomplish complex tasks, just call suspend functions from inside your Flow.

In our ADS example, we want to combine the UserEventResult with session data in the Repository layer. We use the map operator to apply a suspend lambda to each value of the Flow retrieved from DataSource:

ViewModel

When performing UI ↔ ViewModel communication with LiveData, the ViewModel layer should consume the stream of data coming from the data layer using a terminal operator (e.g. collect, first or toList).

See full code here.

If you’re converting a Flow to a LiveData, you can use the Flow.asLiveData() extension function from the androidX lifecycle LiveData ktx library. This is very convenient since it will share a single underlying subscription to the Flow and will manage the subscription based on the observers’ lifecycles. Moreover, LiveData also keeps the most recent value for late-coming observers and the subscription active across configuration changes. Check this simpler code that showcases how you can use the extension function:

class SimplifiedSessionDetailViewModel(
private val loadUserSessionUseCase: LoadUserSessionUseCase,
...
): ViewModel() {
val sessions = loadUserSessionUseCase(sessionId).asLiveData()}

Disclaimer: The code snippet above is not part of the app; it’s a simplified version of the code that showcases how you can use Flow.asLiveData().

3. When to use a BroadcastChannel or Flow as an implementation detail

Back to the DataSource implementation, how can we implement the getObservableUserEvent function we exposed above? The team considered two alternatives implementations: the flow builder or theBroadcastChannel API. Each serve different use cases.

When to use Flow

Flow is a cold stream. A cold stream is a data source whose producer will execute for each listener that starts consuming events, resulting in a new stream of data being created on each subscription. Once the consumer stops listening or the producer block finishes, the stream of data will be closed automatically.

You can emit a limited or unlimited number of elements using the flow builder.

val oneElementFlow: Flow<Int> = flow {
// producer block starts here, stream starts
emit(1)
// producer block finishes here, stream will be closed
}
val unlimitedElementFlow: Flow<Int> = flow {
// producer block starts here, stream starts
while(true) {
// Do calculations
emit(result)
delay(100)
}
// producer block finishes here, stream will be closed
}

Flow tends to be used for expensive tasks as it provides automatic cleanup via coroutine cancellation. Notice that this cancellation is cooperative, a flow that never suspends can never be cancelled: in our example, since delay is a suspend function that checks for cancellation, when the subscriber stops listening, the Flow will stop and cleanup resources.

When to use BroadcastChannel

A Channel is a concurrency primitive for communicating between coroutines. A BroadcastChannel is an implementation of Channel with multicast capabilities.

There are some cases where you might want to use an implementation of BroadcastChannel in your DataSource layer:

The BroadcastChannel API is the perfect fit when you want the producer to follow a different lifecycle and broadcast the current result to anyone who’s listening. In this way, the producer doesn’t need to start every time a new listener starts consuming events.

You can still expose a Flow to the caller, they don’t need to know about how this is implemented. You can use the extension function BroadcastChannel.asFlow() to expose a BroadcastChannel as a Flow.

However, closing that Flow won’t cancel the subscription. When using BroadcastChannel, you have to take care of its lifecycle. They don’t know if there are listeners or not, and will keep resources alive until the BroadcastChannel is cancelled or closed. Make sure to close the BroadcastChannel when it’s no longer needed. Also, remember that a closed channel cannot be active again, you’d need to create a new instance.

An example of how to use the BroadcastChannel API can be found in the next section.

Disclaimer

Parts of the Flow and Channel APIs are still in experimental, they’re likely to change. There are some situations where you would currently use Channels but the recommendation in the future may change to use Flow. Specifically, the StateFlow and Flow’s share operator proposals may reduce the usage of Channel in the future.

4. Convert data streams callback-based APIs to Coroutines

Multiple libraries already support coroutines for data streams operations, including Room. For those that don’t, you can convert any callback-based API to Coroutines.

Flow implementation

If you want to convert a stream callback-based API to use Flow, you can use the channelFlow function (also callbackFlow, which shares the same implementation). channelFlow creates an instance of a Flow whose elements are sent to a Channel. This allows us to provide elements running in a different context or concurrently.

In the following sample, we want to emit the elements that we get from a callback into a Flow:

  1. Create a flow with the channelFlow builder that registers a callback to a third party library.
  2. Emit all items received from the callback to the Flow.
  3. When the subscriber stops listening, we unregister the subscription to the API using the suspend fun awaitClose.

See full code here.

BroadcastChannel implementation

For our stream of data that tracks user authentication with Firestore, we used the BroadcastChannel API as we want to register one Authentication listener that follows a different lifecycle and broadcasts the current result to anyone who’s listening.

To convert a callback API to BroadcastChannel you need a bit more code than with Flow. You can create a class where the instance of the BroadcastChannel can be kept in a variable. During initialisation, register the callback that sends elements to the BroadcastChannel as before:

See full code here.

5. Testing tips

To test Flow transformations (as we do in the UseCase and Repository layers), you can use the flow builder to return fake data. For example:

/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
object FakeUserEventDataSource : UserEventDataSource {
override fun getObservableUserEvents(userId: String) = flow {
emit(UserEventsResult(userEvents))
}
}
class DefaultSessionAndUserEventRepositoryTest {
@Test
fun observableUserEvents_areMappedCorrectly() = runBlockingTest {
// Prepare repo
val userEvents = repository
.getObservableUserEvents("user", true).first()
// Assert user events
}
}

To test implementations of Flow successfully, a good idea is to use the take operator to get some items from the Flow and the toList operator as the terminal operator to get the results in a list. See an example of this in the following test:

class AnotherStreamDataSourceImplTest {
@Test
fun `Test happy path`() = runBlockingTest {
// Prepare subject
val result = subject.flow.take(1).toList()
// Assert expected result
}
}

The take operator is a great fit to close the Flow after you get the items. Not closing a started Flow (or BroadcastChannel) after each test will leak memory and creates a flaky and inconsistent test suite.

Note: If the implementation of the DataSource is done with a BroadcastChannel, the code above is not enough. You have to manage its lifecycle by making sure you start the BroadcastChannel before the test and close it after the test finishes. If not, you’ll leak memory. You can see a test like this in this other Flow sample.

Testing Coroutines best practices also apply here. If you create a new coroutine in code under test, you might want to execute it in your test thread for a deterministic execution of your test. Check out more about this in the Testing Coroutines ADS 2019 talk.


Summary

  • Prefer exposing Flow to consumers rather than Channel because of all the explicit contracts and operators Flow provides.
  • With Flow, the producer block will get executed every time there’s a new listener and the lifecycle of the stream of data will be handled automatically.
  • With BroadcastChannel, you can share the producer but you have to manage its lifecycle yourself.
  • Consider converting callback-based APIs to coroutines for a better and more idiomatic integration of the APIs within your app.
  • Easily test implementations of Flow by using the take and toList operators.

Android Developers

The official Android Developers publication on Medium

Manuel Vivo

Written by

Android DevRel @ Google

Android Developers

The official Android Developers publication on Medium

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade