Demystifying RxJS, Part I: Building our own Observables

Travis Kaufman
7 min readNov 27, 2019

--

This is the first part in a series called Demystifying RxJS, where we build our own miniature version of RxJS in order to gain a deep fundamental understanding of how it works. You can follow along using this starter template on CodeSandbox, or check out this CodeSandbox containing the complete miniature library. You can also view the completed code for this section only at https://codesandbox.io/s/demystifying-rxjs-part-i-completed-0utge

Observables are the abstraction at the heart of RxJS. They provide the foundation that the rest of the library builds upon. Let’s start our own RxJS library by building Observables ourselves.

Below is the TypeScript code — in its entirety — for our extremely bare-bones Observable class. If this looks like a lot, don’t worry! We’ll be going over all of this code in detail soon enough. For now, if you’re following along using the starter template, type this below the last line of code within src/index.ts:

Our bare-bones observable class

To prove to ourselves that this actually works, let’s adapt the example from the official docs on Observables and see if we get the same output. Add the following right below your Observable code:

A basic observable test, adapted from the docs shown on https://rxjs.dev/guide/observable

If you’re following along using the starter template, hit the refresh button within the browser window in the editor and then open the editor console. You should now see output which matches that shown on the official guide:

Our output from our custom Observable code on the left, vs. the RxJS guide code on the right. They are equivalent.
Output from our code (left) vs. the RxJS guide (right)

Hooray! We got ourselves some Observables. Take a few minutes to examine the code you’ve just written. Hopefully by writing it out and studying the example below it, you’ve started to gain an intuition as to how it might work. If not, don’t worry, because we’ll be spending the rest of the article breaking it down 😁. Observables are the most important part of this library for you to understand. Having a solid grasp of how Observables accomplish what they accomplish will clear up a lot of the confusion around how RxJS works. Let’s now go back and walk through the code together.

Observers

Observables operate on streams of asynchronous data. A stream is basically a fancy way of saying “a sequence of data that might arrive, but you don’t know when”. For example, a stream could be data arriving from a WebSocket. A stream could also be clicks on a button, where the “stream” is the sequence of click events that happen over the course of a session. A stream could also be a timer that might fire (unless it’s cancelled), or an http request that might complete (unless it’s aborted). You can think of those last two examples as a sequence of only one piece of data (either the timer event, or the response). In RxJS, these examples are all treated the same, and this one of the sources of its power. But I digress.

Since we’re dealing with streams of data, it follows that we need something to directly interact with those streams. Therefore, we define an Observer interface, which is responsible for directly interacting with the stream our Observable is wrapping. Observers signal to Observables that something interesting has happened within the stream of data it is observing. That “something interesting” comes in one of three flavors:

  • A new value has arrived. This is what next() is for.
  • An error has occurred. This is what error() is for.
  • The stream of values has ended, and no more values will be emitted. This is what complete() is for.

One note for the astute “observer” (😏): you will see what we are calling Observers is called Subscribers in RxJS. An Observer in RxJS is actually an interface that a Subscriber implements. As I’ve mentioned, we are creating a radically simplified version of RxJS, so we don’t need this layer of abstraction. If you’re curious, I’d encourage you explore the RxJS APIs, which may indicate why more levels of complexity are needed (personally, I think Subjects seem like a likely cause).

Subscriptions

Next, we define a Subscription interface, which is responsible for managing observation on a stream of asynchronous values.

You can think about a subscription like an id you get when you call setTimeout() / setInterval(): it’s ahandle” to the observation itself. When you begin observing, you get the handle. When you want to stop observing, you can call unsubscribe(). Calling unsubscribe() is like calling clearInterval(); it signals that the handle is no longer needed.

Observables

Finally, we have our Observable class. Let’s step through the observable in detail.

