RxJava Gotchas–Part 3

Allan Yoshio Hasegawa
Sep 4, 2017

It’s a lot easier to handle async code with RxJava, especially on Android, where we want expensive computations off the main thread. However, RxJava does have some “gotchas” that may introduce bugs into your code. Here’s a small list:

P.S.: You can find Part 1 and the auxiliary code in this link. Part 2 is in this link.

7. One Observable or Many?

One cool part of RxJava is that we can compose different operations to transform our data, in order. But when we have a chain of multiple operations, are we talking about a single Observable, or many different ones? Spoiler: many different ones.

Here’s an example:

Observable.interval(10, TimeUnit.MILLISECONDS)
    .doOnUnsubscribe { printThreadName("Top Unsub") }
    .doOnCompleted { printThreadName("This will never be called") }
    .take(2)
    .doOnUnsubscribe { printThreadName("Mid Unsub") }
    .doOnCompleted { printThreadName("Neither this one") }
    .take(1)
    .doOnUnsubscribe { printThreadName("Bottom Unsub") }
    .doOnCompleted { printThreadName("But this one will ^^") }
    .nonBusyWait { }

And the result:

Subscriber: onNext 0, in RxComputationScheduler-1
But this one will ^^, in RxComputationScheduler-1
Sub isUnsubscribed: true
Bottom Unsub, in RxComputationScheduler-1
Mid Unsub, in RxComputationScheduler-1
Top Unsub, in RxComputationScheduler-1

If we were to think of this expression as a single Observable, then the top doOnCompleted would be called. But that’s not the case. Each operator creates its own Observable, and only the bottommost one completes, because take(1) finishes after the first item. All the Observables above it get unsubscribed instead.
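To make the “many Observables” point concrete, here is a minimal sketch, assuming RxJava 1.x (the `rx` package) is on the classpath. The function name `distinctObservables` is made up for this illustration. It shows that `take` returns a new Observable object rather than modifying the one it is called on, and that each object in the chain can be subscribed to on its own.

```kotlin
import rx.Observable

// Hypothetical helper for illustration; assumes RxJava 1.x.
fun distinctObservables(): Triple<Boolean, List<Int>, List<Int>> {
    val source = Observable.just(1, 2, 3)
    // take() does not modify `source`; it wraps it in a new Observable.
    val firstTwo = source.take(2)

    val sameInstance = source === firstTwo
    // Each Observable in the chain can be subscribed to independently.
    val fromSource = source.toList().toBlocking().single()
    val fromTake = firstTwo.toList().toBlocking().single()
    return Triple(sameInstance, fromSource, fromTake)
}

fun main() {
    println(distinctObservables()) // prints (false, [1, 2, 3], [1, 2])
}
```

Since every operator yields a separate Observable, side-effect operators such as doOnCompleted only observe the events of the Observable they are attached to.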

8. Reactive pull backpressure

Let’s start with an example:

val now = { System.currentTimeMillis() } // This is a lambda
val start = System.currentTimeMillis()
val printDiff = { (letter, time): Pair<String, Long> ->
    printThreadName("$letter at ${time - start}")
}
val src = (0..5).map { { "${it}i" to now() } } // A list of lambdas
Observable.from(src)
    .map { it() } // Lazily initialize the Pair
    .concatMap { letter ->
    //.flatMap { letter ->
        Observable.timer(100, TimeUnit.MILLISECONDS)
            .map { letter }
    }
    .doOnNext { printDiff(it) }
    .nonBusyWait()

Here’s the output of this code:

0i at 82, in RxComputationScheduler-1
Subscriber: onNext (0i, 1504552349852), in RxComputationScheduler-1
1i at 88, in RxComputationScheduler-2
Subscriber: onNext (1i, 1504552349858), in RxComputationScheduler-2
2i at 88, in RxComputationScheduler-3
Subscriber: onNext (2i, 1504552349858), in RxComputationScheduler-3
3i at 189, in RxComputationScheduler-4
Subscriber: onNext (3i, 1504552349959), in RxComputationScheduler-4
4i at 291, in RxComputationScheduler-5
Subscriber: onNext (4i, 1504552350061), in RxComputationScheduler-5
5i at 392, in RxComputationScheduler-6
Subscriber: onNext (5i, 1504552350162), in RxComputationScheduler-6
Sub isUnsubscribed: true

Note how the fourth pair is being initialized 100ms after the first one. This means that not all Pairs are being initialized when the stream is subscribed to. These pairs are being initialized only when needed.

Now, can you tell what the result would be if we used flatMap instead of concatMap? Here, take a look:

0i at 60, in RxComputationScheduler-1
Subscriber: onNext (0i, 1504553337592), in RxComputationScheduler-1
1i at 68, in RxComputationScheduler-4
Subscriber: onNext (1i, 1504553337600), in RxComputationScheduler-4
2i at 68, in RxComputationScheduler-4
Subscriber: onNext (2i, 1504553337600), in RxComputationScheduler-4
3i at 68, in RxComputationScheduler-4
Subscriber: onNext (3i, 1504553337600), in RxComputationScheduler-4
4i at 68, in RxComputationScheduler-4
Subscriber: onNext (4i, 1504553337600), in RxComputationScheduler-4
5i at 68, in RxComputationScheduler-4
Subscriber: onNext (5i, 1504553337600), in RxComputationScheduler-4
Sub isUnsubscribed: true

All the functions in the src variable are evaluated immediately, and at nearly the same time.

This is something we should be aware of. What is happening is that some operators request items from upstream only when they need them, so the source emits only what was asked for. This is called “reactive pull backpressure”.

This is important. Understanding how it works can help us create more performant streams with fewer backpressure problems.
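The request mechanism behind reactive pull can be sketched with a custom Subscriber. This is only an illustration, assuming RxJava 1.x in a version that provides `doOnRequest`. The function name `pullOneAtATime` and the log format are made up for the example. The subscriber asks for one item at a time, and `doOnRequest` records each request as it travels upstream.

```kotlin
import rx.Observable
import rx.Subscriber

// Hypothetical helper for illustration; assumes RxJava 1.x.
fun pullOneAtATime(): List<String> {
    val log = mutableListOf<String>()
    Observable.range(0, 3)
        // Records every request that reaches the source.
        .doOnRequest { n -> log.add("request($n)") }
        .subscribe(object : Subscriber<Int>() {
            override fun onStart() {
                // Ask for a single item instead of the default "everything".
                request(1)
            }

            override fun onNext(item: Int) {
                log.add("onNext($item)")
                // Pull the next item only after handling this one.
                request(1)
            }

            override fun onCompleted() {
                log.add("onCompleted")
            }

            override fun onError(e: Throwable) {
                log.add("onError($e)")
            }
        })
    return log
}

fun main() {
    pullOneAtATime().forEach(::println)
}
```

In the resulting log, every `onNext` entry is preceded by a `request(1)` entry: the source emits an item only after the subscriber has asked for it. Operators differ mainly in how many items they request from upstream at a time.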

9. Subscriptions may leak memory

Always unsubscribe from your subscriptions when they are no longer needed, otherwise you run the risk of memory leaks. Check the following code:

class Foo {
    val a = "Leaked resource"
    var subscription: Subscription? = null
    init {
        // What happens if I remove the "subscribeOn" from
        // this observable?
        // Spoiler: No leaks ^^, even though the subscription never ends
        subscription = Observable.never<Int>().subscribeOn(computation)
            .subscribe { print("hello $a") }
    }
}
var foo: Foo? = Foo()
val fooWeakRef = WeakReference(foo)
foo = null
System.gc()
// "foo" is still in memory, even though our code has no
// reference to it
assertThat(fooWeakRef.get() == null, equalTo(false))
// Only after unsubscribing from the initial subscription is foo freed
fooWeakRef.get()!!.subscription!!.unsubscribe()
System.gc()
assertThat(fooWeakRef.get() == null, equalTo(true))

The problem is in the subscribe { print("hello $a") } part. This “lambda” captures the variable a. What is not immediately obvious is that the instance of Foo itself is also captured, because a is a property of that instance. This means that not only is a leaked, foo is leaked too.

On Android you can get into even bigger trouble by capturing a View inside the subscribe lambda. This will leak the View and its Activity. Therefore, always unsubscribe from your subscriptions!
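One common way to keep track of subscriptions is to collect them in a CompositeSubscription and clear it when the owner goes away. The following is a minimal sketch, assuming RxJava 1.x. The `Screen` class, its `start`/`stop` methods, and `unsubscribeDemo` are hypothetical names standing in for something like an Activity and its lifecycle callbacks.

```kotlin
import rx.Observable
import rx.subjects.PublishSubject
import rx.subscriptions.CompositeSubscription

// Hypothetical owner of subscriptions; assumes RxJava 1.x.
class Screen(private val events: Observable<Int>) {
    private val subscriptions = CompositeSubscription()
    val received = mutableListOf<Int>()

    fun start() {
        // The lambda captures `this`, so the source holds a reference
        // to the Screen for as long as the subscription is alive.
        subscriptions.add(events.subscribe { received.add(it) })
    }

    fun stop() {
        // Unsubscribes everything added so far and releases the references.
        subscriptions.clear()
    }
}

fun unsubscribeDemo(): Pair<List<Int>, Boolean> {
    val subject = PublishSubject.create<Int>()
    val screen = Screen(subject)
    screen.start()
    subject.onNext(1) // delivered
    screen.stop()
    subject.onNext(2) // ignored: nobody is subscribed anymore
    return Pair(screen.received.toList(), subject.hasObservers())
}

fun main() {
    println(unsubscribeDemo()) // prints ([1], false)
}
```

After `stop()`, the subject no longer has observers, so it no longer references the lambda or the `Screen` it captured.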

The end

Throughout my years working with RxJava, I compiled a list of non-intuitive problems I had with it. In this series I tried to showcase the problems that are easy to explain, but at the same time can be quite bad for your system.

All of the problems listed introduced bugs into my system, and, hopefully, this series can help you prevent these issues in yours 👍
