How we migrated from RxJava1 to RxJava2

RxJava1 → RxJava2

To start 2018 on a good note, we, at Lifesum, decided to take some time to finally do the migration from RxJava1 to RxJava2 in our Android app.

For those who don’t know what RxJava is, here is a small overview:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures

Basically it allows us to do multi threading very easily on Android and also compose different series of events (a user clicks 4 times on an item, exponential backoff for I/O failures..) and make our code much cleaner, readable and robust.

Even though our app was working fine with Rx1 we decided that moving forward to the version 2 was something we had to do for various reasons. The development of Rx1 is no longer supported, which means every new issue (if any) won’t be addressed. Also the community behind Rx1 is going to get lower and lower while Rx2 examples and articles are going to be more abundant.

The RxJava repo states:

Timeline plans for the 1.x line:
June 1, 2017 — feature freeze (no new operators), only bugfixes
March 31, 2018 — end of life, no further development

Main differences between Rx1 and Rx2

There is a nice wiki on the RxJava repo that explains all the differences between the version 1 and version 2. RxJava2 is not harder than the first version to grasp, if you have experience with Rx1 you’ll be comfortable with the newer version pretty quickly. The biggest differences for us were the following.

No more nulls in the streams. This means we had to go through every Observable we were using and figure out if we were passing null into it at some point or not and find an alternative. Some of our code is in Kotlin so we could see this right away with nullables types.


New primitive types. The Observable type is still present from RxJava1 but new types comes into play to make the streams more meaningful on what they’re supposed to emit.

  • Flowable — Basically an Observable but with backpressure handling. This is useful when you need a stream that is susceptible to emit a large amount of items and you’re not sure the consumer will be able to process them as quickly as the emitter sends them.
  • Single —This is an Observable that only emits one value onSuccess(value) or emit an error onError(error). This is useful so you know your stream will emit either only one value (your result) or issue an error. We used this one for most of our network calls for example, because when you do a network call you only do one request and get one answer (at least in HTTP/1.1).
  • Completable — This is an Observable that only completes (without a value) onComplete() or emit an error onError(error). This is useful when you have a task that doesn’t return anything but can succeed or fail. For example you can have a saveToDisk(someData) method that doesn’t return any result but you want to get notified of the success or the failure of it, it’s a good candidate.
  • Maybe — This is the type I had a hard time to understand at first because of it’s name. It’s basically a combination of a Single and a Completable. This type can either emit one single value onSuccess(value), complete without emitting a value onComplete() or emit an error onError(error). The important thing to remember is that it can only call one of these three methods.
  • Processors — RxJava1 has this type Subject that is able to both be an Observer (emit items in a stream) and a Subscriber (consume items from the stream). The processors are the backpressure enabled version of the Subjects. The Processors are to Subjects what the Flowable are to Observables.

Different naming conventions. Subscription was renamed to Disposable. Instead of .unsubscribe() you now call .dispose()
Also CompositeSubscription became CompositeDisposable, the same .clear() method is used to dispose all the contained disposable.

The more the merrier

Because our app was using RxJava1 in a lot of places we decided to do the migration gradually
Rx1 and Rx2 have two different package names rx.* and io.reactivex.* this means you can have both versions in one project. This is exactly what we did.

This gave us some flexibility in the sense that the migration didn’t have to happen all at once. We also were using external libraries that were handing us Rx1 Observables so we had to keep that dependency for those cases as well (until we find alternatives to remove those dependencies).

We did the same for RxAndroid and RxBindings and our retrofit call adapter as well:

"io.reactivex.rxjava2:rxjava:${rxJava2Version}"
"io.reactivex.rxjava2:rxandroid:${rxAndroid2Version}"
"com.jakewharton.rxbinding2:rxbinding:${rxBindings2Version}"
"com.squareup.retrofit2:adapter-rxjava2:${retrofitVersion}"

Learnings and tips

The main learning I get from this is the use of the primitive types. They allow you to have a cleaner code and better vision of what your code is doing. Also it ensures correctness of what you’re writing.

For example if you try to apply a .filter() to a Single it will give you a Maybe back and forces you to think of the case where there is no item emitted. This makes sense because if you filter a single value it will either give you the result (pass the filter) or simply completes without the filtered item.

A typical Maybe use case for us was when we wanted to show some cached data to the user but if we didn’t have anything cached, make a network request to get the data. This is how it looked like before:

override fun getContent(): Observable<SomeContent?> {
return Observable.fromCallable(this::loadContentFromPrefs)
.flatMap { cachedContent: SomeContent? ->
when {
cachedContent == null || cachedContent.isEmpty() -> loadContentFromApi()
else -> Observable.just(cachedContent)
}
}
}
private fun loadContentFromPrefs(): SomeContent? {
content = gson.fromJson(prefs.getString(KEY_CONTENT, null), SomeContent::class.java)
     return content
}

Here we’re getting the content from the shared preferences, if it’s null or empty we make a network request otherwise we just emit it. So now our function returns an Observable<SomeContent?> which means we don’t know how many items will be emitted and they could be null (we know they won’t but since loadContentFromPrefs() could emit null into the stream we had to keep the nullable type)

This is the version with RxJava2:

override fun getContent(): Single<SomeContent> {
return loadContentFromPrefs()
.switchIfEmpty(SingleSource {
loadContentFromApi()
});
}
private fun loadContentFromPrefs(): Maybe<SomeContent> {

return Maybe.create({ sub ->
val prefContent = gson.fromJson(prefs.getString(KEY_CONTENT, null), SomeContent::class.java)

if (prefContent != null) {
sub.onSuccess(prefContent)
} else {
sub.onComplete()
}
})
}

This version does exactly the same thing, except we get a Single<SomeContent> as return type so we know this stream will only emit one item (and it won’t be null), or emit an error. The small trick here is the use of the switchIfEmpty() operator, which will “flatmap” the Maybe into a Single in case the Maybe completed without emitting an item. Basically if our Maybe emits an item it will go through, otherwise it will call our loadContentFromApi() Single.

Also working with Maybe there is the flatMapSingleElement() which will act as a “flatmap” on the single element emitted by the Maybe if any.

Safe use of the .create() methods. Prior to Rx2 the .create() methods of RxJava was something we would avoid to use because it wasn’t the right way of creating Observables, we would use .just() , .from() , .fromCallable() most of the time. This is a thing of the past and we can now easily create custom streams with .create().

Unit testing

To be able to do unit testing with RxJava we took the habit of changing threads on where things were executed. Rx2 still offers this possibility, the only difference is that the Schedulers.immediate() no longer exist and is replaced by Schedulers.trampoline() . So when unit testing we are replacing both Schedulers.io() and AndroidSchedulers.mainThread() with Schedulers.trampoline() .

@Before
fun setup() {
// RxJava2 override
RxJavaPlugins.reset();
RxJavaPlugins.setIoSchedulerHandler({ scheduler ->
Schedulers.trampoline()
});
    // RxAndroid override
RxAndroidPlugins.reset();
RxAndroidPlugins.setMainThreadSchedulerHandler({ scheduler ->
Schedulers.trampoline()
});
}

Conclusion

The transition wasn’t as bad as I thought it would be. Most of it was replacing Observable with Single, then making sure we’re using the version 2 of RxBindings and RxAndroid and replacing Subscription with Disposable.

There was some obstacle on the way with nulls in the streams that forced us to make use of the new primitive types, which is a good thing. Now when we look at our function signatures we know how the returned stream is supposed to behave and what type of result is expected (one item, only completion..).

Some more readings:
- https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0
- https://blog.kaush.co/2017/06/21/rxjava1-rxjava2-migration-understanding-changes/
- https://artemzin.com/blog/reply-to-kaushik-gopals-aricle-rxjava-1-rxjava-2-understanding-the-changes/