RxJava Gotchas–Part 2

Allan Yoshio Hasegawa
4 min readJul 11, 2017

It’s a lot easier to handle async code with RxJava, specially on Android where we want expensive computations out of the main thread. However, RxJava does have some “gotchas” that may introduce bugs to your code. Here’s a small list:

Ps.: You can find Part 1 and the auxiliary code in this link.

4. Do work on subscription or on creation?

Let’s start with an exercise. Can you tell the difference between these two Observable?

val first = Observable.just(expensiveComputation())
.doOnSubscribe { println("first subscribed") }
.subscribeOn(computation)

val second = Observable.fromCallable { expensiveComputation() }
.doOnSubscribe { println("second subscribed") }
.subscribeOn(computation)

Spoilers. The first one is doing the expensiveComputation during the creation of the Observable, while the second one is doing it when the Observable is subscribed to. Also, the first one is blocking the thread while creating the Observable, while the second one is not.

Now, what’s the correct option? Well, if we are building an Observable to do an async job to return the result of a computation, as in the example, then we are talking about a Cold Observable. In that case, we can think of the Observable as a plan. When we create our Observable, we are creating a plan describing how to transform our data. When we subscribe to that Observable, we put the plan into action.

In that sense, if we were to create an Observable and never subscribe to it, it would do zero work. So, in the code above, the first option is wrong!

But, the first option is valid for when you want to pass a cached value, for example. Just keep in mind that both options exist and how they behave.

A more subtle example

Firebase has an async method to insert values to a database: “DatabaseReference::setValue(object)”; and it returns a Task<Void>.

This Task let us add a callback to be notified when it completes. Here’s how we can wrap a Completable around it:

fun <T> Task<T>.observeCompletion(): Completable {
return Completable.create { subscriber ->
this.addOnCompleteListener {
if (it.exception != null) {
subscriber.onError(it.exception)
} else {
subscriber.onCompleted()
}
}
}
}

Using the code above, let’s build a Single to put values into our database:

fun addUser(db: DatabaseReference, user: User): Single<User> {
return db.setValue(user).observeCompletion().toSingle { user }
}

This code works! But, can you spot any issue with it?

Spoilers. The code is adding an user to the database when the Single is created. This means that if you create the Single and never subscribe to it, the user will still be added to the database. This may not be what you want. See this example:

val userAObs = addUser(db, userA).toObservable()
val userBObs = addUser(db, userB).toObservable()
val obs = Observable.concat(userAObs, userBObs)
.doOnNext { /* doSomething */ }
.subscribe()

In this example, the expected behavior is for the first user to be added to the database, and, when it succeeds, the second user is added. However, because of our buggy implementation from earlier, both users will be added in parallel, and the second one will be added even if the first one fails.

5. Avoid using the Observable’s error channel

An Observable has three channels: Complete, Events and Error. The Complete and Error channels are for terminal events. This means that once an Observable receives a signal in one of those channels, it terminates the Observable’s stream (hopefully).

Because of this drastic behavior, you should avoid using these terminal events. Use the Error channel only for catastrophic failure or unexpected errors. For all else, specially control flow, use the Events channels.

If you are using Kotlin, a good option is to use sealed classes:

sealed class Result {
data class Success(val value: Foo): Result()
data class Failure(val cause: Throwable): Result()
}
Observable.fromCallable {
try {
val value = doSomething()
Result.Success(value)
} catch (e: Exception) {
Result.Failure(e)
}
}

Then, you can use the Result.Failure to control the flow of your Observable’s stream without having it crash completely.

The gotcha around this is that the error can be propagated to the rest of the system and break all sorts of things. Here’s an example with a Subject:

val subject = PublishSubject.create<Int>()

Observable
.timer(100, TimeUnit.MILLISECONDS)
.flatMap { Observable.range(0, 5) }
.subscribe(subject)

Observable
.timer(10, TimeUnit.MILLISECONDS)
.flatMap {
Observable.range(0, 2)
.map {
when (it == 1) {
true -> throw RuntimeException("Ops")
else -> it
}
}
}
.subscribe(subject)

subject.nonBusyWait { }

And the result:

Subscriber: onNext 0, in RxComputationScheduler-2
Subscriber: onError java.lang.RuntimeException: Ops, in RxComputationScheduler-2
Sub isUnsubscribed: true

Sometimes we use a Subject to extend the lifetime of a stream, but in this case an exception is thrown, forcing the Subject stream to terminate. By doing this, we will no longer receive events from that Subject.

6. Some operators will make your code parallel

Sometimes it’s beneficial to run your code in parallel, but not always. Here’s an example that will break:

var count = 0
val obs = (1..1000).map {
Observable.fromCallable { count++ }
.subscribeOn(io)
}
Observable.merge(obs)
.doOnCompleted { println("Result: $count") }
.nonBusyWait { }

And the output is:

Result: 993
Sub isUnsubscribed: true

The problem with that code is that it’s not thread-safe. So, operators like merge , zip and combineLatest will run your code in parallel; and extra care must be taken when using them.

Another example of a bad use of parallel code is when accessing the SQLite database on Android. Even if you run a thread-safe code, you can run into issues with the limit of opened cursors you can have at a certain point. Or out-of-memory issues. Be sure to use the parallel operators with care 👍

Last words

All the examples talked in this blog post can introduce issues in your app. Even when you already know about them. But it’s always a good idea to keep them in mind when debugging your RxJava streams.

So, try to keep those things in mind so at least when the problem arises you’ll know where to pay attention 😉

Ps.: Part 3 is in this link: https://medium.com/@AllanHasegawa/rxjava-gotchas-part-3-eb7763046dd

--

--