What happens when we subscribe to a stream? A look inside RxJava
Using RxJava seems rather simple: we create a stream, apply some operators, and then subscribe. But as our streams get more and more complex and we start using more advanced operators, sometimes we end up with behavior we don’t quite understand. This is one of several planned posts that aim to clarify some of the inner workings of RxJava.
The goal of this post is not to explain how RxJava works in fine detail, nor to teach how to write custom operators. The flow presented is a simplified model which ignores some advanced elements like operator fusion (optimizing specific pairs of operators) or thread safety (and threading in general). Also, even though this post only talks about Observables, the other stream classes (Flowable, Single, Completable, and Maybe) use the same concept.
A typical example of RxJava usage would look something like this:
We can distinguish four key parts:
- An observable source: some function or factory method that provides us with an initial Observable
- A number of chained operators which manipulate the stream’s values and its behavior
- The subscription*
- Event callbacks*
*It’s an important distinction, as we’ll see: the subscription happens immediately after calling subscribe, while event callbacks can happen any time after that. But we’ll get to that.
While this is a pretty simple example, there’s quite a lot going on behind the scenes.
Creating an observable
Observable.fromArray(1, 2, 3, 4, 5) creates and returns an Observable instance, in this case ObservableFromArray specifically. This is an Observable subclass that, when subscribed to, will immediately emit the given values one by one. Not much is happening here, but we’ll get back to this class later. The point is, this is one of many ways to acquire the initial Observable instance, which will be the source of our stream. Also, even though this particular observable will emit values immediately, most will emit at some later point in time (or never).
Then we have the operators. RxJava applies all of them in a very similar fashion, so let’s look at
take as examples:
We can see that the structure of both is similar. If we discard the annotations with metadata and the onAssembly call (used by plugins to track where operators are applied), we’re left with the following template (in Kotlin, so we define an extension function instead of an instance method):
This is what most operator calls look like. We can see that:
- each operator has a corresponding observable class (ObservableFlatMap and so on);
- to create an instance of the operator’s observable, we need to pass the necessary parameters and the source observable;
- this new observable is returned from the operator function. This is because observables are immutable: applying an operator doesn’t change any already existing observable;
- the operator function declares its return value as an Observable, so the specific observable class of the operator we just applied is hidden;
- since all operators are declared on the Observable class, we’re free to apply subsequent operators right away.
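The same template can be mirrored in plain Java. The sketch below is a hypothetical mini model: MiniObservable, MiniObserver, MiniFromArray, and MiniMap are made-up names for illustration, not RxJava’s real classes, and everything is assumed to be synchronous and single-threaded. It only shows the shape of an operator call: the operator method wraps the current observable in a new one and returns it under the general type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical mini model; these are NOT RxJava's real classes.
interface MiniObserver<T> {
    void onNext(T value);
    void onComplete();
}

abstract class MiniObservable<T> {
    abstract void subscribe(MiniObserver<? super T> observer);

    // Factory method: the source of the stream has no parent.
    @SafeVarargs
    static <T> MiniObservable<T> fromArray(T... items) {
        return new MiniFromArray<T>(items);
    }

    // Operator template: wrap `this` in the operator's own observable and
    // return it under the general MiniObservable type.
    final <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
        return new MiniMap<T, R>(this, mapper);
    }
}

final class MiniFromArray<T> extends MiniObservable<T> {
    private final T[] items;

    MiniFromArray(T[] items) { this.items = items; }

    @Override
    void subscribe(MiniObserver<? super T> observer) {
        for (T item : items) observer.onNext(item);
        observer.onComplete();
    }
}

final class MiniMap<T, R> extends MiniObservable<R> {
    private final MiniObservable<T> source;                 // the wrapped parent
    private final Function<? super T, ? extends R> mapper;  // the operator's parameter

    MiniMap(MiniObservable<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    void subscribe(MiniObserver<? super R> downstream) {
        source.subscribe(new MiniObserver<T>() {
            @Override public void onNext(T value) { downstream.onNext(mapper.apply(value)); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }
}

final class OperatorTemplateDemo {
    // Subscribes and gathers everything the observable emits.
    static List<Integer> collect(MiniObservable<Integer> observable) {
        List<Integer> out = new ArrayList<>();
        observable.subscribe(new MiniObserver<Integer>() {
            @Override public void onNext(Integer value) { out.add(value); }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        MiniObservable<Integer> source = MiniObservable.fromArray(1, 2, 3);
        MiniObservable<Integer> doubled = source.map(x -> x * 2);
        System.out.println(collect(source));   // the original is unchanged
        System.out.println(collect(doubled));
    }
}
```

In this model, applying map leaves source untouched: subscribing to source still yields 1, 2, 3, while doubled is a separate object that yields 2, 4, 6.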
The most important thing here is that each operator will wrap the current observable in the newly created one, so we effectively end up with a nested tree of observables. Each observable has a parent source observable and some (or no) parameters. Here’s how we would create our observable chain if we were creating the objects manually:
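As a rough illustration of that nesting, here is a sketch that builds a map, doOnNext, take chain by hand. The Sketch* classes are hypothetical stand-ins that only model construction (no subscribing), and the lambdas and the count of 3 are made-up parameters, not values taken from the original example.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-ins that model only how observables nest.
abstract class SketchObservable {
    final SketchObservable source;   // direct parent only; null for the source of the stream

    SketchObservable(SketchObservable source) { this.source = source; }

    abstract String label();

    // Renders the nesting from the outermost observable inwards.
    final String describe() {
        return source == null ? label() : label() + "(" + source.describe() + ")";
    }
}

final class SketchFromArray extends SketchObservable {
    final Integer[] items;

    SketchFromArray(Integer... items) { super(null); this.items = items; }

    @Override String label() { return "FromArray"; }
}

final class SketchMap extends SketchObservable {
    final Function<Integer, Integer> mapper;

    SketchMap(SketchObservable source, Function<Integer, Integer> mapper) {
        super(source);
        this.mapper = mapper;
    }

    @Override String label() { return "Map"; }
}

final class SketchDoOnNext extends SketchObservable {
    final Consumer<Integer> action;

    SketchDoOnNext(SketchObservable source, Consumer<Integer> action) {
        super(source);
        this.action = action;
    }

    @Override String label() { return "DoOnNext"; }
}

final class SketchTake extends SketchObservable {
    final long count;

    SketchTake(SketchObservable source, long count) {
        super(source);
        this.count = count;
    }

    @Override String label() { return "Take"; }
}

final class ManualChainDemo {
    static SketchObservable build() {
        // The last operator applied ends up as the outermost object.
        return new SketchTake(
                new SketchDoOnNext(
                        new SketchMap(
                                new SketchFromArray(1, 2, 3, 4, 5),
                                x -> x * 10),
                        x -> System.out.println("saw " + x)),
                3);
    }

    public static void main(String[] args) {
        System.out.println(build().describe()); // Take(DoOnNext(Map(FromArray)))
    }
}
```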
Fortunately we don’t have to do that 😬. But this is the structure we get when applying operators. We can make a couple of interesting observations here:
- in the end we only have a reference to the observable returned by the last chained call: the take operator was the last in the chain, so its observable is the outermost one in the final tree;
- each operator observable only has a reference to its direct parent (source) observable;
- ObservableFromArray doesn’t receive a source. It doesn’t need one, since it is the source of the events.
So how do we subscribe to the entire chain given that we only have a reference to the outermost observable? How does the data flow through the stream if every observable only contains a reference to one parent observable?
The subscribe method is the key point of all RxJava-based code. It establishes a subscription and allows the actual flow of events to which we can react. We usually call subscribe with handlers for events like onNext and onError, or with an Observer instance. But we should know that even if we pass only some handlers (or none), RxJava will still wrap them in a LambdaObserver instance. This means that each operator’s subscribe method always receives a single Observer instance, regardless of what permutation of onNext, onError, and onComplete consumers we’ve passed when subscribing initially.
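To make the wrapping idea concrete, here is a minimal sketch in plain Java. EventObserver, CallbackObserver, and LetterSource are hypothetical names; CallbackObserver only plays the role that LambdaObserver plays in RxJava and is not its implementation. One simplification to note: missing handlers are replaced with no-ops here, which is an assumption of this sketch rather than a statement about RxJava’s defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical observer interface for this sketch.
interface EventObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

// Bundles separate callbacks into one observer object.
final class CallbackObserver<T> implements EventObserver<T> {
    private final Consumer<? super T> nextHandler;
    private final Consumer<? super Throwable> errorHandler;
    private final Runnable completeHandler;

    CallbackObserver(Consumer<? super T> nextHandler,
                     Consumer<? super Throwable> errorHandler,
                     Runnable completeHandler) {
        this.nextHandler = nextHandler;
        this.errorHandler = errorHandler;
        this.completeHandler = completeHandler;
    }

    @Override public void onNext(T value) { nextHandler.accept(value); }
    @Override public void onError(Throwable error) { errorHandler.accept(error); }
    @Override public void onComplete() { completeHandler.run(); }
}

final class LetterSource {
    // The only overload that does real work: it always sees a single observer.
    void subscribe(EventObserver<? super String> observer) {
        observer.onNext("a");
        observer.onNext("b");
        observer.onComplete();
    }

    // Convenience overloads wrap whatever callbacks were given (no-ops for the rest).
    void subscribe(Consumer<? super String> onNext) {
        subscribe(onNext, error -> { }, () -> { });
    }

    void subscribe(Consumer<? super String> onNext,
                   Consumer<? super Throwable> onError,
                   Runnable onComplete) {
        subscribe(new CallbackObserver<String>(onNext, onError, onComplete));
    }
}

final class CallbackDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        new LetterSource().subscribe(
                value -> log.add("next:" + value),
                error -> log.add("error"),
                () -> log.add("complete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next:a, next:b, complete]
    }
}
```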
Subscribing to a chain
As we already know, we’re always dealing with chains of observables. So how can we subscribe to just one of them? The answer is: we can’t. That is, even though we explicitly only subscribe to one observable (the last one in the chain), that observable will, in turn, subscribe to its parent. Its parent will subscribe to its parent, and so on, until the beginning of the stream is reached.
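The cascade of subscribe calls can be traced with a small sketch. LoggingObservable is a hypothetical class that does nothing except record when its subscribe method runs and then subscribe to its parent; the value 42 emitted by the source is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical observable that only records the order of subscribe calls.
final class LoggingObservable {
    private final String name;
    private final LoggingObservable source; // null marks the source of the stream
    private final List<String> log;

    LoggingObservable(String name, LoggingObservable source, List<String> log) {
        this.name = name;
        this.source = source;
        this.log = log;
    }

    void subscribe(IntConsumer observer) {
        log.add(name + ".subscribe");
        if (source != null) {
            // A real operator would wrap `observer` in its own observer here.
            source.subscribe(observer);
        } else {
            // Only the source of the stream actually produces events.
            observer.accept(42);
        }
    }
}

final class SubscribeOrderDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LoggingObservable fromArray = new LoggingObservable("fromArray", null, log);
        LoggingObservable map = new LoggingObservable("map", fromArray, log);
        LoggingObservable take = new LoggingObservable("take", map, log);

        // We only ever touch the outermost observable.
        take.subscribe(value -> log.add("received " + value));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it lists take.subscribe, map.subscribe, and fromArray.subscribe in that order, followed by the single received value.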
Upstream and downstream
This is a good time to start naming things right. Before the subscription happens, all we have is a chain of immutable observables. There’s nowhere the events could come from, and nowhere they could go anyway. As the subscribe methods are executed, one by one, they each create a new subscription: one that links that observable’s source with the observer it received. This is how the actual stream is established: events flow from observer to observer, starting with the source observable, and ending at the observer we pass to the initial subscribe call.
So now, for every observable, upstream is where the events come from: the parent observable’s subscription. Downstream is where the events should be delivered to, and it’s the observer received in subscribe.
But it wouldn’t make much sense if the operators simply took values from upstream and passed them downstream. This is why, in between, there’s the observable’s custom logic, which acts as a middleman between the upstream and downstream. Where does it come from?
We know that each subscription requires an observer, and that each observable will subscribe to its parent. So what observer will it use? Typically each observable will have its own custom
Observer subclass, which receives emissions from upstream, acts on them, and sends them downstream.
Let’s see what a very simplified ObservableTake class would look like:
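What follows is not RxJava’s source, only a sketch of the idea in plain Java. Cancel, TinyObserver, TinyObservable, TinyFromArray, TinyTake, and TakeObserver are hypothetical names, the model is synchronous and ignores errors and thread safety, and it assumes a limit of at least 1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini versions of Disposable, Observer and Observable.
interface Cancel { void dispose(); }

interface TinyObserver<T> {
    void onSubscribe(Cancel upstream);
    void onNext(T value);
    void onComplete();
}

abstract class TinyObservable<T> {
    abstract void subscribe(TinyObserver<? super T> observer);
}

final class TinyFromArray<T> extends TinyObservable<T> {
    private final T[] items;

    @SafeVarargs
    TinyFromArray(T... items) { this.items = items; }

    @Override
    void subscribe(TinyObserver<? super T> observer) {
        boolean[] disposed = {false};
        observer.onSubscribe(() -> disposed[0] = true);
        for (T item : items) {
            if (disposed[0]) return;      // downstream lost interest, stop emitting
            observer.onNext(item);
        }
        if (!disposed[0]) observer.onComplete();
    }
}

final class TinyTake<T> extends TinyObservable<T> {
    private final TinyObservable<T> source;
    private final long limit;

    TinyTake(TinyObservable<T> source, long limit) {
        if (limit < 1) throw new IllegalArgumentException("sketch assumes limit >= 1");
        this.source = source;
        this.limit = limit;
    }

    @Override
    void subscribe(TinyObserver<? super T> downstream) {
        // Subscribe to the parent with a custom observer that wraps the downstream.
        source.subscribe(new TakeObserver<T>(downstream, limit));
    }
}

final class TakeObserver<T> implements TinyObserver<T> {
    private final TinyObserver<? super T> downstream;
    private long remaining;
    private Cancel upstream;
    private boolean done;

    TakeObserver(TinyObserver<? super T> downstream, long limit) {
        this.downstream = downstream;
        this.remaining = limit;
    }

    @Override public void onSubscribe(Cancel upstream) {
        this.upstream = upstream;
        downstream.onSubscribe(upstream);
    }

    @Override public void onNext(T value) {
        if (done) return;
        downstream.onNext(value);
        if (--remaining == 0) {           // limit reached: stop upstream, finish downstream
            done = true;
            upstream.dispose();
            downstream.onComplete();
        }
    }

    @Override public void onComplete() {
        if (!done) { done = true; downstream.onComplete(); }
    }
}

final class TakeDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        TinyObservable<Integer> source = new TinyFromArray<>(1, 2, 3, 4, 5);
        new TinyTake<>(source, 2).subscribe(new TinyObserver<Integer>() {
            @Override public void onSubscribe(Cancel upstream) { events.add("subscribed"); }
            @Override public void onNext(Integer value) { events.add("next " + value); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [subscribed, next 1, next 2, complete]
    }
}
```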
So much going on! Let’s break down what happens when we call subscribe on the ObservableTake:
- in its subscribe method, the observable will subscribe to its parent;
- the new subscription will use a custom TakeObserver that wraps the observer passed to subscribe;
- TakeObserver will execute the logic that fulfills the take operator’s purpose: every time upstream emits an event, TakeObserver will consider whether to pass it to the downstream or not, and whether the upstream subscription still makes sense or should be disposed.
The onErrorReturn operator, which can be used to map errors to onNext events, will provide an onError implementation that doesn’t signal downstream.onError. Instead it would call downstream.onNext. The map observer will have a custom onNext implementation, which calls the downstream onNext but with the value returned by the map function. Some operators can also have two upstreams, like merge: they would subscribe to both upstreams and combine their events accordingly.
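Here is a sketch of what two such observers might look like, again in a hypothetical mini model rather than RxJava’s real classes (Downstream, MapObserverSketch, and OnErrorReturnObserverSketch are made-up names). In this sketch the error-recovering observer also completes the downstream after emitting the fallback value, which is an assumption of the example. Events are pushed into the outermost observer by hand to stand in for an upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical observer interface for this sketch.
interface Downstream<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

// map-like observer: transforms values, passes everything else through.
final class MapObserverSketch<T, R> implements Downstream<T> {
    private final Downstream<? super R> downstream;
    private final Function<? super T, ? extends R> mapper;

    MapObserverSketch(Downstream<? super R> downstream, Function<? super T, ? extends R> mapper) {
        this.downstream = downstream;
        this.mapper = mapper;
    }

    @Override public void onNext(T value) { downstream.onNext(mapper.apply(value)); }
    @Override public void onError(Throwable error) { downstream.onError(error); }
    @Override public void onComplete() { downstream.onComplete(); }
}

// onErrorReturn-like observer: turns an error into a value instead of forwarding it.
final class OnErrorReturnObserverSketch<T> implements Downstream<T> {
    private final Downstream<? super T> downstream;
    private final Function<? super Throwable, ? extends T> fallback;

    OnErrorReturnObserverSketch(Downstream<? super T> downstream,
                                Function<? super Throwable, ? extends T> fallback) {
        this.downstream = downstream;
        this.fallback = fallback;
    }

    @Override public void onNext(T value) { downstream.onNext(value); }

    @Override public void onError(Throwable error) {
        // downstream.onError is never signalled; the error becomes a regular value.
        downstream.onNext(fallback.apply(error));
        downstream.onComplete();
    }

    @Override public void onComplete() { downstream.onComplete(); }
}

final class ObserverLogicDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Downstream<Integer> recorder = new Downstream<Integer>() {
            @Override public void onNext(Integer value) { events.add("next " + value); }
            @Override public void onError(Throwable error) { events.add("error " + error.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        Downstream<Integer> recovering =
                new OnErrorReturnObserverSketch<Integer>(recorder, error -> -1);
        Downstream<Integer> mapping =
                new MapObserverSketch<Integer, Integer>(recovering, x -> x * 2);

        // Play the role of the upstream by hand.
        mapping.onNext(1);
        mapping.onNext(2);
        mapping.onError(new IllegalStateException("boom"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next 2, next 4, next -1, complete]
    }
}
```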
This is how each observable installs custom logic in its place in the stream — it subscribes to the source observable using a custom observer, which receives events from upstream, modifies them and sends them downstream.
We saw that each observable uses a custom observer to wrap the one it received in the subscribe call, but let’s see why this works.
Since we call
subscribe on the last (outermost) observable in the chain, it will be the first one to create a custom observer that wraps the one we pass. This new observer will then in turn be passed to its parent, which will wrap it in its own custom observer, and so on. This means that the last observable to wrap some observer is the first operator in the chain. In our example, the wrapped observers would look something like this:
MapObservable was the deepest one in the observables tree, but its
MapObserver is the outermost one. And this of course makes perfect sense — we want operators to be applied in the order we write them. Map was first in the chain, so it should be the first to apply its logic.
When FromArrayObservable emits values, it passes them to its downstream, the MapObserver, which will execute its logic and in turn pass the mapped values further to its downstream, DoOnNextObserver. That one will perform an action on each doOnNext call, and pass all events down to TakeObserver, and so on. Each observer only receives values from its upstream, and only emits values directly to its downstream.
This is why we sometimes read that in RxJava the subscription happens upstream, and emissions downstream. First we subscribe, from the bottom up, establishing a stream. Then that stream of connected observers will be used to pass values from the source to our observer. Whenever an operator swallows an event, it simply disappears, but only downstream of the operator that swallows it.
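The “only downstream of the operator that swallows it” part is easy to check with a small sketch. Observers are modeled here as plain IntConsumer functions, and peek and filter are hypothetical helpers that behave like doOnNext-style and filter-style observers; none of this is RxJava code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;

final class SwallowDemo {
    // doOnNext-like observer: records the value, then always forwards it.
    static IntConsumer peek(String label, List<String> log, IntConsumer downstream) {
        return value -> {
            log.add(label + ":" + value);
            downstream.accept(value);
        };
    }

    // filter-like observer: forwards matching values and swallows the rest.
    static IntConsumer filter(IntPredicate keep, IntConsumer downstream) {
        return value -> {
            if (keep.test(value)) downstream.accept(value);
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Observers wrap each other starting from the final one, so build bottom-up.
        IntConsumer finalObserver = value -> log.add("final:" + value);
        IntConsumer afterFilter = peek("after", log, finalObserver);
        IntConsumer filtering = filter(value -> value % 2 == 0, afterFilter);
        IntConsumer beforeFilter = peek("before", log, filtering);

        // The source emits into the outermost observer.
        for (int value = 1; value <= 3; value++) beforeFilter.accept(value);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [before:1, before:2, after:2, final:2, before:3]
    }
}
```

The odd values are still seen by the observer above the filter, but nothing below it ever hears about them.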
Also notice that the source observable can hold onto the observer it was given. This way it can emit events at any point in time after it was subscribed to (and until it’s been disposed). But any time an observer.onXXX method is called by the source observable, that event travels through all observers in the stream, until it’s swallowed or emitted to the initial observer we used to subscribe.
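A source that keeps its observer around could be sketched like this. ManualSource is a hypothetical class that supports a single observer, models the observer as a Consumer, and returns a Runnable that acts as the dispose handle; these are all simplifications made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical source that remembers its observer and emits only when told to.
final class ManualSource<T> {
    private Consumer<? super T> observer; // null before subscribing and after disposal

    // Stores the observer and returns a handle that disposes the subscription.
    Runnable subscribe(Consumer<? super T> observer) {
        this.observer = observer;
        return () -> { this.observer = null; };
    }

    // Can be called at any later time; events are dropped when nobody is subscribed.
    void emit(T value) {
        if (observer != null) observer.accept(value);
    }
}

final class ManualSourceDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ManualSource<String> source = new ManualSource<>();

        source.emit("too early");                  // nobody subscribed yet
        Runnable dispose = source.subscribe(value -> log.add("got " + value));
        source.emit("a");
        source.emit("b");
        dispose.run();
        source.emit("c");                          // disposed, so this goes nowhere
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [got a, got b]
    }
}
```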
At least for now, there’s still a lot more to uncover. But I hope this post was helpful in understanding the basics of how RxJava’s streams work under the hood. While it doesn’t seem important when simply using the library in day-to-day development, it’s crucial for understanding some of the more advanced topics, like how and why subscribeOn and observeOn work. How do we distribute emitted values among several threads? Why does a stream with flatMap sometimes emit on the “wrong” thread? How do we avoid creating a subscription when we don’t need it? I’ll try to answer these and other questions in future posts 🙂