The most popular RxJava misunderstanding
Hello, dear reader, my name is Dmitry Manko, and I’m an Android Developer, but before we start, I want to ask you…
Are you sure you understand RxJava correctly?
Let’s take a look at this example and answer the following question:
I recommend checking the correct answer, which I left towards the end of this article because I couldn’t find a spoiler mechanism on Medium. If you got it wrong, or you aren’t quite sure why your answer was right, welcome to my article.
The idea of the article
I’ve heard the phrases “only the last subscribeOn() works” and “it doesn’t matter where you place the subscribeOn() method” a number of times, and I’ve even seen many articles that repeat the same thing.
These statements aren’t true, and have never been true.
Rx basics
Many of us have heard that the observeOn() method affects only the downstream, while the subscribeOn() method affects both the upstream and the downstream. But what do these terms mean?
Upstream: the Rx operators above the operator in question.
Downstream: the Rx operators below the operator in question.
What are the upstream and downstream of the map() operator in the example below? And what about the subscribeOn() operator?
But how does subscribeOn() affect the upstream and the downstream at the same time? Why do people say that adding more subscribeOn() calls has no effect? Does Rx just skip every subscribeOn() operator except the last one?
I’m here to unravel the mystery.
Rx stages
Rome wasn’t built in a day, so let’s start with the simplest example.
What’s going on here? What happens before the subscribe() method is called, and what happens after?
For convenience, I’d like to distinguish three main stages in any Rx chain:
- Build stage
- Subscription stage
- Observation stage
1. Build stage
To work with data, we must first create Rx sources. So what is the build stage?
The build stage is where we combine Rx operators, up until the subscribe() method is called.
In our simple example, the build stage contains only one method, Single.just(). There’s no magic in reactive programming: this method simply creates an instance of the SingleJust class, which is pretty straightforward.
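To make this concrete without depending on RxJava, here is a minimal sketch of the idea. It assumes a stripped-down imitation of RxJava's types. Single, SingleJust and SingleObserver below reuse the library's names, but they are not the io.reactivex classes: there is no Disposable, no error handling and no plugin hooks. BuildStageDemo and run() are hypothetical helpers. Calling just() only stores the value, and nothing reaches an observer until subscribe() is called.

```java
import java.util.ArrayList;
import java.util.List;

// Stripped-down imitation of RxJava's Single and SingleJust, for illustration
// only: no Disposable, no error handling, no RxJavaPlugins hooks.
public class BuildStageDemo {

    interface SingleObserver<T> {
        void onSubscribe();          // the real interface receives a Disposable here
        void onSuccess(T value);
    }

    static abstract class Single<T> {
        // Build stage: the factory only creates an object, nothing is executed.
        static <V> Single<V> just(V value) {
            return new SingleJust<>(value);
        }

        // Entry point of the subscription stage: forward the observer.
        final void subscribe(SingleObserver<T> observer) {
            subscribeActual(observer);
        }

        protected abstract void subscribeActual(SingleObserver<T> observer);
    }

    static final class SingleJust<T> extends Single<T> {
        final T value;

        SingleJust(T value) {
            this.value = value; // the constructor only stores the value
        }

        @Override
        protected void subscribeActual(SingleObserver<T> observer) {
            observer.onSubscribe();
            observer.onSuccess(value);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Single<Integer> single = Single.just(1); // build stage
        log.add("built");                        // nothing has been emitted yet
        single.subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe() { log.add("onSubscribe"); }

            @Override
            public void onSuccess(Integer value) { log.add("onSuccess " + value); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [built, onSubscribe, onSuccess 1]
    }
}
```

The "built" entry comes first because the build stage only created an object. Both observer callbacks fire only after subscribe() hands the observer to subscribeActual().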
SingleJust’s constructor saves our value, but what is that subscribeActual() method that requires some kind of observer?
We’ll figure this out in the next stage.
2. Subscription stage
Okay, we’ve created something, but how can we run our Rx chain?
Our plain subscribe() method calls subscribeActual() on the Rx source (which we received at the build stage) and passes an observer to it.
But what is the subscription stage? In fact, we’ve just covered this stage.
The subscription stage is the stage where the observer is passed from one Rx source to the next until we have gone through all of them.
Let’s raise the bar and add a map() operator.
How have our stages changed?
As we remember, the first stage is the build stage.
Now we have two classes: the already familiar SingleJust, and SingleMap, which we get when we call the map() operator.
This is not rocket science: the SingleMap class simply wraps the SingleJust instance.
But what does our subscription stage look like now?
When we pass our observer to the subscribe() method, this method passes it on to SingleMap through SingleMap’s subscribeActual() method. SingleMap’s subscribeActual() then creates a MapSingleObserver that wraps our original observer, and passes it to SingleJust.
So our original observer was passed through all the Rx sources (SingleMap and SingleJust).
Eventually, we have two twins: SingleMap, which wraps SingleJust, and MapSingleObserver, which wraps our original observer.
3. Observation stage
The next stage is observation.
Once everything is set up, we can start receiving data.
SingleJust has now received our MapSingleObserver, so let’s pay attention to what it does with this observer.
When SingleJust receives MapSingleObserver, it calls onSubscribe() on this observer and then calls onSuccess(value) with the value we passed during the build stage.
Let’s turn to the diagrams.
When our top-level Rx source (SingleJust) begins to emit a value, the observation stage begins.
The observation stage is the stage where Rx sources emit data to observers.
SingleJust sends the value to MapSingleObserver via the onSuccess() method. MapSingleObserver then transforms the value and passes it on to our original observer.
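Here is a hypothetical, stdlib-only sketch of both stages for a just().map() chain. The class names mirror RxJava's, but they are simplified stand-ins: there is no Disposable, and the generics are simplified. The log messages are illustrative. The log shows subscription travelling bottom-up (SingleMap first, then SingleJust) and observation travelling top-down (SingleJust, then MapSingleObserver, then our observer).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified imitation of RxJava's SingleJust / SingleMap / MapSingleObserver.
public class MapStagesDemo {
    static final List<String> LOG = new ArrayList<>();

    interface SingleObserver<T> {
        void onSubscribe();
        void onSuccess(T value);
    }

    static abstract class Single<T> {
        final void subscribe(SingleObserver<T> observer) { subscribeActual(observer); }

        protected abstract void subscribeActual(SingleObserver<T> observer);

        // Build stage: map() only wraps the current source in a SingleMap.
        final <R> Single<R> map(Function<T, R> mapper) { return new SingleMap<>(this, mapper); }
    }

    static final class SingleJust<T> extends Single<T> {
        final T value;

        SingleJust(T value) { this.value = value; }

        @Override
        protected void subscribeActual(SingleObserver<T> observer) {
            LOG.add("SingleJust.subscribeActual");
            observer.onSubscribe();
            LOG.add("SingleJust emits " + value); // the observation stage starts here
            observer.onSuccess(value);
        }
    }

    static final class SingleMap<T, R> extends Single<R> {
        final Single<T> source;
        final Function<T, R> mapper;

        SingleMap(Single<T> source, Function<T, R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        protected void subscribeActual(SingleObserver<R> observer) {
            LOG.add("SingleMap.subscribeActual");
            // Wrap the downstream observer and keep going upstream.
            source.subscribe(new MapSingleObserver<>(observer, mapper));
        }
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {
        final SingleObserver<R> downstream;
        final Function<T, R> mapper;

        MapSingleObserver(SingleObserver<R> downstream, Function<T, R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe() { downstream.onSubscribe(); }

        @Override
        public void onSuccess(T value) {
            R mapped = mapper.apply(value); // transform, then pass it further down
            LOG.add("MapSingleObserver maps " + value + " -> " + mapped);
            downstream.onSuccess(mapped);
        }
    }

    static List<String> run() {
        LOG.clear();
        Single<Integer> chain = new SingleJust<>(2).map(x -> x * 10); // build stage
        chain.subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe() { LOG.add("observer.onSubscribe"); }

            @Override
            public void onSuccess(Integer value) { LOG.add("observer.onSuccess " + value); }
        });
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it returns, in order: SingleMap.subscribeActual, SingleJust.subscribeActual, observer.onSubscribe, SingleJust emits 2, MapSingleObserver maps 2 -> 20, observer.onSuccess 20.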
How does observeOn work?
observeOn() changes the execution thread during the observation stage.
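Here is a minimal sketch of this idea. It assumes an ExecutorService as a stand-in for an Rx Scheduler. SingleObserveOn here is a simplified imitation, not RxJava's class, and the thread name "observe-thread" is hypothetical. The subscription stage continues upstream on the caller's thread, and only onSuccess() is re-dispatched to the executor's thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified imitation of observeOn(), NOT RxJava's SingleObserveOn:
// an ExecutorService stands in for a Scheduler and there is no Disposable.
public class ObserveOnDemo {
    interface SingleObserver<T> {
        void onSubscribe();
        void onSuccess(T value);
    }

    static abstract class Single<T> {
        final void subscribe(SingleObserver<T> observer) { subscribeActual(observer); }

        protected abstract void subscribeActual(SingleObserver<T> observer);
    }

    static final class SingleObserveOn<T> extends Single<T> {
        final Single<T> source;
        final ExecutorService executor;

        SingleObserveOn(Single<T> source, ExecutorService executor) {
            this.source = source;
            this.executor = executor;
        }

        @Override
        protected void subscribeActual(SingleObserver<T> observer) {
            // Subscription stage: continue upstream on the CURRENT thread.
            source.subscribe(new SingleObserver<T>() {
                @Override
                public void onSubscribe() { observer.onSubscribe(); }

                // Observation stage: hand the value over to the executor's thread.
                @Override
                public void onSuccess(T value) { executor.execute(() -> observer.onSuccess(value)); }
            });
        }
    }

    /** Returns {caller, source subscribeActual, downstream onSuccess} thread names. */
    static String[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "observe-thread"));
        String[] threads = new String[3];
        threads[0] = Thread.currentThread().getName();
        CountDownLatch done = new CountDownLatch(1);

        Single<Integer> source = new Single<Integer>() {
            @Override
            protected void subscribeActual(SingleObserver<Integer> observer) {
                threads[1] = Thread.currentThread().getName();
                observer.onSubscribe();
                observer.onSuccess(42);
            }
        };

        new SingleObserveOn<>(source, executor).subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe() { }

            @Override
            public void onSuccess(Integer value) {
                threads[2] = Thread.currentThread().getName();
                done.countDown();
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] t = run();
        System.out.printf("caller=%s subscribeActual=%s onSuccess=%s%n", t[0], t[1], t[2]);
    }
}
```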
How does subscribeOn work?
subscribeOn() changes the execution thread during the subscription stage.
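Here is the mirror-image sketch for subscribeOn(), under the same assumptions: an ExecutorService instead of a Scheduler, a simplified SingleSubscribeOn that only borrows RxJava's name, and a hypothetical thread name "subscribe-thread". The downstream onSubscribe() still runs on the caller's thread, because it is signalled before the hop. The source's subscribeActual() and everything it emits then run on the new thread, which is how subscribeOn() ends up affecting the downstream too.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified imitation of subscribeOn(), NOT RxJava's SingleSubscribeOn:
// an ExecutorService stands in for a Scheduler and there is no Disposable.
public class SubscribeOnDemo {
    interface SingleObserver<T> {
        void onSubscribe();
        void onSuccess(T value);
    }

    static abstract class Single<T> {
        final void subscribe(SingleObserver<T> observer) { subscribeActual(observer); }

        protected abstract void subscribeActual(SingleObserver<T> observer);
    }

    static final class SingleSubscribeOn<T> extends Single<T> {
        final Single<T> source;
        final ExecutorService executor;

        SingleSubscribeOn(Single<T> source, ExecutorService executor) {
            this.source = source;
            this.executor = executor;
        }

        @Override
        protected void subscribeActual(SingleObserver<T> observer) {
            // Signal onSubscribe downstream first, still on the CURRENT thread
            // (RxJava's SingleSubscribeOn also does this before scheduling).
            observer.onSubscribe();
            // Then continue the subscription stage upstream on the executor's thread.
            executor.execute(() -> source.subscribe(new SingleObserver<T>() {
                @Override
                public void onSubscribe() { } // downstream was already notified above

                @Override
                public void onSuccess(T value) { observer.onSuccess(value); }
            }));
        }
    }

    /** Returns {caller, downstream onSubscribe, source subscribeActual, downstream onSuccess} thread names. */
    static String[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "subscribe-thread"));
        String[] threads = new String[4];
        threads[0] = Thread.currentThread().getName();
        CountDownLatch done = new CountDownLatch(1);

        Single<Integer> source = new Single<Integer>() {
            @Override
            protected void subscribeActual(SingleObserver<Integer> observer) {
                threads[2] = Thread.currentThread().getName();
                observer.onSubscribe();
                observer.onSuccess(42);
            }
        };

        new SingleSubscribeOn<>(source, executor).subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe() { threads[1] = Thread.currentThread().getName(); }

            @Override
            public void onSuccess(Integer value) {
                threads[3] = Thread.currentThread().getName();
                done.countDown();
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] t = run();
        System.out.printf("caller=%s onSubscribe=%s subscribeActual=%s onSuccess=%s%n", t[0], t[1], t[2], t[3]);
    }
}
```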
The answer
So now I can answer the riddle from the very beginning of this article.
The correct answer is choice 4, that is, this chain will print two lines:
doOnSubscribe-RxComputationThreadPool-1
onSuccess-RxCachedThreadScheduler-1
The explanation
But why? Let’s diagram our Rx chain:
The initial thread that called our subscribe() method is the main thread. But who changes this thread, and when?
The thread is changed by the subscribeActual() method of the SingleSubscribeOn class, precisely at the subscription stage.
Look at line 6 in the following example. We won’t go into the details so as not to overload the article (you can check them out yourself), but the scheduleDirect() method sends our new SubscribeOnObserver to an internal executor, which runs it on another thread.
You may notice that this subscribeActual() method doesn’t call subscribeActual() on the wrapped Rx source. That’s because the call happens inside SubscribeOnObserver, on a different thread.
We have to update our diagram with the threads in mind.
We see that the execution thread changes twice during the subscription stage. Normally, we wouldn’t notice any side effects of this once the observation stage starts sending data, if it weren’t for some tricky moments…
What if some of our Rx operators are called between the SingleSubscribeOn classes during the subscription stage?
And that’s exactly what happens, because SingleSubscribeOn calls the onSubscribe() method of the observer it wraps.
In our example, the SingleSubscribeOn that will change the execution thread to IO (in line 6) will first call the doOnSubscribe() logic (in line 4), which lives in DoOnSubscribeSingleObserver.
But at this point we are still in the Computation thread!
Thus, we get the first line:
doOnSubscribe-RxComputationThreadPool-1
But what happens next?
After the subscription stage, the observation stage begins, and our data is emitted on the IO thread. So our result processing is also called on the IO thread.
So, our second line will be:
onSuccess-RxCachedThreadScheduler-1
In total:
doOnSubscribe-RxComputationThreadPool-1
onSuccess-RxCachedThreadScheduler-1
Everything in the observation stage is called on the IO thread. So it does seem that in our Rx chain only the last subscribeOn() affects the execution thread (the one closest to the source, which is the last to run during the subscription stage). But, as you can see, this is not always the case.
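To check this reasoning without RxJava itself, here is a minimal sketch that re-implements just enough of a chain to reproduce the two lines above. It assumes a chain of the shape just(1), then subscribeOn(io), then doOnSubscribe(...), then subscribeOn(computation), then subscribe(...). That shape is consistent with the printed output, but it is a reconstruction, not the quiz code itself. All classes are simplified stand-ins named after RxJava's. The single-thread executors named "io-1" and "computation-1" are hypothetical substitutes for RxCachedThreadScheduler-1 and RxComputationThreadPool-1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical mini re-implementation; executors stand in for Rx Schedulers.
public class TwoSubscribeOnDemo {
    interface SingleObserver<T> {
        void onSubscribe();
        void onSuccess(T value);
    }

    static abstract class Single<T> {
        final void subscribe(SingleObserver<T> observer) { subscribeActual(observer); }
        protected abstract void subscribeActual(SingleObserver<T> observer);
        final Single<T> subscribeOn(ExecutorService executor) { return new SingleSubscribeOn<>(this, executor); }
        final Single<T> doOnSubscribe(Runnable action) { return new SingleDoOnSubscribe<>(this, action); }
    }

    static final class SingleJust<T> extends Single<T> {
        final T value;
        SingleJust(T value) { this.value = value; }

        @Override protected void subscribeActual(SingleObserver<T> observer) {
            observer.onSubscribe();
            observer.onSuccess(value);
        }
    }

    static final class SingleSubscribeOn<T> extends Single<T> {
        final Single<T> source;
        final ExecutorService executor;
        SingleSubscribeOn(Single<T> source, ExecutorService executor) { this.source = source; this.executor = executor; }

        @Override protected void subscribeActual(SingleObserver<T> observer) {
            observer.onSubscribe(); // runs on the CURRENT thread, before the hop
            executor.execute(() -> source.subscribe(new SingleObserver<T>() {
                @Override public void onSubscribe() { } // downstream was already notified
                @Override public void onSuccess(T value) { observer.onSuccess(value); }
            }));
        }
    }

    static final class SingleDoOnSubscribe<T> extends Single<T> {
        final Single<T> source;
        final Runnable action;
        SingleDoOnSubscribe(Single<T> source, Runnable action) { this.source = source; this.action = action; }

        @Override protected void subscribeActual(SingleObserver<T> observer) {
            source.subscribe(new SingleObserver<T>() {
                // The side effect runs on whichever thread the upstream calls onSubscribe() from.
                @Override public void onSubscribe() { action.run(); observer.onSubscribe(); }
                @Override public void onSuccess(T value) { observer.onSuccess(value); }
            });
        }
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static List<String> run() {
        ExecutorService io = named("io-1");
        ExecutorService computation = named("computation-1");
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        new SingleJust<>(1)
                .subscribeOn(io)
                .doOnSubscribe(() -> log.add("doOnSubscribe-" + Thread.currentThread().getName()))
                .subscribeOn(computation)
                .subscribe(new SingleObserver<Integer>() {
                    @Override public void onSubscribe() { }
                    @Override public void onSuccess(Integer value) {
                        log.add("onSuccess-" + Thread.currentThread().getName());
                        done.countDown();
                    }
                });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
            computation.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println); // doOnSubscribe-computation-1, then onSuccess-io-1
    }
}
```

The sketch follows the same path as the explanation. The computation hop runs the rest of the subscription stage, and the io SingleSubscribeOn calls onSubscribe() on its downstream (the doOnSubscribe observer) while still on computation-1. Only then does it hop to io-1, where the value is emitted.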
Conclusion
Each subscribeOn() changes the execution thread, but only during the subscription stage. So if nothing is called at the subscription stage between subscribeOn() operators, only the last subscribeOn() (the one closest to the source) will affect further execution.
I believe no one misrepresents these concepts intentionally; some incorrect statements have simply been made for the sake of simplification.
And remember, there’s nothing wrong with not knowing something; what’s bad is not wanting to know.
If you have any questions, I’m happy to answer them in the comments!
P.S. doOnSubscribe() is not the only operator that can be called at the subscription stage. For example, take a look at the article in which the collect() operator can affect your code: