RxJava Ninja: In Depth Observables and Observers

Tompee Balauag
Published in Familiar Android
Jul 31, 2018 (6 min read)

This article is part of the RxJava Ninja series. The complete list of articles in the series can be found at the end of this article (and will be updated as more articles become available).

In the last article, we created our first observable and observer. Today we will talk a little more about them and investigate how they uphold the observable contract.

Observable, a closer look

As we learned from the previous article, the observable communicates with the observer by means of notifications. The observer defines four notifications the observable can use to signal an event or emit an item. For the benefit of those who have not seen the previous article, below is the definition of the Observer<T> interface.
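As a refresher, here is a minimal sketch of those four notifications. Observer and Disposable are declared locally as simplified stand-ins so the snippet compiles without the RxJava dependency (in a real project they come from io.reactivex and io.reactivex.disposables), and LoggingObserver and ObserverDemo are hypothetical helpers added for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for io.reactivex.disposables.Disposable.
interface Disposable {
    void dispose();
    boolean isDisposed();
}

// Local stand-in mirroring the four methods of io.reactivex.Observer<T>.
interface Observer<T> {
    void onSubscribe(Disposable d); // sent once, before anything else
    void onNext(T item);            // delivers one item
    void onError(Throwable e);      // terminal: the stream failed
    void onComplete();              // terminal: the stream ended normally
}

// Hypothetical helper that records every notification it receives.
class LoggingObserver<T> implements Observer<T> {
    final List<String> log = new ArrayList<>();

    @Override public void onSubscribe(Disposable d) { log.add("onSubscribe"); }
    @Override public void onNext(T item) { log.add("onNext: " + item); }
    @Override public void onError(Throwable e) { log.add("onError"); }
    @Override public void onComplete() { log.add("onComplete"); }
}

class ObserverDemo {
    static List<String> run() {
        LoggingObserver<String> observer = new LoggingObserver<>();
        // Drive the observer by hand; normally an observable makes these calls.
        observer.onSubscribe(null); // no real subscription exists in this sketch
        observer.onNext("hello");
        observer.onComplete();
        return observer.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```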

Before we investigate the above notifications, let me introduce you to a very powerful observable factory method, Observable.create.

The Observable.create method allows you to create your own custom observable. It accepts an instance of ObservableOnSubscribe<T>, a single abstract method (SAM) interface defined as follows.

The subscribe method receives an instance of the ObservableEmitter<T> interface. ObservableEmitter mirrors the Observer’s onNext, onError and onComplete methods and adds a few of its own, since it also serves as the source of items. The moment an observer subscribes to your observable, this subscribe method is invoked and your custom emitter code runs. Let’s see it in action. Consider this example.
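A rough, dependency-free sketch of the idea follows. MiniObservable, Emitter and OnSubscribe are hypothetical stand-ins for Observable, ObservableEmitter and ObservableOnSubscribe, reduced to the bare minimum. With the real library the equivalent call is Observable.create with a lambda that receives the emitter.

```java
import java.util.ArrayList;
import java.util.List;

interface Disposable { void dispose(); boolean isDisposed(); }

interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T item);
    void onError(Throwable e);
    void onComplete();
}

// Stand-in for ObservableEmitter<T>: the handle your code pushes items into.
interface Emitter<T> {
    void onNext(T item);
    void onError(Throwable e);
    void onComplete();
}

// Stand-in for ObservableOnSubscribe<T>: one abstract method, so a lambda fits.
interface OnSubscribe<T> {
    void subscribe(Emitter<T> emitter) throws Exception;
}

// Stand-in for Observable<T>, with only create and subscribe.
final class MiniObservable<T> {
    private final OnSubscribe<T> source;
    private MiniObservable(OnSubscribe<T> source) { this.source = source; }

    static <T> MiniObservable<T> create(OnSubscribe<T> source) {
        return new MiniObservable<>(source);
    }

    void subscribe(final Observer<T> observer) {
        observer.onSubscribe(new Disposable() { // no-op subscription in this sketch
            @Override public void dispose() { }
            @Override public boolean isDisposed() { return false; }
        });
        try {
            source.subscribe(new Emitter<T>() { // forward straight to the observer
                @Override public void onNext(T item) { observer.onNext(item); }
                @Override public void onError(Throwable e) { observer.onError(e); }
                @Override public void onComplete() { observer.onComplete(); }
            });
        } catch (Exception e) {
            observer.onError(e);
        }
    }
}

class CreateDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        // Nothing is emitted here yet; the lambda only runs on subscribe.
        MiniObservable<String> observable = MiniObservable.<String>create(emitter -> {
            emitter.onNext("First item");
            emitter.onNext("Second item");
            emitter.onNext("Third item");
            emitter.onComplete();
        });
        observable.subscribe(new Observer<String>() {
            @Override public void onSubscribe(Disposable d) { log.add("onSubscribe"); }
            @Override public void onNext(String item) { log.add("onNext: " + item); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```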

Note: It is highly recommended to target Java 8, because lambda expressions can greatly reduce boilerplate code. In the succeeding examples, SAM instances will be expressed as lambdas instead of anonymous classes.

This code creates an observable of type String that, when subscribed to, emits three strings followed by an onComplete. As I said, this is a very powerful factory method, as it allows you to create an observable from virtually anything you can think of. Running this on its own will not do anything, for reasons discussed in the previous article. Attaching an observer that logs each method call will output this.

onSubscribe
onNext: First item
onNext: Second item
onNext: Third item
onComplete

The exact order of our items is preserved, because the observable contract requires it.

Now that we know how to create an observable, let’s validate some of the behaviors defined in the observable contract. For the succeeding examples, assume that every observable has an observer subscribed to it.

We have already proved sequence preservation. Next on our list is an observable that does nothing.

This will output

onSubscribe

No crashes, compile errors or anything. Therefore it is perfectly valid for an observable to emit nothing.

Next, we will investigate onError. The onError method requires a Throwable parameter that is annotated as NonNull. Therefore, the observable must always provide the cause of the error.

The output of this code is as follows.

onSubscribe
onError

Therefore, when onError is sent, any succeeding calls to onNext or onComplete do nothing.

The last thing that we will check is onComplete. For the rest of the assumptions, I invite you to validate them on your own, as it will be a good exercise for you.

This will actually result in a runtime error. This is compliant, because once an onComplete or onError has been sent, the observable should not send any more notifications. RxJava is lenient with a late onNext, which is simply ignored, but a late onError surfaces as a runtime exception.
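The terminal-event rules above can be modeled in a few lines. GuardedEmitter is a hypothetical class, not RxJava code: it drops late items silently and collects late errors in a list. As far as I know, RxJava 2 instead hands an undeliverable error to its global error handler (RxJavaPlugins.onError), which by default surfaces it as an exception, so treat the list here as a stand-in for that behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical emitter that enforces "nothing after a terminal event".
class GuardedEmitter {
    final List<String> delivered = new ArrayList<>();        // what the observer saw
    final List<Throwable> undeliverable = new ArrayList<>(); // errors that arrived too late
    private boolean terminated;

    void onNext(String item) {
        if (terminated) return;          // late items are dropped silently
        delivered.add("onNext: " + item);
    }

    void onComplete() {
        if (terminated) return;
        terminated = true;
        delivered.add("onComplete");
    }

    void onError(Throwable e) {
        if (terminated) {                // too late to deliver, so report it instead
            undeliverable.add(e);
            return;
        }
        terminated = true;
        delivered.add("onError");
    }
}

class TerminalDemo {
    // onError first: the following onNext and onComplete are ignored.
    static GuardedEmitter errorThenMore() {
        GuardedEmitter emitter = new GuardedEmitter();
        emitter.onError(new RuntimeException("boom"));
        emitter.onNext("ignored");
        emitter.onComplete();
        return emitter;
    }

    // onComplete first: the late onError cannot reach the observer.
    static GuardedEmitter completeThenError() {
        GuardedEmitter emitter = new GuardedEmitter();
        emitter.onComplete();
        emitter.onError(new RuntimeException("too late"));
        return emitter;
    }

    public static void main(String[] args) {
        System.out.println(errorThenMore().delivered);
        GuardedEmitter late = completeThenError();
        System.out.println(late.delivered + ", undeliverable: " + late.undeliverable.size());
    }
}
```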

Now that we proved a great deal about observables, let’s turn our attention to observers.

Observer, a closer look

Let’s see what happens the moment an observer subscribes to an observable. As we have seen before, the ObservableSource<T> has a subscribe method that looks like this.

When subscribing to an observable, you need to pass an instance of your observer. Let’s see what happens in the context of Observable.create.

When you use the Observable.create method, you are actually instantiating an ObservableCreate<T> class, a subtype of Observable<T>. Observable<T> has an abstract method called subscribeActual that all concrete subtypes should implement. This method is what is called when the subscription actually happens. But before subscribeActual is called, all registered subscribe hooks are invoked first. This phase is Subscription Time. The subscribeActual method of ObservableCreate<T> is defined as follows.

It creates an emitter instance that wraps the passed observer (remember, the emitter forwards to the observer) and calls the observer’s onSubscribe. This is why onSubscribe is always printed first: it is the first method to be invoked. Only then is the subscribe method of the ObservableOnSubscribe<T> called, where everything happens. This is Runtime.
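The sequence described above can be sketched roughly as follows. This is a simplified model, not the library source: the class names echo RxJava's, but every type is declared locally, the subscribe hooks are left out, and a plain boolean replaces the atomic state handling.

```java
import java.util.ArrayList;
import java.util.List;

interface Disposable { void dispose(); boolean isDisposed(); }

interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T item);
    void onError(Throwable e);
    void onComplete();
}

// Stand-in for ObservableOnSubscribe<T>.
interface OnSubscribe<T> {
    void subscribe(CreateEmitter<T> emitter) throws Exception;
}

// The emitter wraps the observer and doubles as the Disposable handed to it.
final class CreateEmitter<T> implements Disposable {
    private final Observer<T> observer;
    private boolean disposed;

    CreateEmitter(Observer<T> observer) { this.observer = observer; }

    void onNext(T item) { if (!disposed) observer.onNext(item); }
    void onError(Throwable e) { if (!disposed) { disposed = true; observer.onError(e); } }
    void onComplete() { if (!disposed) { disposed = true; observer.onComplete(); } }

    @Override public void dispose() { disposed = true; }
    @Override public boolean isDisposed() { return disposed; }
}

final class ObservableCreate<T> {
    private final OnSubscribe<T> source;
    ObservableCreate(OnSubscribe<T> source) { this.source = source; }

    void subscribe(Observer<T> observer) {
        // RxJava would run its subscribe hooks here before delegating.
        subscribeActual(observer);
    }

    private void subscribeActual(Observer<T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);   // 1. the observer gets its Disposable first
        try {
            source.subscribe(parent);   // 2. only now does the emitter code run
        } catch (Exception ex) {
            parent.onError(ex);         // a thrown exception becomes onError
        }
    }
}

class SubscribeActualDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        new ObservableCreate<String>(emitter -> {
            emitter.onNext("First item");
            throw new IllegalStateException("boom");
        }).subscribe(new Observer<String>() {
            @Override public void onSubscribe(Disposable d) { log.add("onSubscribe"); }
            @Override public void onNext(String item) { log.add("onNext: " + item); }
            @Override public void onError(Throwable e) { log.add("onError: " + e.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```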

If you notice, we keep on taking the Disposable argument of the onSubscribe for granted. But today, we will no longer do that.

The Disposable interface is defined as follows.

If you were to take a guess, what do you think it does? Well, it disposes of something, of that I am sure. But dispose of what? Let’s look again at the context of Observable.create.
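For orientation, the interface boils down to two methods. The copy below is declared locally so the snippet stands alone, and FlagDisposable is a hypothetical minimal implementation built on an atomic flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Local copy of the two-method shape of io.reactivex.disposables.Disposable.
interface Disposable {
    void dispose();        // cancel; calling it more than once is harmless
    boolean isDisposed();  // true once dispose has been called
}

// Hypothetical minimal implementation: a one-way, thread-safe flag.
final class FlagDisposable implements Disposable {
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    @Override public void dispose() { disposed.set(true); }
    @Override public boolean isDisposed() { return disposed.get(); }
}

class DisposableDemo {
    public static void main(String[] args) {
        Disposable d = new FlagDisposable();
        System.out.println(d.isDisposed()); // false
        d.dispose();
        d.dispose();                        // second call changes nothing
        System.out.println(d.isDisposed()); // true
    }
}
```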

In the subscribeActual, we can see that the disposable instance passed is the instance of the CreateEmitter<T> class. Let’s see how it is defined.

Sounds good. Nothing special. Your observer will receive an instance of this but in the form of a Disposable. What happens when we dispose this? According to the source, it sets some flag to disposed but does so in an atomic fashion to prevent race conditions. So what if a flag is set to disposed? How does that affect my observable? The answer lies in the onNext implementation of the CreateEmitter<T> class.

When the emitter is disposed, it will skip emitting succeeding items. We get it now. This is the unsubscribe mechanism. This disposable is actually called the subscription. You can unsubscribe from an observable by disposing the subscription.
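To make the unsubscribe mechanism concrete, here is a small sketch in which the observer disposes its subscription after the second item. All types are local stand-ins, and this CreateEmitter only models the disposed check described above. The real class carries more logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

interface Disposable { void dispose(); boolean isDisposed(); }

interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T item);
    void onError(Throwable e);
    void onComplete();
}

// Simplified emitter: every notification first checks the disposed flag.
final class CreateEmitter<T> implements Disposable {
    private final Observer<T> observer;
    private final AtomicBoolean disposed = new AtomicBoolean();

    CreateEmitter(Observer<T> observer) { this.observer = observer; }

    void onNext(T item) {
        if (!isDisposed()) observer.onNext(item); // skipped once disposed
    }

    void onComplete() {
        if (!isDisposed()) {
            try { observer.onComplete(); } finally { dispose(); }
        }
    }

    @Override public void dispose() { disposed.set(true); }
    @Override public boolean isDisposed() { return disposed.get(); }
}

class UnsubscribeDemo {
    // Mirrors the subscribe sequence: onSubscribe first, then run the source.
    static <T> void subscribe(Consumer<CreateEmitter<T>> source, Observer<T> observer) {
        CreateEmitter<T> emitter = new CreateEmitter<>(observer);
        observer.onSubscribe(emitter);
        source.accept(emitter);
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        Consumer<CreateEmitter<String>> source = emitter -> {
            emitter.onNext("First item");
            emitter.onNext("Second item");
            emitter.onNext("Third item"); // never reaches the observer
            emitter.onComplete();         // neither does this
        };
        subscribe(source, new Observer<String>() {
            private Disposable subscription;

            @Override public void onSubscribe(Disposable d) {
                subscription = d;         // keep the handle so we can unsubscribe
                log.add("onSubscribe");
            }
            @Override public void onNext(String item) {
                log.add("onNext: " + item);
                if (item.startsWith("Second")) subscription.dispose();
            }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```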

We just witnessed how observer subscription and unsubscription work. Note that we observed it in the context of the Observable.create method, but the principle is the same for all other types of Observable. If you have doubts, I invite you to check the source code of other factory methods.

Reducing Observer boilerplate code

In case you haven’t noticed, an Observer instance requires you to implement four abstract methods. This involves a lot of boilerplate code. Fortunately, Observable<T> provides a number of subscribe overloads to choose from.

The above code shows a variant of the subscribe method accepting an instance of the Consumer<T> class. This single consumer input defines the behavior when onNext is called. This is useful for cases wherein you are sure that your observable will not output an error and you can safely ignore the onComplete event. Be wary, though: when an error occurs and no onError handler is specified, you will encounter a runtime exception. So in production, it is always advisable to implement onError.

Variants that do not take an Observer<T> return the Disposable object instead.

With lambda expressions, method references and chaining, boilerplate code can be further reduced. An example can be seen below.
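A dependency-free sketch of how such overloads can look is shown here. MiniObservable and its just factory are hypothetical stand-ins, and java.util.function.Consumer is used in place of RxJava's own Consumer type. The missing-handler behavior is modeled by throwing IllegalStateException, which is an assumption made for this sketch rather than what the library literally does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

interface Disposable { void dispose(); boolean isDisposed(); }

// Stand-in for Observable<T> that replays a fixed list of items.
final class MiniObservable<T> {
    private final List<T> items;
    private MiniObservable(List<T> items) { this.items = items; }

    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        return new MiniObservable<>(Arrays.asList(items));
    }

    // onNext-only variant: an error has nowhere to go, so it becomes an exception.
    Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, e -> {
            throw new IllegalStateException("onError not implemented", e);
        });
    }

    // onNext + onError variant. No Observer is involved, so the
    // Disposable is returned to the caller instead of passed to onSubscribe.
    Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        final AtomicBoolean disposed = new AtomicBoolean();
        Disposable subscription = new Disposable() {
            @Override public void dispose() { disposed.set(true); }
            @Override public boolean isDisposed() { return disposed.get(); }
        };
        try {
            for (T item : items) {
                if (subscription.isDisposed()) break;
                onNext.accept(item);
            }
        } catch (RuntimeException e) {
            subscription.dispose();
            onError.accept(e);
        }
        return subscription;
    }
}

class OverloadDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        // Chained call with two lambdas instead of a four-method Observer.
        Disposable d = MiniObservable.just("First item", "Second item")
                .subscribe(item -> log.add("onNext: " + item),
                           error -> log.add("onError: " + error.getMessage()));
        log.add("disposed: " + d.isDisposed());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // A method reference works anywhere a lambda does.
        MiniObservable.just("a", "b").subscribe(System.out::println);
    }
}
```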

Bonus: Execution context

We will just briefly tackle execution context for now as it will be discussed in detail in the future. Consider this example.

This method logs the execution context of each phase. It will output

Assembly execution context: main
Subscription execution context: main
Runtime execution context: main

We can conclude, therefore, that by default subscription is synchronous and runs on the same thread as the Assembly. Let’s see whether this conclusion holds true without exceptions in future articles.
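One way to read "synchronous" is that subscription and runtime follow whichever thread calls subscribe. The plain-Java sketch below uses no RxJava at all: the two Runnables are hypothetical stand-ins for the emitter code and the subscribe call. Assembly happens on the calling thread, while the subscribe step is deliberately started from a thread named "worker" to show the effect.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ContextDemo {
    private static String thread() {
        return Thread.currentThread().getName();
    }

    static List<String> run() {
        final List<String> log = new CopyOnWriteArrayList<>();

        // Assembly: the chain is only built here, nothing runs yet.
        log.add("Assembly execution context: " + thread());
        final Runnable emitterCode =
                () -> log.add("Runtime execution context: " + thread());

        // Hypothetical synchronous subscribe: everything runs on the caller's thread.
        final Runnable subscribe = () -> {
            log.add("Subscription execution context: " + thread());
            emitterCode.run();
        };

        // Call subscribe from another thread to see which context it follows.
        Thread worker = new Thread(subscribe, "worker");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Calling subscribe.run() directly instead of starting the worker thread keeps all three phases on the calling thread, which matches the output above.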

Check out the other articles in this series.

  1. Introduction to Reactive Programming
  2. Building your first Observables and Observers
  3. In Depth Observables and Observers
  4. Marble Diagrams and Operators
  5. Observable Factories Part 1
  6. Observable Factories Part 2
  7. Single, Maybe and Completable
  8. Hot and Cold Observables
  9. Filtering Operators Part 1
  10. Filtering Operators Part 2
