Asynchronous development in Android: RxJava Vs. Kotlin Flow

Paolo Rovelli
Making Tuenti
Published in
7 min readDec 27, 2019

Lately, a lot of exciting things are going on in the Android world.
Surely, the most eye-catching one is Jetpack Compose. However, personally, I feel that Kotlin Flow is as interesting as the previous one and could potentially become a killer-feature for asynchronous development (not only in Android, but in the Kotlin ecosystem as a whole).

Kotlin already had an existing way to handle streams of data in coroutines, which is the Channels API. However, Channels represent a hot stream of data and therefore are only fit for data sources that should always exist, even when the app is not requesting for them (e.g. incoming network connections, event streams, …). On the other hand, the recently released Flow API represents a cold stream of data and therefore is fit where Channels are not.

“Talk is cheap. Show me the code.” (Linus Torvalds)

Let’s imagine that we want to write a contacts app.

(!) DISCLAIMER (!)
There are literally thousands of ways to write the same thing, and I would not define myself as an expert in neither RxJava nor Kotlin Flow.
Therefore, even if I will try to be as impartial as possible, there might be some innocent mistakes. Sorry in advance if that is the case. :)

Dependencies

First of all, let’s import some dependencies to play with:

// Android
implementation "androidx.core:core-ktx:1.1.0"
implementation "androidx.lifecycle:lifecycle-extensions:2.1.0"
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0"
testImplementation "androidx.arch.core:core-testing:2.1.0"
// RxJava
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
implementation "io.reactivex.rxjava2:rxkotlin:2.4.0"
// Coroutines
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.2"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.2"
// JUnit + Mockito
testImplementation "junit:junit:$junit_version"
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"

Repository

Now, let’s create the first piece of our app: a repository.
This is needed to encapsulate how we will retrieve and persist contacts in our app. For the sake of simplicity, we will just query a fake data source and go through each contact to emit it in an observable. We will neither merge contacts from different data sources (e.g. local and cloud contacts) nor map them from the data model to a domain model.

interface DataSource {
fun getAll(): List<Contact>
}

RxJava

Now, using RxJava, we would probably do something like the following.

Contact respository with RxJava
Testing contact respository with RxJava

RxJava offers a really powerful and complete API, including several built-in builders to create and combine different types of observables. Its API is so exhaustive that, at first, might feel a bit overwhelming.
But, as it is often the case, power comes with a price.

Neither the RxJava implementation nor the tests are totally straightforward, at least if you don’t have some rudimentary knowledge of reactive streams.

In particular, in the implementation, we need to explicitly wrap the data source call in a try-catch to be able to emit exactly once either the onComplete(), to signal the stream completion, or the onError().
In the tests, we need to call the test() to create a TestObserver and subscribe it to the observable, as well as the dispose() to cancel the TestObserver.

Kotlin Flow

On the other hand, using Kotlin Flow instead of RxJava, we would probably do something like the following.

Contact repository with Kotlin Flow
Testing contact repository with Kotlin Flow

Kotlin Flow offers a simpler but extensible API, including several already available extension functions. Its API is so “Kotlinic” that feels well-integrated in the language.

As you can see, in the implementation, the Observable.create { emitter -> ... } builder is replaced by the flow { ... } one and we can get rid of the explicit try-catch. Flow will automatically close the stream of data for us and, if the data source throws an exception, this will be passed to the upper layers.
Testing also seems more straightforward, or at least more coherent with the Kotlin standard library. Indeed, in the first test case we can simply convert the Flow in a List with the toList() extension function, while in the second one we can actually leverage the @Test(expected=...) JUnit annotation.

That said, even with Kotlin Flow there is a bit of “magic”. Coroutines “magic”.

In particular, we need to run the tests using the experimental runBlockingTest builder, and therefore we need to mark them all as @ExperimentalCoroutinesApi. This is is needed because all Flow operators are suspend functions and thus must be called either from a coroutine or another suspend function.
Note that, to run a coroutine while blocking the current thread until it’s completed, we could have actually used the standard runBlocking builder. However, the advantage of using runBlockingTest over runBlocking is that the former will immediately progress past delays.

UseCase

Then, let’s create the second piece of our app: a use case.
This is needed to encapsulate the business logic that we will apply over the retrieved contacts. For the sake of simplicity, we will just call our repository straightforward. We will neither perform any business logic nor combine data from multiple repositories.

Search contacts use case with RxJava
Search contacts use case with Kotlin Flow

As you can see, the two implementations are nearly identical.

ViewModel

Finally, let’s create the last piece of our app: a ViewModel.
This is needed to encapsulate the presentation logic that we will apply over the retrieved contacts. For the sake of simplicity, we will just launch our use case and update the list of contacts.

RxJava

Let's start also this time with the RxJava version.

Contacts ViewModel with RxJava
Testing contacts ViewModel with RxJava

Again, we can see all the power and potential of RxJava together with its intrinsic complexity.

In particular, in the implementation, we explicitly subscribe to the stream on the Schedulers.io() and observe on the AndroidSchedulers.mainThread(), which is needed to execute actions on the Android UI thread.
In the tests, we need to use the RxAndroidPlugins to set the main thread scheduler handler, which in this case is set to Schedulers.trampoline().

That said, besides a bit of reactive “magic”, both the RxJava implementation and its tests are quite easy to follow.

Kotlin Flow

In this case, if we replace RxJava with Kotlin Flow, we would just need to change the loadContacts() method of our ViewModel.

Contacts ViewModel with Kotlin Flow
Testing contacts ViewModel with Kotlin Flow

Kotlin Flow has transparent back-pressure management, thanks to the use of Kotlin suspending functions. Indeed, as said, all Flow operators are suspend functions.

But, alas, also Kotlin Flow has its own complexity.

In particular, in the implementation, we launch the use case inside the viewModelScope so that the coroutines context is inherited from the ViewModel and it will be canceled when ViewModel is cleared.

NOTE: using viewModelScope requires the Lifecycle ViewModel KTX library with version 2.1.0 or higher.

Furthermore, it's important to note that by default the streams in Flow are executed sequentially, in a single coroutine.

Data flow without buffering

To execute them in parallel, running the collector in a separate coroutine, we need to use the experimental buffer() operator. This way, as soon as the repository has computed the contact and given it to the ViewModel, it can immediately start producing the next one and the ViewModel works on previous contact in parallel.

Data flow with buffering

However, since the buffer() operator is still marked as experimental, we need to mark the entire loadContacts() method as @ExperimentalCoroutinesApi.

Finally, in the tests, we need to set up and tear down the test dispatcher for the main thread.

That said, besides a bit of coroutines “magic”, also the Flow implementation and its tests are quite easy to follow.

Kotlin Flow (alternative syntax)

If you prefer, in Flow, there is also an alternative fluent-style syntax which uses the experimental onEach(), catch() and launchIn() operators.

And there is also an experimental asLiveData() extension function to automatically convert a Flow into a LiveData.

Asynchronous programming is challenging in any programming language, but both RxJava and Kotlin Flow do an awesome job to simplify it.

Both RxJava and Kotlin Flow are great choices for asynchronous development. Only you can decide which one is the right one for you and your project.

RxJava is and still remains a perfectly valid approach for asynchronous development. It is stable, really powerful and especially recommended for complex use cases. However, it also has a steep learning curve.

On the other hand, Kotlin Flow has a simpler but extensible API, automatic streams cleanup and transparent back-pressure management.
According to JetBrains, it also offers better performance.

And, if you’re planning to migrate your app from RxJava to Kotlin Flow, it might be useful to know that the two are actually interoperable (for more details see the asFlow() and asPublisher() extension functions).

All that said, for my personal taste, Kotlin Flow still requires too many @ExperimentalCoroutinesApi annotations to be considered for production. But, with some luck, most of them should go away in the next few months. Stay tuned! :)

--

--