What happens when we subscribe to a stream? A look inside RxJava

Using RxJava seems rather simple: we create a stream, apply some operators, and then subscribe. But as our streams get more complex and we start using more advanced operators, we sometimes end up with behavior we don’t quite understand. This is one of several posts I have planned that aim to clarify some of the inner workings of RxJava.

The goal of this post is neither to explain how RxJava works in fine detail nor to teach how to write custom operators. The flow presented here is a simplified model that ignores some advanced elements, like operator fusion (optimizing specific pairs of operators) or thread safety (and threading in general). Also, even though this post only talks about Observable, the other stream classes (Completable, Single and Maybe) use the same concept.

RxJava chains

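A typical example of RxJava usage would look something like this: we create an Observable with Observable.fromArray(1, 2, 3, 4, 5), chain a few operators such as map, doOnNext and take, and finally call subscribe with handlers for the events we care about.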

We can distinguish four key parts:

  1. An observable source — some function or factory method that provides us with an initial Observable instance
  2. A number of chained operators which manipulate the stream values and its behavior
  3. The subscription*
  4. Event callbacks*

*This is an important distinction, as we’ll see: the subscription happens immediately when we call subscribe, while event callbacks can happen at any time after that.

While this is a pretty simple example, there’s quite a lot going on behind the scenes.

Creating an observable

First, Observable.fromArray(1, 2, 3, 4, 5) creates and returns an Observable instance, specifically an ObservableFromArray. This is an Observable subclass that, when subscribed to, immediately emits the given values one by one. Not much is happening here yet, but we’ll come back to this class later. The point is that this is one of many ways to acquire the initial Observable instance, which will be the source of our stream. Also, even though this particular observable emits its values immediately, most observables emit at some later point in time (or never).
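To make the following steps easier to poke at, we can model the idea with a handful of tiny Java classes. Everything below (`Observer`, `Observable`, `ObservableFromArray`, `RecordingObserver`, `FromArrayDemo`) is a hypothetical toy written for this sketch, not RxJava’s actual code: there is no `onSubscribe`, no disposal and no threading. The source does what the paragraph above describes: when subscribed to, it emits the given values one by one and then completes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy observer with the three event callbacks (RxJava's real Observer also has onSubscribe).
interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

// Toy observable: it does nothing until someone subscribes.
abstract class Observable<T> {
    abstract void subscribe(Observer<T> observer);
}

// Source observable: it has no upstream, it is where the events originate.
final class ObservableFromArray<T> extends Observable<T> {
    private final T[] items;

    @SafeVarargs
    ObservableFromArray(T... items) {
        this.items = items;
    }

    @Override
    void subscribe(Observer<T> observer) {
        // Emit every value synchronously, in order, then signal completion.
        for (T item : items) {
            observer.onNext(item);
        }
        observer.onComplete();
    }
}

// Observer that simply records the events it receives.
final class RecordingObserver<T> implements Observer<T> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(T value) { events.add("onNext " + value); }
    @Override public void onError(Throwable error) { events.add("onError " + error.getMessage()); }
    @Override public void onComplete() { events.add("onComplete"); }
}

class FromArrayDemo {
    static List<String> run() {
        Observable<Integer> source = new ObservableFromArray<>(1, 2, 3, 4, 5);
        RecordingObserver<Integer> observer = new RecordingObserver<>();
        source.subscribe(observer); // only now do the values start flowing
        return observer.events;
    }
}
```

Constructing `ObservableFromArray` emits nothing on its own; the values only flow once `subscribe` is called.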

Applying operators

Then we have the operators. RxJava applies all of them in a very similar fashion, so let’s look at map and take as examples:

We can see that the structure of both is similar. If we discard the annotations with metadata and the onAssembly call (used by plugins to track where operators are applied), we’re left with the following template (in Kotlin, so we define an extension function instead of an instance method):
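In a toy Java model (hypothetical classes, not RxJava’s source) the same template becomes an instance method on the base class instead of a Kotlin extension function. The sketch leaves out the null check and the onAssembly hook; `ObservableMap` here is a simplified stand-in that only keeps a reference to its source and its mapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

abstract class Observable<T> {
    abstract void subscribe(Observer<T> observer);

    // Operator template: create the operator's own observable, pass it the
    // operator's parameters plus `this` as its source, and return it typed as a
    // plain Observable so the concrete class stays hidden.
    final <R> Observable<R> map(Function<T, R> mapper) {
        return new ObservableMap<>(this, mapper);
    }
}

// Hypothetical, simplified operator observable.
final class ObservableMap<T, R> extends Observable<R> {
    final Observable<T> source;          // parent (upstream) observable
    private final Function<T, R> mapper; // operator parameter

    ObservableMap(Observable<T> source, Function<T, R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    void subscribe(Observer<R> observer) {
        // Subscribe to the parent with an observer that applies the mapper.
        source.subscribe(new Observer<T>() {
            @Override public void onNext(T value) { observer.onNext(mapper.apply(value)); }
            @Override public void onError(Throwable error) { observer.onError(error); }
            @Override public void onComplete() { observer.onComplete(); }
        });
    }
}

class MapTemplateDemo {
    static List<String> run() {
        // An ad-hoc source emitting 1, 2, 3 and then completing.
        Observable<Integer> numbers = new Observable<Integer>() {
            @Override
            void subscribe(Observer<Integer> observer) {
                for (int i = 1; i <= 3; i++) observer.onNext(i);
                observer.onComplete();
            }
        };
        Observable<String> labels = numbers.map(n -> "#" + n);

        List<String> log = new ArrayList<>();
        // map() left `numbers` untouched and returned a new observable wrapping it.
        ObservableMap<?, String> mapObservable = (ObservableMap<?, String>) labels;
        log.add("wraps source: " + (mapObservable.source == (Object) numbers));
        labels.subscribe(new Observer<String>() {
            @Override public void onNext(String value) { log.add(value); }
            @Override public void onError(Throwable error) { log.add("error"); }
            @Override public void onComplete() { log.add("done"); }
        });
        return log;
    }
}
```

The check in the demo confirms that `map` didn’t modify `numbers`: it returned a new observable that merely points at it.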

This is what most operator calls look like. We can see that:

  • each operator will have a corresponding Observable subclass, e.g. ObservableTake, ObservableMap, ObservableFlatMap and so on;
  • to create an instance of the operator’s observable, we need to pass the necessary parameters and the source observable (this);
  • this new observable is returned from the operator function. This is because observables are immutable — applying an operator doesn’t change any already existing observable;
  • the operator function declares its return type as Observable, so the specific observable class of the operator we just applied is hidden;
  • since all operators are declared on the Observable class, we’re free to apply subsequent operators right away.

The most important thing here is that each operator wraps the current observable in a newly created one, so we effectively end up with a nested tree of observables. Each observable has a parent (source) observable and some (or no) parameters. Here’s how we would create our observable chain if we were creating the objects manually:
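Using the same kind of toy Java classes (hypothetical, simplified stand-ins named after RxJava’s, with no disposal), the manual construction could look like the sketch below. The mapper `n -> n * 10`, the no-op doOnNext action and the take count of 2 are arbitrary choices for illustration. Each class keeps only its direct parent in `source`, and `toString` prints the nesting.

```java
import java.util.function.Consumer;
import java.util.function.Function;

interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

abstract class Observable<T> {
    abstract void subscribe(Observer<T> observer);
}

// Forwards terminal events unchanged; each operator customizes onNext.
abstract class Forwarding<T, R> implements Observer<T> {
    final Observer<R> downstream;
    Forwarding(Observer<R> downstream) { this.downstream = downstream; }
    @Override public void onError(Throwable error) { downstream.onError(error); }
    @Override public void onComplete() { downstream.onComplete(); }
}

// No source parameter: this observable is where the events originate.
final class ObservableFromArray<T> extends Observable<T> {
    private final T[] items;
    @SafeVarargs ObservableFromArray(T... items) { this.items = items; }
    @Override void subscribe(Observer<T> observer) {
        for (T item : items) observer.onNext(item);
        observer.onComplete();
    }
    @Override public String toString() { return "ObservableFromArray"; }
}

final class ObservableMap<T, R> extends Observable<R> {
    private final Observable<T> source; // direct parent only
    private final Function<T, R> mapper;
    ObservableMap(Observable<T> source, Function<T, R> mapper) { this.source = source; this.mapper = mapper; }
    @Override void subscribe(Observer<R> observer) {
        source.subscribe(new Forwarding<T, R>(observer) {
            @Override public void onNext(T value) { downstream.onNext(mapper.apply(value)); }
        });
    }
    @Override public String toString() { return "ObservableMap(" + source + ")"; }
}

final class ObservableDoOnNext<T> extends Observable<T> {
    private final Observable<T> source;
    private final Consumer<T> action;
    ObservableDoOnNext(Observable<T> source, Consumer<T> action) { this.source = source; this.action = action; }
    @Override void subscribe(Observer<T> observer) {
        source.subscribe(new Forwarding<T, T>(observer) {
            @Override public void onNext(T value) { action.accept(value); downstream.onNext(value); }
        });
    }
    @Override public String toString() { return "ObservableDoOnNext(" + source + ")"; }
}

final class ObservableTake<T> extends Observable<T> {
    private final Observable<T> source;
    private final long count;
    ObservableTake(Observable<T> source, long count) { this.source = source; this.count = count; }
    @Override void subscribe(Observer<T> observer) {
        source.subscribe(new Forwarding<T, T>(observer) {
            long remaining = count;
            boolean done;
            @Override public void onNext(T value) {
                if (done) return; // limit reached; without disposal we just ignore the rest
                if (remaining > 0) { remaining--; downstream.onNext(value); }
                if (remaining <= 0) { done = true; downstream.onComplete(); }
            }
            @Override public void onError(Throwable error) { if (!done) { done = true; downstream.onError(error); } }
            @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
        });
    }
    @Override public String toString() { return "ObservableTake(" + source + ")"; }
}

class ManualChainDemo {
    static String run() {
        // The equivalent of fromArray(1, 2, 3, 4, 5), then map, doOnNext and take(2), built by hand.
        Observable<Integer> chain =
                new ObservableTake<Integer>(
                        new ObservableDoOnNext<Integer>(
                                new ObservableMap<Integer, Integer>(
                                        new ObservableFromArray<Integer>(1, 2, 3, 4, 5),
                                        n -> n * 10),
                                n -> { }),
                        2);
        return chain.toString();
    }
}
```

Reading the printed string from the outside in, the last operator applied is the outermost object, and ObservableFromArray is the only one without a source.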

Fortunately we don’t have to do that 😬. But this is the structure we get when applying operators. We can make a couple of interesting observations here:

  • in the end we only have a reference to the observable returned by the last chained call: take was the last operator in the chain, so its observable is the outermost one in the final tree;
  • each operator observable only has a reference to its direct parent (source) observable;
  • ObservableFromArray doesn’t receive a source — it doesn’t need one, since it is the source of the events.

So how do we subscribe to the entire chain, given that we only have a reference to the outermost observable? And how does the data flow through the stream if every observable only holds a reference to its one parent observable?

Subscription

Calling the subscribe method is the key point of all RxJava-based code. It establishes a subscription and allows the actual flow of events, to which we can react. We usually call subscribe with handlers for events like onNext or onError, or with an Observer instance. But even if we pass only some handlers (or none), RxJava still wraps them in a LambdaObserver instance. This means that each operator’s subscribe method always receives a single Observer instance, regardless of which combination of onNext, onError and onComplete consumers we passed when subscribing initially.
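A rough sketch of that wrapping in the toy Java model: subscribe overloads that accept only some callbacks fill in defaults and wrap everything in one observer. `LambdaObserver` below is a hypothetical simplified stand-in for RxJava’s class of the same name. Its missing-onError default simply rethrows, which is an assumption of this sketch; RxJava instead reports an unhandled error through `RxJavaPlugins.onError`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

abstract class Observable<T> {
    // The one method operators implement: it always receives a single Observer.
    abstract void subscribe(Observer<T> observer);

    // Convenience overloads: missing callbacks get defaults, then everything
    // is wrapped in one LambdaObserver.
    final void subscribe(Consumer<T> onNext) {
        subscribe(onNext, error -> { throw new IllegalStateException("onError not implemented", error); });
    }

    final void subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
        subscribe(onNext, onError, () -> { });
    }

    final void subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onComplete) {
        subscribe(new LambdaObserver<T>(onNext, onError, onComplete));
    }
}

// Hypothetical stand-in: adapts separate callbacks to the Observer interface.
final class LambdaObserver<T> implements Observer<T> {
    private final Consumer<T> onNext;
    private final Consumer<Throwable> onError;
    private final Runnable onComplete;

    LambdaObserver(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onComplete) {
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
    }

    @Override public void onNext(T value) { onNext.accept(value); }
    @Override public void onError(Throwable error) { onError.accept(error); }
    @Override public void onComplete() { onComplete.run(); }
}

class LambdaDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<String> source = new Observable<String>() {
            @Override
            void subscribe(Observer<String> observer) {
                // Whatever the caller passed, the source sees exactly one Observer.
                log.add("received " + observer.getClass().getSimpleName());
                observer.onNext("a");
                observer.onComplete();
            }
        };
        source.subscribe(value -> log.add("onNext " + value));
        source.subscribe(value -> log.add("onNext " + value), error -> log.add("onError"), () -> log.add("onComplete"));
        return log;
    }
}
```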

As we already know, we’re always dealing with chains of observables. So how can we subscribe to just one of them? The answer is: we can’t. That is, even though we explicitly only subscribe to one observable (the last one in the chain), that observable will, in turn, subscribe to its parent. Its parent will subscribe to its parent, and so on, until the beginning of the stream is reached.
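The chain reaction is easy to see in the toy Java model if every observable logs its name when subscribe is called. `LoggingSource` and `ObservableStep` are hypothetical classes invented for this sketch; `ObservableStep` hands the observer it received to its parent unchanged (a real operator would wrap it first), which keeps the focus on the order of the subscribe calls.

```java
import java.util.ArrayList;
import java.util.List;

interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

abstract class Observable<T> {
    abstract void subscribe(Observer<T> observer);
}

// Hypothetical source: logs that it was subscribed to, then emits one value.
final class LoggingSource extends Observable<String> {
    private final List<String> log;
    LoggingSource(List<String> log) { this.log = log; }

    @Override void subscribe(Observer<String> observer) {
        log.add("subscribe source"); // top of the chain: nothing further upstream
        observer.onNext("value");
        observer.onComplete();
    }
}

// Hypothetical pass-through operator: subscribing to it means subscribing to its parent.
final class ObservableStep<T> extends Observable<T> {
    private final Observable<T> source;
    private final String name;
    private final List<String> log;

    ObservableStep(Observable<T> source, String name, List<String> log) {
        this.source = source;
        this.name = name;
        this.log = log;
    }

    @Override void subscribe(Observer<T> observer) {
        log.add("subscribe " + name);
        source.subscribe(observer); // a real operator would wrap `observer` in its own observer here
    }
}

class UpstreamDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<String> first = new ObservableStep<String>(new LoggingSource(log), "first", log);
        Observable<String> second = new ObservableStep<String>(first, "second", log);
        Observable<String> last = new ObservableStep<String>(second, "last", log);
        // We only subscribe to the outermost observable...
        last.subscribe(new Observer<String>() {
            @Override public void onNext(String value) { log.add("onNext " + value); }
            @Override public void onError(Throwable error) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log; // ...yet every observable up to the source got subscribed, last to first.
    }
}
```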

This is a good time to start naming things properly. Before the subscription happens, all we have is a chain of immutable observables. There’s nowhere the events could come from, and nowhere they could go anyway: there are just Observables.

But as the subscribe methods are executed, one by one, each of them creates a new subscription: one that links that observable’s source with the observer it received. This is how the actual stream is established: events flow from observer to observer, starting at the source observable and ending at the observer we passed to the initial subscribe call.

So now, for every observable, upstream is where the events come from: the parent observable’s subscription. Downstream is where the events should be delivered to: the observer received in subscribe.

But it wouldn’t make much sense if the operators simply took values from upstream and passed them downstream unchanged. This is why there is the observable’s custom logic in between, acting as a middle-man between upstream and downstream. Where does it come from?

We know that each subscription requires an observer, and that each observable will subscribe to its parent. So what observer will it use? Typically each observable will have its own custom Observer subclass, which receives emissions from upstream, acts on them, and sends them downstream.

Let’s see what a very simplified ObservableTake class would look like:
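Here is a sketch of what such a class could look like in the toy Java model, this time with a minimal disposal mechanism so that take can stop its upstream. `Disposable`, `onSubscribe`, `ObservableFromArray` and `TakeObserver` are simplified, hypothetical stand-ins loosely modelled on RxJava’s; the real classes also deal with thread safety, plugins and more edge cases.

```java
import java.util.ArrayList;
import java.util.List;

// Toy handle that lets a downstream observer cancel its upstream.
interface Disposable {
    void dispose();
}

interface Observer<T> {
    void onSubscribe(Disposable upstream);
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

abstract class Observable<T> {
    abstract void subscribe(Observer<T> observer);
}

// Source that logs each emission and stops as soon as it is disposed.
final class ObservableFromArray<T> extends Observable<T> {
    private final List<String> log;
    private final T[] items;

    @SafeVarargs
    ObservableFromArray(List<String> log, T... items) {
        this.log = log;
        this.items = items;
    }

    @Override
    void subscribe(Observer<T> observer) {
        boolean[] disposed = {false};
        observer.onSubscribe(() -> disposed[0] = true);
        for (T item : items) {
            if (disposed[0]) return; // downstream is no longer interested
            log.add("emit " + item);
            observer.onNext(item);
        }
        if (!disposed[0]) observer.onComplete();
    }
}

final class ObservableTake<T> extends Observable<T> {
    private final Observable<T> source;
    private final long limit;

    ObservableTake(Observable<T> source, long limit) {
        if (limit < 0) throw new IllegalArgumentException("limit must be >= 0");
        this.source = source;
        this.limit = limit;
    }

    @Override
    void subscribe(Observer<T> observer) {
        // Subscribe to the parent with a custom observer wrapping the one we received.
        source.subscribe(new TakeObserver<T>(observer, limit));
    }

    static final class TakeObserver<T> implements Observer<T> {
        private final Observer<T> downstream;
        private long remaining;
        private boolean done;
        private Disposable upstream;

        TakeObserver(Observer<T> downstream, long limit) {
            this.downstream = downstream;
            this.remaining = limit;
        }

        @Override public void onSubscribe(Disposable d) {
            upstream = d;
            downstream.onSubscribe(d);
            if (remaining == 0) { done = true; d.dispose(); downstream.onComplete(); } // take(0)
        }

        @Override public void onNext(T value) {
            if (done) return;
            downstream.onNext(value);
            if (--remaining == 0) {
                done = true;
                upstream.dispose(); // the upstream subscription no longer makes sense
                downstream.onComplete();
            }
        }

        @Override public void onError(Throwable error) { if (!done) { done = true; downstream.onError(error); } }
        @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
    }
}

class TakeDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<Integer> source = new ObservableFromArray<Integer>(log, 1, 2, 3, 4, 5);
        new ObservableTake<Integer>(source, 2).subscribe(new Observer<Integer>() {
            @Override public void onSubscribe(Disposable d) { log.add("onSubscribe"); }
            @Override public void onNext(Integer value) { log.add("onNext " + value); }
            @Override public void onError(Throwable error) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }
}
```

In the demo the source never emits 3, 4 or 5: once the limit is reached, TakeObserver disposes the upstream and completes the downstream.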

So much going on! Let’s break down what happens when we call subscribe on the ObservableTake:

  • inside the subscribe method, the observable subscribes to its parent;
  • the new subscription uses a custom TakeObserver that wraps the observer passed to the subscribe method;
  • TakeObserver executes the logic that fulfills the take operator’s purpose: every time upstream emits an event, TakeObserver considers whether to pass it downstream, and whether the upstream subscription still makes sense or should be disposed.

Similarly, the onErrorReturn operator, which can be used to map errors to onNext events, provides an onError implementation that doesn’t signal downstream.onError: instead it calls downstream.onNext(value) and then completes. The map observer has a custom onNext implementation, which calls downstream.onNext with the value returned by the mapping function. Some operators can also have two (or more) upstreams, like withLatestFrom or merge: they subscribe to all of their upstreams and combine their events accordingly.
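Both custom observers can be sketched in the toy Java model. `MapObserver`, `OnErrorReturnObserver` and the observables that install them are hypothetical simplified stand-ins; in particular, this onErrorReturn does not handle the case where the value function itself throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

abstract class Observable<T> {
    abstract void subscribe(Observer<T> observer);
}

// map: custom onNext, everything else is passed through.
final class MapObserver<T, R> implements Observer<T> {
    private final Observer<R> downstream;
    private final Function<T, R> mapper;

    MapObserver(Observer<R> downstream, Function<T, R> mapper) {
        this.downstream = downstream;
        this.mapper = mapper;
    }

    @Override public void onNext(T value) { downstream.onNext(mapper.apply(value)); }
    @Override public void onError(Throwable error) { downstream.onError(error); }
    @Override public void onComplete() { downstream.onComplete(); }
}

// onErrorReturn: custom onError that never signals downstream.onError.
final class OnErrorReturnObserver<T> implements Observer<T> {
    private final Observer<T> downstream;
    private final Function<Throwable, T> valueForError;

    OnErrorReturnObserver(Observer<T> downstream, Function<Throwable, T> valueForError) {
        this.downstream = downstream;
        this.valueForError = valueForError;
    }

    @Override public void onNext(T value) { downstream.onNext(value); }
    @Override public void onError(Throwable error) {
        downstream.onNext(valueForError.apply(error)); // turn the error into a regular value...
        downstream.onComplete();                       // ...and end the stream normally
    }
    @Override public void onComplete() { downstream.onComplete(); }
}

final class ObservableMap<T, R> extends Observable<R> {
    private final Observable<T> source;
    private final Function<T, R> mapper;
    ObservableMap(Observable<T> source, Function<T, R> mapper) { this.source = source; this.mapper = mapper; }
    @Override void subscribe(Observer<R> observer) { source.subscribe(new MapObserver<T, R>(observer, mapper)); }
}

final class ObservableOnErrorReturn<T> extends Observable<T> {
    private final Observable<T> source;
    private final Function<Throwable, T> valueForError;
    ObservableOnErrorReturn(Observable<T> source, Function<Throwable, T> valueForError) {
        this.source = source;
        this.valueForError = valueForError;
    }
    @Override void subscribe(Observer<T> observer) {
        source.subscribe(new OnErrorReturnObserver<T>(observer, valueForError));
    }
}

class OperatorObserversDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Ad-hoc source: two values, then an error.
        Observable<Integer> failing = new Observable<Integer>() {
            @Override void subscribe(Observer<Integer> observer) {
                observer.onNext(1);
                observer.onNext(2);
                observer.onError(new IllegalStateException("boom"));
            }
        };
        Observable<Integer> chain = new ObservableOnErrorReturn<Integer>(
                new ObservableMap<Integer, Integer>(failing, n -> n * 10),
                error -> -1);
        chain.subscribe(new Observer<Integer>() {
            @Override public void onNext(Integer value) { log.add("onNext " + value); }
            @Override public void onError(Throwable error) { log.add("onError " + error.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }
}
```

The map observer passes the error through untouched; it is the onErrorReturn observer further downstream that turns it into a final value and a normal completion.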

This is how each observable installs custom logic in its place in the stream — it subscribes to the source observable using a custom observer, which receives events from upstream, modifies them and sends them downstream.

We saw that each observable uses a custom observer to wrap the one it received in its subscribe call, but let’s see why this works.

Since we call subscribe on the last (outermost) observable in the chain, it will be the first one to create a custom observer that wraps the one we pass. This new observer will then in turn be passed to its parent, which will wrap it in its own custom observer, and so on. This means that the last observable to wrap some observer is the first operator in the chain. In our example, the wrapped observers would look something like this:
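With observers that print their own nesting, the toy Java model can show this directly. To keep the sketch short, a single hypothetical `OperatorObservable` plays the role of ObservableMap, ObservableDoOnNext and ObservableTake: on subscribe it wraps the received observer with a given function and subscribes to its parent with the result (roughly the idea behind RxJava’s `lift`, though not its API). All classes here are simplified stand-ins.

```java
import java.util.function.Consumer;
import java.util.function.Function;

interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

abstract class Observable<T> {
    abstract void subscribe(Observer<T> observer);
}

// Hypothetical generic operator observable: on subscribe, it wraps the received
// observer with `wrap` and subscribes to its parent with the wrapper.
final class OperatorObservable<T, R> extends Observable<R> {
    private final Observable<T> source;
    private final Function<Observer<R>, Observer<T>> wrap;

    OperatorObservable(Observable<T> source, Function<Observer<R>, Observer<T>> wrap) {
        this.source = source;
        this.wrap = wrap;
    }

    @Override void subscribe(Observer<R> observer) { source.subscribe(wrap.apply(observer)); }
}

// Forwards terminal events; toString reveals how the observers are nested.
abstract class WrappingObserver<T, R> implements Observer<T> {
    final Observer<R> downstream;
    WrappingObserver(Observer<R> downstream) { this.downstream = downstream; }
    @Override public void onError(Throwable error) { downstream.onError(error); }
    @Override public void onComplete() { downstream.onComplete(); }
    @Override public String toString() { return getClass().getSimpleName() + "(" + downstream + ")"; }
}

final class MapObserver<T, R> extends WrappingObserver<T, R> {
    private final Function<T, R> mapper;
    MapObserver(Observer<R> downstream, Function<T, R> mapper) { super(downstream); this.mapper = mapper; }
    @Override public void onNext(T value) { downstream.onNext(mapper.apply(value)); }
}

final class DoOnNextObserver<T> extends WrappingObserver<T, T> {
    private final Consumer<T> action;
    DoOnNextObserver(Observer<T> downstream, Consumer<T> action) { super(downstream); this.action = action; }
    @Override public void onNext(T value) { action.accept(value); downstream.onNext(value); }
}

final class TakeObserver<T> extends WrappingObserver<T, T> {
    private long remaining;
    private boolean done;
    TakeObserver(Observer<T> downstream, long limit) { super(downstream); this.remaining = limit; }
    @Override public void onNext(T value) {
        if (done) return;
        if (remaining > 0) { remaining--; downstream.onNext(value); }
        if (remaining <= 0) { done = true; downstream.onComplete(); }
    }
    @Override public void onError(Throwable error) { if (!done) { done = true; downstream.onError(error); } }
    @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
}

// Bare-bones stand-in for the observer built from the lambdas passed to subscribe.
final class LambdaObserver<T> implements Observer<T> {
    private final Consumer<T> onNext;
    LambdaObserver(Consumer<T> onNext) { this.onNext = onNext; }
    @Override public void onNext(T value) { onNext.accept(value); }
    @Override public void onError(Throwable error) { } // error handling omitted in this sketch
    @Override public void onComplete() { }
    @Override public String toString() { return "LambdaObserver"; }
}

class NestingDemo {
    static String run() {
        String[] seen = new String[1];
        // Stand-in for ObservableFromArray: record which observer it was handed, then complete.
        Observable<Integer> source = new Observable<Integer>() {
            @Override void subscribe(Observer<Integer> observer) {
                seen[0] = observer.toString();
                observer.onComplete();
            }
        };
        // fromArray, then map, doOnNext and take(2): the take observable ends up outermost.
        Observable<Integer> mapped = new OperatorObservable<Integer, Integer>(
                source, down -> new MapObserver<Integer, Integer>(down, n -> n * 10));
        Observable<Integer> peeked = new OperatorObservable<Integer, Integer>(
                mapped, down -> new DoOnNextObserver<Integer>(down, n -> { }));
        Observable<Integer> taken = new OperatorObservable<Integer, Integer>(
                peeked, down -> new TakeObserver<Integer>(down, 2));
        taken.subscribe(new LambdaObserver<Integer>(n -> { }));
        return seen[0];
    }
}
```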

ObservableMap was the deepest one in the observables tree, but its MapObserver is the outermost one. And this of course makes perfect sense: we want operators to be applied in the order we write them. map was first in the chain, so it should be the first to apply its logic.

Now when ObservableFromArray emits values, it passes them to its downstream, the MapObserver, which executes its logic and in turn passes the mapped values further to its downstream, the DoOnNextObserver. That one performs its action for each onNext event and passes all events down to the TakeObserver, and so on. Each observer only receives values from its upstream, and only emits values directly to its downstream.
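The hop-by-hop flow can be traced by feeding values straight into hand-nested toy observers, which is what the source would do once subscribed. The observer classes are hypothetical simplified stand-ins, and the map function and the take count of 3 are arbitrary choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

// Each observer knows only its downstream; terminal events are forwarded as-is.
abstract class ForwardingObserver<T, R> implements Observer<T> {
    final Observer<R> downstream;
    ForwardingObserver(Observer<R> downstream) { this.downstream = downstream; }
    @Override public void onError(Throwable error) { downstream.onError(error); }
    @Override public void onComplete() { downstream.onComplete(); }
}

final class MapObserver<T, R> extends ForwardingObserver<T, R> {
    private final Function<T, R> mapper;
    MapObserver(Observer<R> downstream, Function<T, R> mapper) { super(downstream); this.mapper = mapper; }
    @Override public void onNext(T value) { downstream.onNext(mapper.apply(value)); }
}

final class DoOnNextObserver<T> extends ForwardingObserver<T, T> {
    private final Consumer<T> action;
    DoOnNextObserver(Observer<T> downstream, Consumer<T> action) { super(downstream); this.action = action; }
    @Override public void onNext(T value) { action.accept(value); downstream.onNext(value); }
}

// No disposal in this sketch: after the limit, later events are simply ignored.
final class TakeObserver<T> extends ForwardingObserver<T, T> {
    private long remaining;
    private boolean done;
    TakeObserver(Observer<T> downstream, long limit) { super(downstream); this.remaining = limit; }
    @Override public void onNext(T value) {
        if (done) return;
        if (remaining > 0) { remaining--; downstream.onNext(value); }
        if (remaining <= 0) { done = true; downstream.onComplete(); }
    }
    @Override public void onError(Throwable error) { if (!done) { done = true; downstream.onError(error); } }
    @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
}

class FlowDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observer<Integer> ours = new Observer<Integer>() {
            @Override public void onNext(Integer value) { log.add("observer " + value); }
            @Override public void onError(Throwable error) { log.add("observer error"); }
            @Override public void onComplete() { log.add("observer complete"); }
        };
        // In this toy model, the nesting that subscribing through map, doOnNext and take(3) produces.
        Observer<Integer> outermost = new MapObserver<Integer, Integer>(
                new DoOnNextObserver<Integer>(new TakeObserver<Integer>(ours, 3), n -> log.add("doOnNext " + n)),
                n -> { log.add("map " + n); return n * 10; });
        // What the source does once subscribed: push each value into the outermost observer.
        for (int i = 1; i <= 3; i++) outermost.onNext(i);
        outermost.onComplete();
        return log;
    }
}
```

Each value makes the whole trip down to our observer before the next one is emitted; that ordering holds here because everything runs synchronously on one thread.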

This is why we sometimes read that in RxJava the subscription happens upstream, and emissions happen downstream. First we subscribe, from the bottom up, establishing a stream. Then that stream (the connected observers) is used to pass values from the source to our observer. Whenever an operator swallows an event, the event simply disappears, but only downstream of the operator that swallows it.
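A small toy example of that last point: a hypothetical `FilterObserver` sits between two doOnNext-style observers, and only even numbers get past it. The classes are simplified stand-ins, not RxJava’s.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

// Forwards terminal events unchanged; subclasses decide what onNext does.
abstract class ForwardingObserver<T> implements Observer<T> {
    final Observer<T> downstream;
    ForwardingObserver(Observer<T> downstream) { this.downstream = downstream; }
    @Override public void onError(Throwable error) { downstream.onError(error); }
    @Override public void onComplete() { downstream.onComplete(); }
}

final class DoOnNextObserver<T> extends ForwardingObserver<T> {
    private final Consumer<T> action;
    DoOnNextObserver(Observer<T> downstream, Consumer<T> action) { super(downstream); this.action = action; }
    @Override public void onNext(T value) { action.accept(value); downstream.onNext(value); }
}

final class FilterObserver<T> extends ForwardingObserver<T> {
    private final Predicate<T> predicate;
    FilterObserver(Observer<T> downstream, Predicate<T> predicate) { super(downstream); this.predicate = predicate; }
    @Override public void onNext(T value) {
        if (predicate.test(value)) downstream.onNext(value); // otherwise the value is swallowed right here
    }
}

class SwallowDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observer<Integer> ours = new Observer<Integer>() {
            @Override public void onNext(Integer value) { log.add("observer " + value); }
            @Override public void onError(Throwable error) { log.add("observer error"); }
            @Override public void onComplete() { log.add("observer complete"); }
        };
        // Nesting for: doOnNext (before), then filter (even only), then doOnNext (after).
        Observer<Integer> outermost = new DoOnNextObserver<Integer>(
                new FilterObserver<Integer>(
                        new DoOnNextObserver<Integer>(ours, n -> log.add("after filter " + n)),
                        n -> n % 2 == 0),
                n -> log.add("before filter " + n));
        for (int i = 1; i <= 4; i++) outermost.onNext(i);
        outermost.onComplete();
        return log;
    }
}
```

The observer upstream of the filter sees every value, while everything downstream of it only ever sees 2 and 4.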

Also notice that the source observable can hold on to the observer it was given. This way it can emit events at any point in time after it was subscribed to (and until it’s been disposed). But any time the source observable calls one of the observer’s onXXX methods, that event travels through all the observers in the stream, until it’s either swallowed or delivered to the initial observer we used to subscribe.
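In the toy Java model, a source that holds on to its observers could look like this. `ManualSource` is a hypothetical class for this sketch (loosely reminiscent of RxJava’s PublishSubject, but not its implementation); it stores every observer it gets and only emits when `emit` is called, long after subscribe has returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

abstract class Observable<T> {
    abstract void subscribe(Observer<T> observer);
}

// Hypothetical source that keeps its observers and emits only when told to.
final class ManualSource<T> extends Observable<T> {
    private final List<Observer<T>> observers = new ArrayList<>();

    @Override void subscribe(Observer<T> observer) {
        observers.add(observer); // hold on to the observer; nothing is emitted yet
    }

    void emit(T value) {
        for (Observer<T> observer : observers) observer.onNext(value);
    }

    void complete() {
        for (Observer<T> observer : observers) observer.onComplete();
        observers.clear(); // a completed stream delivers nothing more
    }
}

final class ObservableMap<T, R> extends Observable<R> {
    private final Observable<T> source;
    private final Function<T, R> mapper;
    ObservableMap(Observable<T> source, Function<T, R> mapper) { this.source = source; this.mapper = mapper; }
    @Override void subscribe(Observer<R> observer) {
        source.subscribe(new Observer<T>() {
            @Override public void onNext(T value) { observer.onNext(mapper.apply(value)); }
            @Override public void onError(Throwable error) { observer.onError(error); }
            @Override public void onComplete() { observer.onComplete(); }
        });
    }
}

class LaterEmissionDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ManualSource<Integer> source = new ManualSource<>();
        Observable<String> chain = new ObservableMap<Integer, String>(source, n -> "value " + n);
        chain.subscribe(new Observer<String>() {
            @Override public void onNext(String value) { log.add(value); }
            @Override public void onError(Throwable error) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        });
        log.add("subscribed"); // subscribe has returned, but no events were delivered yet
        source.emit(1);        // later on, each event travels through the map observer to ours
        source.emit(2);
        source.complete();
        return log;
    }
}
```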

That’s it

At least for now: there’s still a lot more to uncover. But I hope this post was helpful in understanding the basics of how RxJava’s streams work under the hood. While this may not seem important when simply using the library in day-to-day development, it’s crucial for understanding some of the more advanced topics: How and why do subscribeOn and observeOn work? How can we distribute emitted values among several threads? Why does a stream with flatMap sometimes emit on the “wrong” thread? How can we avoid creating a subscription when we don’t need it? I’ll try to answer these and other questions in future posts 🙂

Written by

android developer
