Welcome to part 3 of the series. In this installment, we’re going to look at RxJS’s take on the Observer pattern, and start implementing an observable from scratch. Our humble Observable will be nowhere near as sophisticated as what’s going on inside of RxJS, but it will give you a good enough understanding of observables so that you can start digging into the RxJS source code to learn more.
To understand Observables, it’s important to start with the Observer Pattern.
The concept is pretty straightforward. There is some object containing state that will change over time. This is known as the subject in the classical Observer Pattern. All this subject does is accept callback functions from observers, which are objects that want to be notified whenever the subject’s state changes. Whenever such a state change happens, the subject loops through all of the observer callbacks, and invokes them with the new state as an argument.
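To make that concrete, here is a rough sketch of the classical pattern. The Subject class and its addObserver and setState methods are hypothetical names picked for illustration; they are not RxJS APIs.

```javascript
// Hypothetical Subject for illustration only; this is not RxJS's Subject.
class Subject {
  constructor(initialState) {
    this.state = initialState;
    this.callbacks = [];
  }

  // Observers register a callback to be notified of state changes.
  addObserver(callback) {
    this.callbacks.push(callback);
  }

  // On every state change, loop over the callbacks and pass in the new state.
  setState(newState) {
    this.state = newState;
    this.callbacks.forEach((callback) => callback(newState));
  }
}

const seen = [];
const counter = new Subject(0);
counter.addObserver((state) => seen.push(`A saw ${state}`));
counter.addObserver((state) => seen.push(`B saw ${state}`));
counter.setState(1);
counter.setState(2);
console.log(seen); // [ 'A saw 1', 'B saw 1', 'A saw 2', 'B saw 2' ]
```

Notice that the observers only ever hear about new state. There is no way to tell them that the subject is finished, or that something went wrong.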
The classic observer pattern has been a staple of web development for many years. However, it has not been without its shortcomings, or its critics.
For our purposes, it provides no way of containerizing events, meaning we can’t compose streams out of subject events.
RxJS improves upon this classical observer pattern by introducing a more robust interface for observers, one that supports not just a method for publishing data (onNext), but also methods for notifying observers of completion, as well as errors.
Note that an observer’s onNext method can be called many times, but onError and onCompleted indicate that no more data will be arriving from the observable.
As a quick aside, observables and subjects are technically different from one another. In RxJS, subjects are “stateful”, in that they maintain a list of subscribers that they multicast data to, similar to the subject in the classical observer pattern. By contrast, observables are really just functions that set up a context for observation, without holding onto state. Each observer will see its own execution of the observable. More on this in a later article, as it’s not too important right now.
Our simple class must support the Observer interface, which means that any observer that subscribes to our observable will provide the following three methods:
- onNext, to be called each time our observable emits data
- onError, to be called if an error happens in our observable
- onCompleted, to be called when our observable is done producing data
For reference, the official Observer interface can be found here.
A simple scenario looks like this:
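As a rough sketch of such a scenario, assume a stand-in object called numbers that plays the role of the observable. It is a placeholder that is just enough to run the snippet, not RxJS itself.

```javascript
// Stand-in observable, just enough to run the scenario (an assumption, not RxJS).
const numbers = {
  subscribe(observer) {
    [1, 2, 3].forEach((n) => observer.onNext(n));
    observer.onCompleted();
  },
};

const log = [];

// An observer is simply an object with the three methods listed above.
const observer = {
  onNext: (value) => log.push(`next: ${value}`),
  onError: (err) => log.push(`error: ${err.message}`),
  onCompleted: () => log.push('completed'),
};

numbers.subscribe(observer);
console.log(log); // [ 'next: 1', 'next: 2', 'next: 3', 'completed' ]
```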
Don’t concern yourself with the details too much just yet, we will implement everything in that example by hand shortly.
The Observer interface can be thought of as an analog to the Iterator interface. While iterators let a consumer pull data from a source, observers let a source push data to an observer, via onNext. Similar to how Iterators can communicate error and completion information, our Observer has onError and onCompleted. In many ways, the two are symmetric. Jafar Husain has a great talk on the symmetry between the Iterator pattern and the Observer pattern. He also talks about it here.
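The pull versus push distinction can be sketched side by side. The pushAll helper below is a hypothetical name used only for this illustration.

```javascript
const source = [10, 20, 30];

// Pull: the consumer asks the iterator for each value when it wants one.
const pulled = [];
const iterator = source[Symbol.iterator]();
let step = iterator.next();
while (!step.done) {        // done: true is the iterator's version of onCompleted
  pulled.push(step.value);
  step = iterator.next();   // an exception thrown here would be its version of onError
}

// Push: the source decides when to hand each value to the observer.
const pushed = [];
let completed = false;
const observer = {
  onNext: (value) => pushed.push(value),
  onError: (err) => console.error(err),
  onCompleted: () => { completed = true; },
};

// Hypothetical helper: pushes every value, then signals completion.
function pushAll(values, obs) {
  for (const value of values) obs.onNext(value);
  obs.onCompleted();
}

pushAll(source, observer);
console.log(pulled, pushed, completed); // [ 10, 20, 30 ] [ 10, 20, 30 ] true
```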
The first thing to do is create the Observable class.
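A minimal sketch of the class might look like the following. The no-op defaults for onError and onCompleted, and the convention that the subscribe function returns an object with an unsubscribe method, are assumptions made for this sketch rather than a copy of RxJS.

```javascript
const noop = () => {};

class Observable {
  // The function passed in knows how to feed an observer.
  // It is stored as the internal _subscribe.
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  // Public entry point: bundle the callbacks into an observer object
  // and hand it to the internal _subscribe.
  subscribe(onNext, onError = noop, onCompleted = noop) {
    return this._subscribe({ onNext, onError, onCompleted });
  }
}

// Wiring it up by hand, to show what _subscribe is responsible for.
const received = [];
const single = new Observable((observer) => {
  observer.onNext('hello');
  observer.onCompleted();
  return { unsubscribe: noop };
});

single.subscribe(
  (value) => received.push(value),
  noop,
  () => received.push('done')
);
console.log(received); // [ 'hello', 'done' ]
```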
The class has two functions, a public subscribe, and an internal _subscribe. This is pretty similar to how Observable is actually implemented in RxJS. The public method is a common way for observers to subscribe by providing onNext, onError, and onCompleted functions. The internal _subscribe method decides when to call those methods, and will be dependent on the implementation details of the underlying stream being observed, as we will see shortly when we implement the creational methods.
By itself, this Observable doesn’t do anything. It just provides the machinery for setting up observation for observers and their onNext, onError, and onCompleted handlers. To put it to work, we need to add some creational methods. We’ll implement three: of, from, and fromEvent.
But before we do that…
A quick note on RxJS 6, and pipeable operators
In the old days of RxJS (v5 and below), creational methods and operators were all put on the Observable prototype. Starting with RxJS 6, this has all changed. Operators are no longer placed on the Observable prototype, which leads to better tree-shaking, smaller bundle sizes, and better interoperability with third-party libraries. The chaining syntax for composition, such as:
Observable.of(1,2,3).map(x => x + 1).filter(x => x > 2);
has also been replaced by the pipe method for composition:
of(1,2,3).pipe(map(x => x + 1), filter(x => x > 2));
Conceptually, nothing much has really changed, so we’re going to stick to the RxJS 5 style of chaining operators on the Observable prototype for composition, just for the sake of simplicity. In a later article, we’ll break things up, and implement a pipe method a la RxJS 6.
If you’d like to know more about RxJS 6 and pipeable (lettable) operators, head over to the RxJS section of AngularInDepth.
Now, with that out of the way, let’s implement a creational method, and write some tests for it. You can follow along with this repository:
Learn the power of RxJS Observables by implementing your own simple observable class! This repository supports my talk…
Observable.of takes in some arguments, and simply returns an observable that emits each of those values one by one, then completes. Here’s the code:
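A minimal sketch of what that could look like, assuming the small Observable class described earlier (repeated here so the snippet runs on its own) and a no-op unsubscribe:

```javascript
const noop = () => {};

class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  subscribe(onNext, onError = noop, onCompleted = noop) {
    return this._subscribe({ onNext, onError, onCompleted });
  }

  // Emit each argument in order, then complete.
  static of(...values) {
    return new Observable((observer) => {
      values.forEach((value) => observer.onNext(value));
      observer.onCompleted();
      // Emission is synchronous, so there is nothing left to tear down.
      return { unsubscribe: noop };
    });
  }
}

const results = [];
Observable.of(1, 2, 3).subscribe(
  (value) => results.push(value),
  noop,
  () => results.push('completed')
);
console.log(results); // [ 1, 2, 3, 'completed' ]
```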
Notice how we start by returning a new Observable. This is a very common pattern that we will see over and over again when implementing this Observable class. An Observable operation must always return a new Observable; otherwise, composition will not be possible. For instance, if Observable.of did not return an Observable, then we would not be able to do things like Observable.of(5).map(x => x * 2), because map expects an Observable in order to be callable. It’d be like if Array.prototype.filter didn’t return an array: things like [1,2,3].filter(x => x > 1).map(x => x * 2) would break, because you’d be trying to call map on something other than an array.
Let’s write a couple of unit tests for of and see how it does. As a reference, here are the official RxJS 5 tests.
of is pretty simple. By default, its execution is going to be synchronous once subscribe is called, so the unsubscribe method will likely never be called. However, we supply one just in case.
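The synchronous behavior is easy to check without a test framework. The sketch below repeats the assumed Observable class and uses a hypothetical check helper in place of a real assertion library.

```javascript
const noop = () => {};

class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  subscribe(onNext, onError = noop, onCompleted = noop) {
    return this._subscribe({ onNext, onError, onCompleted });
  }

  static of(...values) {
    return new Observable((observer) => {
      values.forEach((value) => observer.onNext(value));
      observer.onCompleted();
      return { unsubscribe: noop };
    });
  }
}

// Tiny hypothetical assertion helper so the sketch needs no test framework.
function check(condition, message) {
  if (!condition) throw new Error(message);
}

// Every value arrives in order, followed by completion, before subscribe returns.
const events = [];
const subscription = Observable.of('a', 'b').subscribe(
  (value) => events.push(value),
  noop,
  () => events.push('completed')
);
check(events.join(',') === 'a,b,completed', 'should emit in order, then complete');

// With no arguments, of should complete without emitting anything.
let emitted = 0;
let completed = false;
Observable.of().subscribe(() => { emitted += 1; }, noop, () => { completed = true; });
check(emitted === 0 && completed, 'empty of() should only complete');

// unsubscribe exists, even though there is nothing left to cancel.
check(typeof subscription.unsubscribe === 'function', 'should return a subscription');
subscription.unsubscribe();
```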
Let’s take a look at a similar method, from, which consumes an iterable.
from accepts an iterable, and fires onNext for each element.
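One way to sketch it, again repeating the assumed Observable class so the snippet stands alone:

```javascript
const noop = () => {};

class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  subscribe(onNext, onError = noop, onCompleted = noop) {
    return this._subscribe({ onNext, onError, onCompleted });
  }

  // Walk the iterable, firing onNext for each element, then complete.
  static from(iterable) {
    return new Observable((observer) => {
      for (const item of iterable) {
        observer.onNext(item);
      }
      observer.onCompleted();
      return { unsubscribe: noop };
    });
  }
}

const items = [];
Observable.from([1, 2, 3]).subscribe(
  (value) => items.push(value),
  noop,
  () => items.push('completed')
);
console.log(items); // [ 1, 2, 3, 'completed' ]
```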
Nothing too crazy here. We use a for...of loop to consume the iterable. Here are some tests:
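A few framework-free checks along those lines, covering a string, a Set, and a generator. The collect and check helpers are hypothetical conveniences for this sketch, and the Observable class is the same assumed one as before.

```javascript
const noop = () => {};

class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  subscribe(onNext, onError = noop, onCompleted = noop) {
    return this._subscribe({ onNext, onError, onCompleted });
  }

  static from(iterable) {
    return new Observable((observer) => {
      for (const item of iterable) observer.onNext(item);
      observer.onCompleted();
      return { unsubscribe: noop };
    });
  }
}

// Hypothetical helpers: gather everything an observable emits, and assert on it.
function collect(observable) {
  const out = [];
  observable.subscribe((value) => out.push(value), noop, () => out.push('completed'));
  return out;
}
function check(condition, message) {
  if (!condition) throw new Error(message);
}

// Strings are iterable, one character at a time.
const fromString = collect(Observable.from('abc'));
check(fromString.join(',') === 'a,b,c,completed', 'string should emit each character');

// A Set emits each unique member once.
const fromSet = collect(Observable.from(new Set([1, 1, 2])));
check(fromSet.join(',') === '1,2,completed', 'set should emit unique members');

// Generators are iterables too.
function* countdown() {
  yield 3;
  yield 2;
  yield 1;
}
const fromGenerator = collect(Observable.from(countdown()));
check(fromGenerator.join(',') === '3,2,1,completed', 'generator should emit each yield');
```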
Last but certainly not least, let’s build fromEvent. fromEvent is exciting since it will be our first asynchronous stream. We’ll implement a simplified version which only deals with DOM events, although RxJS’s implementation supports any event source which has addListener and removeListener style methods.
We take a source DOM element, as well as the name of an event. We then use the browser’s addEventListener and removeEventListener functions to manage that subscription.
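Here is a minimal sketch of that idea, assuming the same small Observable class as before. To keep the usage runnable outside a browser, a plain EventTarget stands in for a DOM element, which is a substitution made for this example.

```javascript
const noop = () => {};

class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  subscribe(onNext, onError = noop, onCompleted = noop) {
    return this._subscribe({ onNext, onError, onCompleted });
  }

  // Forward every matching event to onNext until the observer unsubscribes.
  static fromEvent(source, eventName) {
    return new Observable((observer) => {
      const handler = (event) => observer.onNext(event);
      source.addEventListener(eventName, handler);
      return {
        unsubscribe: () => source.removeEventListener(eventName, handler),
      };
    });
  }
}

// A plain EventTarget stands in for a DOM element such as a button.
const button = new EventTarget();
const clicks = [];
const subscription = Observable.fromEvent(button, 'click').subscribe(
  (event) => clicks.push(event.type)
);

button.dispatchEvent(new Event('click'));
button.dispatchEvent(new Event('click'));
subscription.unsubscribe();
button.dispatchEvent(new Event('click')); // ignored, the listener is gone

console.log(clicks.length); // 2
```

Unlike of and from, this stream never calls onCompleted on its own, so here the unsubscribe method actually matters.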
- Observables are lazy. They don’t emit values until they are subscribed to (unless they are hot observables; more on that later).
- RxJS improves on the classical Observer pattern by adding methods for error and completion, as well as enabling composition.
- Observers have onNext, onError, and onCompleted methods.
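The laziness point can be seen with a short sketch, again using the assumed Observable class. The executions counter is only there to make the behavior visible.

```javascript
const noop = () => {};

class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  subscribe(onNext, onError = noop, onCompleted = noop) {
    return this._subscribe({ onNext, onError, onCompleted });
  }
}

let executions = 0;
const lazy = new Observable((observer) => {
  executions += 1;              // runs once per subscription, not at creation
  observer.onNext(executions);
  observer.onCompleted();
  return { unsubscribe: noop };
});

// Creating the observable has not run anything yet.
const executionsBeforeSubscribe = executions;

// Each subscriber triggers its own execution.
const values = [];
lazy.subscribe((value) => values.push(value));
lazy.subscribe((value) => values.push(value));

console.log(executionsBeforeSubscribe, executions, values); // 0 2 [ 1, 2 ]
```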
That’s it for this article. In the next article we’ll dig into operators such as