The RxJava Threading Mistake You Are Probably Making

Brian Yencho
Livefront
Published in
6 min readDec 1, 2021

TL;DR: If you think you completely understand threading in RxJava, take a closer look at the behavior of Processor classes…

In addition to having a steep learning curve, RxJava has enough secrets to continue to confound developers long after they believe they have mastered it. What follows is one such curiosity related to threading.

The Setup

Consider the following RxJava call chain:

where waitForData and makeNetworkRequestWithData are both functions returning the reactive type Single<T>.

Now assume the following:

  • The project containing this call chain configures network requests (like makeNetworkRequestWithData) so that they run synchronously by default but may be placed on a background thread using subscribeOn. For years this was the default behavior of the RxJava adapters for Retrofit, the most commonly used networking library for Android.
  • In this project, all functions returning RxJava types are intentionally configured in a similar way: by default, calls run synchronously and alternative threading may similarly be supplied by callers using the subscribeOn operator.
  • Background threading is intended to be supplied by all final callers (the ones ultimately calling subscribe(...)) by adding a subscribeOn call near the end of the chain.

Given these assumptions, the intention of the above code is to place all code above the subscribeOn(Schedulers.io()) call on an I/O bound thread and all code below the observeOn(AndroidSchedulers.mainThread()) on the main thread of the app process. It is worth noting here that while this is not necessarily a universal setup, it has been a very common one.

The Problem

This brings us to the point of interest in the setup described above: what Scheduler is actually used when making the network request in the example above?

The answer would seem to be clear: as intended, this should be determined by the value that has been supplied to the only subscribeOn call in the chain, Schedulers.io().

In many cases that would be true. There are, however, two key exceptions here and they both depend on the details of the first call in this chain, waitForData:

The Violation

First, consider a broken assumption. Perhaps this call does not conform to the patterns of the project and it has intentionally applied its own Scheduler such that it runs on and emits on a thread of its choosing. For example, perhaps the call is constructed as:

As far as this call is concerned, this subscribeOn(Schedulers.computation()) is all that matters and calculateData will be run on a thread determined by Schedulers.computation(). The emission of the result then happens on that same thread and it forces our network call in question, makeNetworkRequestWithData, to run on this thread as well (and not one based on Schedulers.io() as the code intends).

The difference here is subtle and will result in a thread meant for heavy computations simply sitting idle while waiting for a network response. While this issue might go unnoticed, it could still be “fixed” by essentially considering this setup a mistake, a violation of the app’s threading patterns. The subscribeOn call here could be removed to bring it into conformity with the rest of the app and the remaining subscribeOn call in the main chain will work as expected.

The next exception, however, is not so easily addressed.

The Surprise

What if the call to waitForData was not based on a calculation or a network request, but some manual emission from something like a PublishProcessor? Consider the following:

This code returns a Single<Data> that emits as soon as a value is pushed to the dataProcessor. Note that there are no calls to subscribeOn or observeOn, so this function appears to satisfy the project’s assumptions and should therefore obey the subscribeOn(Schedulers.io()) call of the main chain and ultimately emit on an I/O bound thread.

There is just one problem: a PublishProcessor (or any other kind of Processor) completely ignores all calls to subscribeOn. In fact, from the perspective of a downstream caller there is no good way at all to control the thread a PublishProcessor emits on. This results in all code downstream of a PublishProcessor running on the thread that it emits on until the downstream threading is manually changed with something like a call to observeOn.

So what thread does a PublishProcessor actually emit on? The answer is simple: on whatever thread values are pushed to it and therefore whatever thread processor.onNext(...) is called on. If you’re looking carefully, this is spelled out in the documentation:

Scheduler:
PublishProcessor does not operate by default on a particular Scheduler and the Subscribers get notified on the thread the respective onXXX methods were invoked.

This could be a background thread or even the main thread (which can easily happen if the data you are waiting on depends on an OS-level event). In the latter case, the call chain we’ve been discussing would result in a NetworkOnMainThreadException in what would seem like an impossible error. Once this fact is fully absorbed it can seem obvious, but it at first seems to violate our intuition for how operators like subscribeOn are supposed to work.

The Resolution

Unfortunately, there is no perfect solution to the above problem with the given app constraints. For example, to prevent receiving a NetworkOnMainThreadException, the final subscribeOn call (which is being ignored) could be replaced with the careful use of observeOn or subscribeOn elsewhere. For example, observeOn could be used immediately after the waitForData call:

Or subscribeOn could be applied directly to the network request:

Either way, though, we must break our initial assumptions. Special cases could be made when working with Processor classes, but that is not a very satisfying approach.

The root issue here is assuming a single subscribeOn call at the end of the call chain was ever the appropriate way to supply threading for the whole call chain. Not only does the behavior of Processor classes make this impossible to use consistently, but it also ignores the fact that different parts of a call chain might be best to run using different kinds of Scheduler classes and that each function knows best which Scheduler it requires.

It would seem, then, that a safer, more consistent pattern is to allow each function returning an RxJava type to apply its own threading and only expect the final consumer to be concerned with changing the downstream threading. Many RxJava users always knew this, but the difference could feel like a matter of choice or opinion. The behavior of Processor classes, however, makes the answer clear. In fact, this is exactly what Retrofit now does with its RxJava 3 adapter: by default, all network requests now run on a background thread unless specified otherwise.

The Future

It is likely that many RxJava projects have encountered unexpected behavior due to this subtle fact about Processor classes. The confusion here really stems in large part from the freedom with how the subscribeOn and observeOn operators can be used and how they actually work: they can be placed anywhere in a call chain, or not at all; rather than one strict pattern for their usage, several appear to be allowed; subscribeOn applies threading to all upstream operators, except when it doesn’t.

While freedom can be very powerful, sometimes in programming opinionated frameworks and forced behavior can ultimately lead to less mistakes and more coherent code. Fortunately in this regard, the Android community is moving away from RxJava and toward Kotlin Coroutines. There are two key differences that avoid surprise threading issues like the one we have been discussing in RxJava:

  • Consumers of suspending functions or Flow instances are required to supply a CoroutineScope in order make the calls and receive results. Among other things, these scopes determine the Dispatcher used to define the threading behavior for any code downstream of the call in question and would be the equivalent of enforcing that observeOn is always called at the beginning of an RxJava call chain after the initial call.
  • There is a very clear pattern in the coroutines community (and pushed by the developers of coroutines themselves) that all suspending functions and Flow instances should apply their own threading and should therefore be safe to be called from any other coroutine.

I go into detail on each of these points in a previous article on coroutine threading behavior.

Much like Kotlin has far overtaken Java, coroutines are undoubtedly the future for handling asynchronous work in Android. We must acknowledge, though, that as there are still many legacy projects that use Java, there will be many projects that continue to use RxJava. For these, it is important to be aware of the subtle threading issues discussed in this article and to also be mindful of adopting patterns that better match the threading behavior of Android’s future.

Brian works at Livefront, where threading is half the battle.

--

--