RxJava Gotchas — Part 1

Allan Yoshio Hasegawa
Jun 21, 2017


RxJava makes async code a lot easier to handle, especially on Android, where we want expensive computations off the main thread. However, RxJava does have some “gotchas” that can introduce bugs into your code. Here’s a short list:

P.S.: You can find the auxiliary code in this link.
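In case the link is not at hand, here is a minimal guess at what the printThreadName and printThread helpers used in the snippets might look like. This is an assumption based only on the output shown in this post, not the actual auxiliary code, and it assumes RxJava 1.x (the rx.* packages).

```kotlin
import rx.Observable

// Hypothetical helper: prints a tag followed by the name of the current thread.
fun printThreadName(tag: String) =
    println("$tag, in ${Thread.currentThread().name}")

// Hypothetical helper: logs the thread on which each item passes this point
// of the chain, then forwards the item unchanged.
fun <T> Observable<T>.printThread(tag: String): Observable<T> =
    doOnNext { printThreadName(tag) }
```

Because doOnNext runs on whatever thread delivers the item, a helper like this reports the thread the stream is actually using at that point in the chain.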

1. Scheduler after a subscribeOn

The operator Observable::subscribeOn declares on which scheduler the Observable should do its work. See the following example:

Observable
    .create({ emitter: Emitter<Unit> ->
        printThreadName("a")
        emitter.onNext(Unit)
        emitter.onCompleted()
    }, Emitter.BackpressureMode.NONE)
    .printThread("b")
    .subscribeOn(computation)
    .printThread("c")
    .map { expensiveComputation() }

Will output:

a, in RxComputationScheduler-1
b, in RxComputationScheduler-1
c, in RxComputationScheduler-1
expensive computation, in RxComputationScheduler-1
Subscriber: onNext kotlin.Unit, in RxComputationScheduler-1

As expected, the work is done in the computation scheduler. However, what would happen if we were to emit items from a different thread?

Observable
    .create({ emitter: Emitter<Unit> ->
        printThreadName("a")
        emitter.onNext(Unit)
        androidMainExecutor.submit {
            printThreadName("b")
            emitter.onNext(Unit)
            emitter.onCompleted()
        }
    }, Emitter.BackpressureMode.NONE)
    .printThread("c")
    .subscribeOn(computation)
    .printThread("d")
    .map { expensiveComputation() }

androidMainExecutor forces the second item to be emitted from the Android main thread. The result:

a, in RxComputationScheduler-2
c, in RxComputationScheduler-2
d, in RxComputationScheduler-2
expensive computation, in RxComputationScheduler-2
Subscriber: onNext kotlin.Unit, in RxComputationScheduler-2
b, in AndroidMain
c, in AndroidMain
d, in AndroidMain
expensive computation, in AndroidMain
Subscriber: onNext kotlin.Unit, in AndroidMain

Oops! We just ran an expensive operation on the main thread! It turns out that Observable::subscribeOn only decides on which scheduler the subscription (the initial work) happens. It gives no guarantee about the thread on which later items are emitted, and the downstream operators simply run on whatever thread emits them.

The solution is to add Observable::observeOn before the expensive operators, so that they run on a scheduler you chose and never on the main thread.
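As a rough sketch of that fix, assuming RxJava 1.x: the names fakeMain and threadsUsedByExpensiveStep are made up for illustration, fakeMain stands in for androidMainExecutor, and expensiveComputation here only reports the thread it ran on.

```kotlin
import rx.Emitter
import rx.Observable
import rx.schedulers.Schedulers
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the article's expensiveComputation():
// it just returns the name of the thread it was called on.
fun expensiveComputation(): String = Thread.currentThread().name

// Emits an item from a foreign thread, then uses observeOn to move the
// expensive step to the computation scheduler. Returns the thread names
// that the expensive step ran on.
fun threadsUsedByExpensiveStep(): List<String> {
    // Assumption: a single thread named "FakeMain" plays the Android main thread.
    val fakeMain = Executors.newSingleThreadExecutor { r -> Thread(r, "FakeMain") }
    val seen = Collections.synchronizedList(mutableListOf<String>())
    val done = CountDownLatch(1)

    Observable
        .create({ emitter: Emitter<Unit> ->
            fakeMain.execute {
                emitter.onNext(Unit) // emitted from "FakeMain"
                emitter.onCompleted()
            }
        }, Emitter.BackpressureMode.NONE)
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation()) // everything below runs here
        .map { expensiveComputation() }
        .subscribe({ seen.add(it) }, { done.countDown() }, { done.countDown() })

    done.await(5, TimeUnit.SECONDS)
    fakeMain.shutdown()
    return seen.toList()
}
```

With the observeOn line in place, the returned list should hold a computation thread name instead of "FakeMain". Removing that line should reproduce the problem described above.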

Real world case: I wrote an Rx wrapper around the Firebase callbacks and used Rx to chain multiple expensive computations. The problem was that Firebase was executing those callbacks on the main thread, so the rest of the stream also ran on the main thread. This resulted in some small frame drops in the app, and it took a while for me to notice it!

2. A Single’s subscription may never end

This is another weird behavior: some Single subscriptions never end. That is, Subscription::isUnsubscribed() keeps returning false, even after the stream has “ended”. See the code below:

val subscription = Single.just("a")
    .doAfterTerminate { println("Terminated!") }
    .nonBusyWait()
println("Is subscribed? ${!subscription.isUnsubscribed}")

Will result in:

onSuccess a, in main
Terminated!
Is subscribed? true

Note how “Terminated!” was printed, which indicates the end of the stream. However, the subscription still reports that it is “subscribed”. This can cause problems if you depend on the result of Subscription::isUnsubscribed().

A solution is to manually call SingleSubscriber::unsubscribe() inside your SingleSubscriber::onSuccess() block.
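A minimal sketch of that workaround, assuming RxJava 1.x. The function name isUnsubscribedAfterSuccess is made up for illustration, and unsubscribing in onError as well is my own addition, so that both terminal events behave the same way.

```kotlin
import rx.Single
import rx.SingleSubscriber

// Subscribes to a Single and unsubscribes by hand once a terminal event
// arrives. Returns what the subscriber reports afterwards.
fun isUnsubscribedAfterSuccess(): Boolean {
    val subscriber = object : SingleSubscriber<String>() {
        override fun onSuccess(value: String) {
            println("onSuccess $value")
            unsubscribe() // mark the subscription as finished ourselves
        }

        override fun onError(error: Throwable) {
            println("onError $error")
            unsubscribe() // same for the error path
        }
    }

    // Single.just emits synchronously, so onSuccess has already run
    // by the time subscribe() returns.
    Single.just("a").subscribe(subscriber)
    return subscriber.isUnsubscribed
}
```

The check here is made on the SingleSubscriber itself, since that is the object unsubscribe() was called on.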

Real world case: One bad Rx pattern is to check Subscription::isUnsubscribed() to determine if a computation completed so that you can repeat it. With the above unexpected behavior, some computations were being triggered only once because the subscription would never “end”.

3. The Scheduler Immediate is useless?

I had a use case where I wanted to start an Observable in the caller’s thread, do a computation in a new thread, then return to the caller’s thread. I then read the docs for the Scheduler Immediate, which read:

Creates and returns a Scheduler that executes work immediately on the current thread.

At the time I thought “current thread” meant the thread creating the Observable. This is not the case, as this code shows:

Observable.just(Unit)
    .printThread("a")
    .observeOn(computation)
    .printThread("b")
    .observeOn(immediate)
    .printThread("c")

Will result in:

a, in main
b, in RxComputationScheduler-1
c, in RxComputationScheduler-1
Subscriber: onNext kotlin.Unit, in RxComputationScheduler-1

It’s quite obvious now. The Scheduler Immediate just keeps doing work on the thread where the code is already running (not the thread in which the Observable was created).

Real world case: I thought Scheduler Immediate could be used to return execution to the caller’s thread. That was not the case, and it resulted in crashes because I was calling “main thread”-only methods from another thread.
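One way to actually come back to a specific thread is to call observeOn with a scheduler bound to that thread. The sketch below assumes RxJava 1.x and uses Schedulers.from with a single-thread executor named "Caller" as a stand-in for the caller’s thread. The names callerExecutor and threadNamesAroundHop are made up for illustration. On Android, a main-thread scheduler such as the one provided by RxAndroid would play this role.

```kotlin
import rx.Observable
import rx.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Does the work on the computation scheduler, then hops to a scheduler
// backed by one dedicated thread. Returns (work thread, result thread).
fun threadNamesAroundHop(): Pair<String, String> {
    // Assumption: this single thread plays the role of the caller's thread.
    val callerExecutor = Executors.newSingleThreadExecutor { r -> Thread(r, "Caller") }
    val callerScheduler = Schedulers.from(callerExecutor)

    var workThread = ""
    var resultThread = ""
    val done = CountDownLatch(1)

    Observable.just(Unit)
        .observeOn(Schedulers.computation())
        .doOnNext { workThread = Thread.currentThread().name }
        .observeOn(callerScheduler) // unlike immediate, this switches threads
        .subscribe(
            { resultThread = Thread.currentThread().name },
            { done.countDown() },
            { done.countDown() }
        )

    done.await(5, TimeUnit.SECONDS)
    callerExecutor.shutdown()
    return workThread to resultThread
}
```

The difference from Scheduler Immediate is that the target thread is named explicitly by the scheduler instead of being inferred from wherever the code happens to be running.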

Conclusions

RxJava is great. You can use it to compose operations and easily switch between schedulers. However, it can have a few “gotchas” that even experienced programmers may face.

My recommendation is to keep a small test-bed project for trying different things with RxJava. It can really boost your understanding of how RxJava works!

Part 2: https://medium.com/@AllanHasegawa/rxjava-gotchas-part-2-1f654e970703