The constructor takes in an observe() factory function for observing an asynchronous stream of values. The factory function is passed an Observer, and that observer is expected to interact with said stream of values. This factory function is crucial, because it links the Observer to its underlying stream. It might set up event handlers, set up timer callbacks, subscribe to socket events, or anything else that deals with asynchronous behavior. It is this factory function, given within the Observable constructor, that allows the underlying stream to be abstracted into the Observable interface. It is, in my opinion, an extremely important part of RxJS to understand. It’s also the one that’s easiest to miss, because normally we create Observables using factory methods like of(), from(), and the like (more on those later).

In our example code, we’ve written a factory function that has the Observer immediately emit three values, and then wait 1 second via a setTimeout() call before emitting an additional value and completing. While this is a contrived example, imagine a data feed from a WebSocket: you don’t know exactly when the data will arrive, but when it does, you need to react to it in some way. In this case, the Observer interactions would look similar to the code inside that setTimeout() callback.

If there is any cleanup logic that needs to be done when the observation is to end, the factory function can return a teardown function that is responsible for undoing any setup code needed to listen to the asynchronous values. Continuing with the example of a WebSocket data feed, if the user takes some action such that we can stop listening for changes, we no longer need to listen to those events on the socket. Thus, the teardown function allows us to unsubscribe from our observation.

Even though all our Observable can do right now is subscribe to values emitted from a stream of data, this is all that’s needed to provide the foundation for our observable library. Let’s demonstrate this in the next section.

Solidifying our understanding: Observable factories

The key to understanding how an observable works lies within the subscribe() function. As we can see from the code, Observables take the callbacks that you provide to subscribe(), turn them into an Observer, and call the observe() constructor function with that Observer. When you peel back all of the layers of abstraction, this is all that observables actually do.

If you’re not convinced that this is the case, let’s implement some of the Observable factory functions you’ve most likely come across while working with RxJS to see how it all ties back into this observe() function.

To start off, let’s implement of(), which simply iterates through all values provided as arguments, forwarding each one to the observer, and then completes.

of() factory function

Next, let’s implement from(), simplified — for brevity — so that it accepts only array-like objects and promises. Notice how here, we either iterate through the array-like object or await promise completion (respectively), passing the results back into the observer.

from() factory method, simplified

Notice that, in from, we explicitly do not complete if there’s an error within the promise. This is part of the Observable contract, which is a spec that governs how Rx-family observables work. You must not emit anything after emitting an error, including a completion (this, by the way, is why you can’t reuse Angular HTTP observables once they’ve error’d out, which was a source of confusion for me personally).

Finally, let’s implement fromEvent(). Here, we’ll create an event listener which forwards all events to the observer. We’ll also return a teardown function that removes our event listener.

fromEvent() factory method

As you can see from above, each of these are implementable just by passing in a pre-defined observe() factory function into our Observable’s constructor.

Key takeaways

There are some key things to note based on what we’ve written above:

  • An Observable’s observe() function is not called until subscribe() is called. This is what we mean when we say an Observable is cold. As you can see, this makes sense because we don’t want values to be triggered until we subscribe to them, lest we miss being notified of their arrival.
  • Notice how in fromEvent, there’s no way we could prevent events from being emitted before the observable is subscribed. A user could click that button whenever they wanted to, whether or not we were subscribed. This is what we mean when we say an Observable is hot.
  • Notice how every time we subscribe(), we re-invoke the factory function. This is where all those unexpected multiple triggers come from, and it’s what share() and friends prevent.

We are now armed with the baseline functionality we need for an Observable library. You can use what we’ve built here to wrap any stream of asynchronous events.

The real power of Observables, however, comes from being able to treat streams of asynchronous data as lists, and lists are only useful when they can be operated on. In Part II, we’ll write our own operators, as well as a .pipe() function, which we can use to achieve this functionality.

--

--

Travis Kaufman

Software engineer specializing in UI / UX development. Proud New Yorker, lifelong learner. ⚡️Gryffindor ⚡️