RxJava 2 Disposable — Under the hood

Mar 18, 2017 · 5 min read

Everyone has code like the following:

private CompositeDisposable compositeDisposable =
new CompositeDisposable();
@Override public void onCreate() {
compositeDisposable.add(backendApi.loadUser()
.subscribe(this::displayUser, this::handleError));
}
@Override public void onDestroy() {
compositeDisposable.clear();
}

A backend request is done and the Disposable returned from the subscribe() method is added to the list of CompositeDisposables. The moment the Activity will be destroyed, the list of Disposables gets cleared and we’re good.

What is this Disposable?

Disposable is an interface containing two methods.

public interface Disposable {
void dispose();
boolean isDisposed();
}

But how does this disposing mechanism work? To demonstrate what I mean, let’s look at this example:

TestScheduler scheduler = new TestScheduler();
TestObserver<Long> o = Observable.interval(1, SECONDS, scheduler)
.test();
o.assertNoValues();scheduler.advanceTimeBy(1, SECONDS);
o.assertValues(0L);
scheduler.advanceTimeBy(1, SECONDS);
o.assertValues(0L, 1L);
o.dispose(); // Dispose the connection.scheduler.advanceTimeBy(100, SECONDS);
o.assertValues(0L, 1L);

In this case Observable.interval() will emit a count every second. For faking the time, TestScheduler is used. Every time the Scheduler advances it’s internal time, the Observer will receive a new value. The moment the connection between the Observable and the Observer gets disposed via the Disposable.dispose() method the Observer won’t get any emission from the Observable.

In a nutshell, when the Disposable (which is implemented by the TestObserver) gets disposed, the Observer (also TestObserver) will no longer receive values from the Observable.

Let’s dive in.

The test() method is just sugar. Internally it creates a TestObserver, uses Observable.subscribe(Observer) and returns the created TestObserver. The same is true if you were to use lambdas for subscribing. It creates a LambdaObserver, which also implements both Observer and Disposable.

So what is the Observable.subscribe(Observer) method doing?

Simplified it just calls subscribeActual(Observer), which is an abstract method of the Observable class.

In RxJava 2 the way operators are implemented differs from RxJava 1. Now, all of the Observable operators extend from Observable and override the subscribeActual(Observer) method.

For instance the interval mechanism is implemented in ObservableInterval which just extends Observable<Long>. Also note that there is only one method that you can override from Observable, subscribeActual(Observer) which is also where all the magic happens.

Observer

Observer is just an interface which has 4 methods. 3 of which are semantically the same from RxJava 1.

- onNext(T) — notifies the observer with the item that can be observed
- onError(Throwable) — notifies the observer the error
- onComplete() — notifies the observer that there are no more items sent

Then there’s also onSubscribe(Disposable).

onSubscribe gets the Disposable as a parameter which can be used for disposing the connection between the Observable and the Observer itself as well as checking whether we’re already disposed or not.

Example of disposing

Let’s go and look at one of those operators that is using the disposing mechanism internally — takeUntil().

Observable.just(1, 2, 3)
.takeUntil(integer -> integer < 3)
.test()
.assertResult(1, 2);

The takeUntil operator is implemented in ObservableTakeUntilPredicate and hence the code above can be rewritten to.

new ObservableTakeUntilPredicate(Observable.just(1, 2, 3),
integer -> integer < 3)
.test()
.assertResult(1, 2);

The ObservableTakeUntilPredicate takes a source Observable (our 1, 2, 3 Observable) and a predicate that should be applied upon it and will give you a new Observable.

Note that in the public API of the takeUntil Implementation, ObservableTakeUntilPredicate, is never leaked and you will always get the abstract type back, Observable. Also all operators operate not on Observable itself but rather on ObservableSource, which is just an interface containing one method, the void subscribe(Observer) method, which is implemented by Observable and delegates down to subscribeActual(Observer).

Theoretically speaking you could write your own Observable implementation by implementing ObservableSource, while still using the existing operators.

ObservableTakeUntilPredicate

ObservableTakeUntilPredicate extends Observable and overrides subscribeActual(Observer). In there it subscribes to the source / parent Observable (our 1, 2, 3 Observable), traveling upstream, using the subscribe method from ObservableSource.

This little diagram might help to picture how things are calling each other (read from bottom to top):

ObservableFromArray.subscribeActual() // Send 1, 2, 3 down

ObservableFromArray.subscribe(TakeUntilPredicateObserver)

ObservableTakeUntilPredicate.subscribeActual()

ObservableTakeUntilPredicate.subscribe() // test() invokes this

Observable.just(1, 2, 3) is just sugar for Observable.fromArray() which explains why the implementation is in ObservableFromArray. The ObservableFromArray implementation of subscribeActual(Observer) calls onSubscribe(Disposable) first.

The disposable in this case is just a class that implements the Disposable interface and keeps track of the state using a boolean. This is a stripped down version of it:

class MyDisposableClass implements Disposable {
volatile boolean disposed;
@Override public void dispose() {
disposed = true;
}
@Override public boolean isDisposed() {
return disposed;
}
}

After that as long as it’s not disposed it’ll generate the sequence of items:

observer.onNext(1);
observer.onNext(2);
observer.onNext(3);
observer.onComplete();

Back to the takeUntil implementation

It’ll subscribe to the source Observable (our 1, 2, 3 Observable) with it’s own Observer, TakeUntilPredicateObserver that implements the Disposable and the Observer interface.

The implemented Disposable interface methods just forward to a member Disposable variable that is taken from the onSubscribe parameter.

@Override public void dispose() {
s.dispose();
}
@Override public boolean isDisposed() {
return s.isDisposed();
}

onSubscribe(Disposable) is implemented like this:

@Override public void onSubscribe(Disposable disposable) {
if (DisposableHelper.validate(this.disposable, disposable)) {
this.disposable = disposable;
actual.onSubscribe(this);
}
}

DisposableHelper.validate(Disposable next, Disposable current) — validates that the next parameter is null and the current one isn’t. If that’s not the case, it’ll delegate errors to the RxJavaPlugins. It’s just a safe mechanism for taking over the disposable from the upstream.

If it’s valid we set the disposable to our member Disposable and propagate the onSubscribe downstream to our subscribeActual Observer (in our case TestObserver).

Simplified, calls to onError and onComplete are just forwarded downstream to the subscribeActual Observer (in our case TestObserver).

@Override public void onError(Throwable t) {
actual.onError(t);
}
@Override public void onComplete() {
actual.onComplete();
}

onNext also forwards the value downstream to the subscribeActual Observer (in our case TestObserver). After that it takes the value and executes the predicate. If it’s not a match, nothing else needs to be done.

If it is a match, no more items should be emitted. All that needs to be done is calling dispose() on our member Disposable, which will then tell ObservableFromArray to not produce any more items and send them downstream to us. To notify the downstream subscribeActual Observer (in our case TestObserver) that we’re done, onComplete is called.

Simplified the implementation looks like this:

@Override public void onNext(T t) {
actual.onNext(t);
if (predicate.test(t)) {
s.dispose();
actual.onComplete();
}
}

Conclusion

This is just one example on how many operators work and the Disposable state is propagated.

This Disposable mechanism also applies to the other reactive Types like Flowable, Single, Completable and Maybe.

I hope you got a better vision of how things travel up and down and especially how RxJava 2 operates with Disposables under the hood.

NOTE: In the Code samples I left a few things out to keep things simple and focus on the Disposable parts.

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade