RxJava Ninja: Building your first Observables and Observers

Tompee Balauag
Published in Familiar Android
5 min read · Jul 30, 2018

This article is a part of the RxJava Ninja series. The complete list of the articles for this series can be found at the end of this article (and will be updated as more articles become available).

This article discusses the ideas behind two basic building blocks of RxJava2: observables and observers. Operators are also a fundamental component, but we will reserve that topic for a later article.

Observables and Observers

In the simplest sense, an observable is something that emits. An excellent example would be your home wireless router as it emits radio signals.

An observer, on the other hand, consumes the items emitted by an observable. An example would be your smartphone, which consumes the wireless signals generated by your router.

In reactive extensions, these two entities have a very specific relationship. According to the docs:

An observer subscribes to an observable while an observable emits items or sends notifications to its observers by calling the observer’s methods.

The keywords here are subscribe and emit. How exactly does an observable emit or an observer subscribe? The specifics are described in the Observable Contract.

Observable Contract

According to the observable contract, an observable communicates with its observer (and vice versa) by means of Notifications.

  1. OnNext — an item is emitted
  2. OnCompleted — there are no more items to emit
  3. OnError — an error was encountered and can no longer emit items
  4. OnSubscribe — the observer has successfully subscribed to the observable (optional)

The observable contract has these important behaviors defined for the above notifications.

  • An observable may make zero or more OnNext notifications
  • An observable may make an OnCompleted or OnError notification but not both
  • An OnError notification must contain the cause of error
  • Once an OnCompleted or OnError notification is sent, an observable should not send any more notifications
  • Items are emitted one after the other preserving order
  • Terminating the observable requires it to send an OnError or OnCompleted notification.
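To make the rules above concrete, here is a minimal sketch of a guard that enforces the "nothing after OnCompleted or OnError" rule. SimpleObserver and ContractGuard are hypothetical names invented for this illustration, not RxJava types, and the sketch assumes everything runs on a single thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free sketch; none of these types come from RxJava.
class ContractSketch {

    /** Simplified observer with the three emission notifications. */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable cause);
        void onComplete();
    }

    /** Wraps an observer and drops anything sent after a terminal notification. */
    static final class ContractGuard<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> downstream;
        private boolean terminated;

        ContractGuard(SimpleObserver<T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onNext(T item) {
            if (!terminated) downstream.onNext(item);
        }

        @Override public void onError(Throwable cause) {
            if (terminated) return;
            terminated = true;          // OnError is terminal
            downstream.onError(cause);
        }

        @Override public void onComplete() {
            if (terminated) return;
            terminated = true;          // OnCompleted is terminal
            downstream.onComplete();
        }
    }

    /** Feeds the guard a deliberately misbehaving sequence and records what gets through. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleObserver<String> recorder = new SimpleObserver<String>() {
            @Override public void onNext(String item) { log.add("onNext: " + item); }
            @Override public void onError(Throwable cause) { log.add("onError: " + cause.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        ContractGuard<String> guard = new ContractGuard<>(recorder);
        guard.onNext("a");
        guard.onNext("b");
        guard.onComplete();
        guard.onNext("c");                                 // dropped: already completed
        guard.onError(new IllegalStateException("late"));  // dropped: only one terminal notification
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it prints onNext: a, onNext: b and onComplete. The late onNext and onError never reach the recorder, which is exactly what the contract asks of a well-behaved observable.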

The observer, in turn, communicates with the observable through the following notifications.

  1. subscribe — an observer is ready to consume items
  2. unsubscribe — an observer wants to terminate the subscription
  3. request — a rate control mechanism (optional)

The following behaviors are defined for the above notifications.

  • An observer may consider an observable active up until an OnError or OnCompleted is received.
  • Once an observable emits an OnError or OnCompleted, the observer should not send any more notifications to that same observable.
  • When an OnError or OnCompleted is received, the observer is not required to unsubscribe explicitly.
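The observer side can be sketched in the same spirit. Below, an observer unsubscribes after two items and the source stops emitting. Subscription, ItemObserver and emitOneToFive are hypothetical names for this illustration only; RxJava2 models this differently, and the optional request notification is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; Subscription and ItemObserver are illustrative names, not RxJava types.
class UnsubscribeSketch {

    /** Handle the observer uses to tell the source it wants no more items. */
    static final class Subscription {
        private boolean unsubscribed;
        void unsubscribe() { unsubscribed = true; }
        boolean isUnsubscribed() { return unsubscribed; }
    }

    interface ItemObserver<T> {
        void onSubscribe(Subscription s);
        void onNext(T item);
        void onComplete();
    }

    /** Emits 1..5, checking before every notification whether the observer is still subscribed. */
    static void emitOneToFive(ItemObserver<Integer> observer) {
        Subscription subscription = new Subscription();
        observer.onSubscribe(subscription);
        for (int i = 1; i <= 5; i++) {
            if (subscription.isUnsubscribed()) return;  // observer left: stop emitting
            observer.onNext(i);
        }
        if (!subscription.isUnsubscribed()) observer.onComplete();
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        emitOneToFive(new ItemObserver<Integer>() {
            private Subscription subscription;

            @Override public void onSubscribe(Subscription s) {
                subscription = s;
                log.add("subscribed");
            }

            @Override public void onNext(Integer item) {
                log.add("onNext: " + item);
                if (item == 2) subscription.unsubscribe();  // two items are enough
            }

            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log ends after onNext: 2. Because the observer unsubscribed, the source stops and no onComplete is delivered.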

The complete specification of the Observable Contract is published in the ReactiveX documentation for your reference.

Let me see some code

The wall of text above is boring, I agree, but trust me, it is important to know. The Observable Contract is a specification, and implementations across languages and frameworks are expected to follow it.

Alright, on to some code. In RxJava2, the observable and observer types are Observable<T> and Observer<T> respectively. Both are defined in the io.reactivex package.

Observer<T> is an interface that declares four methods: onSubscribe(Disposable d), onNext(T t), onError(Throwable e), and onComplete().

These methods map directly onto the observable-to-observer notifications; note that OnCompleted is spelled onComplete in RxJava2. The method names are lower camel case because of Java coding conventions. PascalCase is the norm in C#, where Reactive Extensions originated.

Observable<T>, on the other hand, is a very large abstract class that contains all the available methods for observables, so we will only look at the interface it implements, ObservableSource<T>.

Notice that ObservableSource<T> declares only one method, subscribe, which accepts an observer: void subscribe(Observer<? super T> observer). The observable emits its items to this observer. Forget about the missing notifications for now; we will tackle them in the next articles.
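To see what "the observable emits to the observer passed to subscribe" means in practice, here is a dependency-free sketch. The nested Disposable, Observer and ObservableSource interfaces are local stand-ins that mirror the method shapes of the RxJava2 types so the code runs without the library. RecordingObserver and greeting are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch: the nested types are local stand-ins, not RxJava's own classes.
class SubscribeSketch {

    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T t);
        void onError(Throwable e);
        void onComplete();
    }

    /** Mirrors the single-method shape of ObservableSource<T>. */
    interface ObservableSource<T> {
        void subscribe(Observer<? super T> observer);
    }

    /** Observer that records every notification it receives. */
    static final class RecordingObserver implements Observer<String> {
        final List<String> log = new ArrayList<>();
        @Override public void onSubscribe(Disposable d) { log.add("onSubscribe"); }
        @Override public void onNext(String t) { log.add("onNext: " + t); }
        @Override public void onError(Throwable e) { log.add("onError: " + e.getMessage()); }
        @Override public void onComplete() { log.add("onComplete"); }
    }

    /** A hand-written source: everything it emits goes through the observer given to subscribe. */
    static ObservableSource<String> greeting() {
        return observer -> {
            // The disposable is handed over but, for brevity, never checked by this sketch.
            observer.onSubscribe(new Disposable() {
                private boolean disposed;
                @Override public void dispose() { disposed = true; }
                @Override public boolean isDisposed() { return disposed; }
            });
            observer.onNext("hello");
            observer.onComplete();
        };
    }

    static List<String> run() {
        RecordingObserver observer = new RecordingObserver();
        greeting().subscribe(observer);
        return observer.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The source never returns items to its caller. It only calls methods on the observer it was given, which is why a single subscribe method is enough.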

Can we see some action?

Before we see some action, we have to create an observable first (remember, you cannot instantiate it directly since it is an abstract class). Luckily for us, there are a ton of factory methods available in the RxJava2 library. We will discuss some of them in the next articles. For now, let’s use a very simple factory method called just. Observable.just creates an observable from the arguments you supply it with. Consider this example.
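With RxJava2 on the classpath, such a pipeline would be written as Observable.just("hello", "world").doOnNext(...) with a printing action. The sketch below mimics that shape without the library: LazyObservable is a hypothetical, callback-only stand-in (not RxJava's Observable), and assemble is a helper name of our own.

```java
import java.util.function.Consumer;

// Dependency-free sketch; LazyObservable is a hypothetical stand-in, not RxJava's Observable.
class AssemblyTimeSketch {

    /** A source that does nothing until someone calls subscribe. */
    interface LazyObservable<T> {
        void subscribe(Consumer<? super T> onNext);

        /** Stand-in for Observable.just(item1, item2): emits both items to each subscriber. */
        static <T> LazyObservable<T> just(T item1, T item2) {
            return onNext -> {
                onNext.accept(item1);
                onNext.accept(item2);
            };
        }

        /** Stand-in for doOnNext: a new source that runs sideEffect on each item passing through. */
        default LazyObservable<T> doOnNext(Consumer<? super T> sideEffect) {
            LazyObservable<T> upstream = this;
            return onNext -> upstream.subscribe(item -> {
                sideEffect.accept(item);
                onNext.accept(item);
            });
        }
    }

    /** Assembly only: builds the pipeline and returns it without subscribing. */
    static LazyObservable<String> assemble(Consumer<String> sideEffect) {
        return LazyObservable.just("hello", "world").doOnNext(sideEffect);
    }

    public static void main(String[] args) {
        // Nobody subscribes, so println is never invoked and the program prints nothing.
        assemble(System.out::println);
    }
}
```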

The above code creates an observable of type String with two items, hello and world. doOnNext is an operator that performs an action on each item emitted by the observable; in our case, it prints each item. Do not worry if operators confuse you at the moment. We will discuss them thoroughly in upcoming articles. We are using one now to showcase an important state in RxJava2.

Running this code produces no output. This is because, at this point, no observer is attached to the observable. We can conclude that emission only happens once an observer has subscribed, a behavior that is defined in the contract. This pre-subscription moment is called Assembly Time. Assembly time is synonymous with building the processing pipeline or setting up the stream graph.

Now let’s try creating an actual observer and subscribe.
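With RxJava2 this would be an anonymous Observer<String> passed to observable.subscribe(...). The sketch below reproduces the same flow without the library. MiniObservable, Observer and Disposable are local stand-ins (assumptions of this sketch, not RxJava's classes), everything runs synchronously, and the out parameter exists only so the output can be captured.

```java
import java.util.function.Consumer;

// Dependency-free sketch; the nested types are local stand-ins, not RxJava's own classes.
class FirstSubscription {

    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T t);
        void onError(Throwable e);
        void onComplete();
    }

    /** This sketch emits everything inside subscribe, so its Disposable does nothing. */
    static final Disposable NO_OP = new Disposable() {
        @Override public void dispose() { }
        @Override public boolean isDisposed() { return false; }
    };

    /** Tiny synchronous stand-in for Observable<T> with just and doOnNext. */
    abstract static class MiniObservable<T> {
        abstract void subscribe(Observer<? super T> observer);

        static <T> MiniObservable<T> just(T item1, T item2) {
            return new MiniObservable<T>() {
                @Override void subscribe(Observer<? super T> observer) {
                    observer.onSubscribe(NO_OP);   // subscription time
                    observer.onNext(item1);        // runtime
                    observer.onNext(item2);
                    observer.onComplete();
                }
            };
        }

        MiniObservable<T> doOnNext(Consumer<? super T> sideEffect) {
            MiniObservable<T> upstream = this;
            return new MiniObservable<T>() {
                @Override void subscribe(Observer<? super T> observer) {
                    // Subscribe upstream with an observer that runs the side effect, then forwards.
                    upstream.subscribe(new Observer<T>() {
                        @Override public void onSubscribe(Disposable d) { observer.onSubscribe(d); }
                        @Override public void onNext(T t) { sideEffect.accept(t); observer.onNext(t); }
                        @Override public void onError(Throwable e) { observer.onError(e); }
                        @Override public void onComplete() { observer.onComplete(); }
                    });
                }
            };
        }
    }

    /** Assembles the pipeline, then subscribes an observer that logs every notification. */
    static void run(Consumer<String> out) {
        MiniObservable<String> observable =
                MiniObservable.just("hello", "world").doOnNext(out);  // logs the bare item

        Observer<String> observer = new Observer<String>() {
            @Override public void onSubscribe(Disposable d) { out.accept("onSubscribe"); }
            @Override public void onNext(String s) { out.accept("onNext: " + s); }
            @Override public void onError(Throwable e) { out.accept("onError: " + e.getMessage()); }
            @Override public void onComplete() { out.accept("onComplete"); }
        };
        observable.subscribe(observer);
    }

    public static void main(String[] args) {
        run(System.out::println);
    }
}
```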

We instantiated an Observer<String>, added a log statement to each method, and subscribed to the observable by passing in the observer instance. Notice that you can only subscribe with an observer whose type parameter matches the observable's, in our case String. The point of subscription is a special moment called Subscription Time. This is where subscription side effects (like doOnSubscribe) are triggered, before any items are emitted. The phase in which items are actually emitted and flow through the processing stream is called Runtime.

Running the above code will net you the following output.

onSubscribe
hello
onNext: hello
world
onNext: world
onComplete

Based on the above logs, onSubscribe is called first (Subscription Time), followed by doOnNext and the observer's onNext handler for each item (Runtime). We can see that each item is processed vertically: it travels through the whole pipeline before the next item is emitted. This behavior is similar to Java streams and Kotlin sequences, the main difference being the direction of data flow. Streams and sequences are pull-based, while reactive extensions are push-based. Lastly, onComplete is called, signifying the end of the stream.
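The same vertical ordering is easy to observe on the pull-based side using only the JDK. Here is a small sketch with java.util.stream; the class name and log labels are ours. The terminal forEach is what pulls each element through peek, one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// JDK-only sketch of item-by-item ("vertical") processing in a sequential Java stream.
class VerticalProcessing {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Stream.of("hello", "world")
                .peek(item -> log.add("peek: " + item))          // intermediate step
                .forEach(item -> log.add("forEach: " + item));   // terminal step pulls the items
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log reads peek: hello, forEach: hello, peek: world, forEach: world. Each item passes through every stage before the next one starts, just like in the observable output above. The difference is who drives the flow: here the consumer pulls, whereas an observable pushes items to its observer.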

What’s next?

We have just created our very first observable and observer. Congratulations! But I'm sure you have more questions than answers at this point. For now, rejoice in the fact that we are one step closer to "reactifying" our apps.

In the next article, we will take an in-depth look at observables and observers and how the notifications work under the hood. Give this article a clap if you enjoyed it.

Check out the other articles in this series.

  1. Introduction to Reactive Programming
  2. Building your first Observables and Observers
  3. In Depth Observables and Observers
  4. Marble Diagrams and Operators
  5. Observable Factories Part 1
  6. Observable Factories Part 2
  7. Single, Maybe and Completable
  8. Hot and Cold Observables
  9. Filtering Operators Part 1
  10. Filtering Operators Part 2
