Demystifying RxJS, Part II: Building our own operators

Travis Kaufman
Nov 27, 2019

This is the second part in a series called Demystifying RxJS, where we build our own miniature version of RxJS in order to gain a deep fundamental understanding of how it works. If you’ve read Part I, you can continue where you left off. Or, you can fork the CodeSandbox containing the completed code for Part I. You can also find the completed code for this section at https://codesandbox.io/s/demystifying-rxjs-part-ii-completed-5fqxy. Finally, you can view the complete miniature library code at https://codesandbox.io/s/demystifying-rxjs-complete-implementation-5ib63.

In Part I of this series, we implemented Observables, a key component of RxJS. However, these Observables are pretty bare-bones: all they offer is a subscribe() function that allows us to be notified of events from the observed streams. As I mentioned at the end of Part I, the real power of observables comes from being able to treat streams of asynchronous data as lists, and operators are what make that possible.

In this part of the series, we will extend our miniature library to include some basic operators, and add the venerable pipe() method to our Observables. We will see how doing this allows us to easily combine operators to achieve the powerful manipulation of asynchronous data streams that RxJS provides. We will then explore higher-order mapping operators, building our own versions of mergeMap and concatMap. This will give us a much better understanding of how those more advanced operators function.

By the end of this section, you should have a very clear understanding of the core mechanisms that allow RxJS to function as such a powerful library, and be equipped with much of the knowledge you will need to use the library effectively. With that, let’s dive in!

I’ve focused a lot on this notion of representing an asynchronous data stream as a list. What makes lists so powerful is the operations that you can perform on them. Lists are the primitive mechanism in software engineering that allow arbitrary transformation and aggregation of a (theoretically) unbounded collection of items, in an extremely succinct manner. Consider the following code snippet:

const oneThroughTen = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const squareSum = oneThroughTen
  .map(n => n * n)
  .reduce((s, n) => s + n, 0);

Consider what computing squareSum would look like if we did not have lists, let alone operations on them: it would take a much more unwieldy piece of code. Thankfully, we can use lists and their methods to describe what we are trying to accomplish semantically and succinctly.

You can think of an asynchronous stream of data exactly like a “list”, with two major differences:

  1. You don’t have all of the items in the list all at once
  2. You don’t know exactly when those items will arrive

Observables solve the above two problems by creating a unified abstraction over asynchronous data streams. Operators, then, allow us to achieve the same level of semantics and succinctness we have with lists when operating on streams of data within Observables.

For example, we can easily compute a squared sum over a stream of numbers using RxJS:

import {of} from 'rxjs';
import {map, reduce} from 'rxjs/operators';

const squareSum = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
  map(n => n * n),
  reduce((s, n) => s + n, 0),
);

The pipe() function, along with the operators it makes use of, is the second large piece of the puzzle regarding what makes RxJS so powerful. Let’s now build these map() and reduce() operators ourselves, implement pipe(), and test our implementation by recreating the squareSum Observable.

Operators + pipe()

Let’s start by defining our map() and reduce() operators, which we’ll need in order to produce our squareSum observable. Type the following right below where you left off in Part I (or where the code ends, if you’re just jumping in at this point):
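A minimal, self-contained sketch of an OperatorFunction type plus map() and reduce() might look like the following. The small Observable class (with its Observer, Subscription, and ObserveFn shapes) and the of() helper are assumptions that approximate Part I, not the article’s exact code. Since pipe() doesn’t exist yet, the operators are applied by nesting plain function calls.

```typescript
// Minimal Observable, assumed to resemble Part I (not the article's exact code).
interface Observer<T> { next: (value: T) => void; error: (err: unknown) => void; complete: () => void; }
interface Subscription { unsubscribe: () => void; }
type ObserveFn<T> = (observer: Observer<T>) => (() => void) | void;

class Observable<T> {
  private observe: ObserveFn<T>;
  constructor(observe: ObserveFn<T>) { this.observe = observe; }

  subscribe(partial: Partial<Observer<T>>): Subscription {
    let closed = false; // nothing is delivered after error, complete, or unsubscribe
    const teardown = this.observe({
      next: v => { if (!closed) partial.next?.(v); },
      error: e => { if (!closed) { closed = true; partial.error?.(e); } },
      complete: () => { if (!closed) { closed = true; partial.complete?.(); } },
    });
    return { unsubscribe: () => { closed = true; if (typeof teardown === "function") teardown(); } };
  }
}

// Hypothetical helper: emits each argument synchronously, then completes.
function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>(obs => {
    values.forEach(v => obs.next(v));
    obs.complete();
  });
}

// An operator is just a function from one Observable to another.
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

// map() is not itself an operator: calling it returns one.
function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(obs => {
      let index = 0; // running count passed to project with each value
      const sub = source.subscribe({
        next: v => obs.next(project(v, index++)),
        error: e => obs.error(e),
        complete: () => obs.complete(),
      });
      return () => sub.unsubscribe();
    });
}

// reduce() folds every value into an accumulator and emits it once the source completes.
function reduce<T, A>(accumulator: (acc: A, value: T) => A, seed: A): OperatorFunction<T, A> {
  return source =>
    new Observable<A>(obs => {
      let acc = seed;
      const sub = source.subscribe({
        next: v => { acc = accumulator(acc, v); },
        error: e => obs.error(e),
        complete: () => { obs.next(acc); obs.complete(); },
      });
      return () => sub.unsubscribe();
    });
}

// Without pipe(), operators are applied by nesting calls; the innermost runs first.
const nestedSquareSum = reduce((s: number, n: number) => s + n, 0)(
  map((n: number) => n * n)(of(1, 2, 3)),
);
nestedSquareSum.subscribe({ next: sum => console.log(sum) }); // 14
```

Because map() and reduce() hand back ordinary functions, an operator such as map((n: number) => n * n) can be stored in a variable and applied to any number of source Observables.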

Perhaps it’s starting to become clearer how everything flows back down into the primitive observe() function passed to the Observable constructor. But if not, no worries! We’ll dive more deeply into it soon enough.

Before we do, however, let’s write the pipe() method for our Observable class. Add the following inside the Observable implementation in your code.
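A minimal sketch of such a pipe() method, assuming the same small Observable shape used throughout (an approximation of Part I, not the article’s code): it folds the operators over this, left to right, so each operator receives the Observable produced by the previous one. The addOne operator and the of() helper are hypothetical. For brevity, this pipe() is loosely typed and returns Observable<any>; RxJS itself declares a series of pipe() overloads so that the output type is inferred.

```typescript
// Minimal Observable with pipe(), assumed shape (not the article's exact code).
interface Observer<T> { next: (value: T) => void; error: (err: unknown) => void; complete: () => void; }
interface Subscription { unsubscribe: () => void; }
type ObserveFn<T> = (observer: Observer<T>) => (() => void) | void;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private observe: ObserveFn<T>;
  constructor(observe: ObserveFn<T>) { this.observe = observe; }

  subscribe(partial: Partial<Observer<T>>): Subscription {
    let closed = false; // nothing is delivered after error, complete, or unsubscribe
    const teardown = this.observe({
      next: v => { if (!closed) partial.next?.(v); },
      error: e => { if (!closed) { closed = true; partial.error?.(e); } },
      complete: () => { if (!closed) { closed = true; partial.complete?.(); } },
    });
    return { unsubscribe: () => { closed = true; if (typeof teardown === "function") teardown(); } };
  }

  // Fold the operators over this Observable: op1(this), then op2(result), and so on.
  pipe(...operators: OperatorFunction<any, any>[]): Observable<any> {
    return operators.reduce<Observable<any>>((prev, op) => op(prev), this);
  }
}

// Hypothetical helper: emits each argument synchronously, then completes.
function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>(obs => {
    values.forEach(v => obs.next(v));
    obs.complete();
  });
}

// Hypothetical hand-written operator that adds one to every value.
const addOne: OperatorFunction<number, number> = source =>
  new Observable<number>(obs => {
    const sub = source.subscribe({
      next: v => obs.next(v + 1),
      error: e => obs.error(e),
      complete: () => obs.complete(),
    });
    return () => sub.unsubscribe();
  });

of(1, 2, 3).pipe(addOne, addOne).subscribe({ next: v => console.log(v) }); // 3, 4, 5
```

With no operators at all, the reduce simply returns its initial value, so pipe() hands back the source Observable unchanged.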

Finally, before we walk through what we’ve just done, let’s convince ourselves this is working as intended by writing some code that outputs our sum of squares using pipe() and the operators we’ve just written. Return to the end of the code you’ve written so far, and type the following:
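Putting the pieces together, here is a self-contained sketch that rebuilds squareSum with our own operators. It assumes a small Observable class with a pipe() method and a hypothetical of() helper, both approximations rather than the article’s exact code, so that the block runs on its own. Running it should log Squared sum = 385.

```typescript
// Minimal Observable with pipe(), assumed shape (not the article's exact code).
interface Observer<T> { next: (value: T) => void; error: (err: unknown) => void; complete: () => void; }
interface Subscription { unsubscribe: () => void; }
type ObserveFn<T> = (observer: Observer<T>) => (() => void) | void;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private observe: ObserveFn<T>;
  constructor(observe: ObserveFn<T>) { this.observe = observe; }

  subscribe(partial: Partial<Observer<T>>): Subscription {
    let closed = false; // nothing is delivered after error, complete, or unsubscribe
    const teardown = this.observe({
      next: v => { if (!closed) partial.next?.(v); },
      error: e => { if (!closed) { closed = true; partial.error?.(e); } },
      complete: () => { if (!closed) { closed = true; partial.complete?.(); } },
    });
    return { unsubscribe: () => { closed = true; if (typeof teardown === "function") teardown(); } };
  }

  // Each operator receives the Observable produced by the previous one.
  pipe(...operators: OperatorFunction<any, any>[]): Observable<any> {
    return operators.reduce<Observable<any>>((prev, op) => op(prev), this);
  }
}

// Hypothetical helper: emits each argument synchronously, then completes.
function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>(obs => { values.forEach(v => obs.next(v)); obs.complete(); });
}

function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(obs => {
      let index = 0;
      const sub = source.subscribe({
        next: v => obs.next(project(v, index++)),
        error: e => obs.error(e),
        complete: () => obs.complete(),
      });
      return () => sub.unsubscribe();
    });
}

function reduce<T, A>(accumulator: (acc: A, value: T) => A, seed: A): OperatorFunction<T, A> {
  return source =>
    new Observable<A>(obs => {
      let acc = seed;
      const sub = source.subscribe({
        next: v => { acc = accumulator(acc, v); },
        error: e => obs.error(e),
        complete: () => { obs.next(acc); obs.complete(); }, // emit the total on completion
      });
      return () => sub.unsubscribe();
    });
}

const squareSum = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
  map((n: number) => n * n),
  reduce((s: number, n: number) => s + n, 0),
);
squareSum.subscribe({ next: sum => console.log(`Squared sum = ${sum}`) });
```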

You should see Squared sum = 385 logged to the console.

Also notice that, just as with RxJS’s operators, it is easy to define new operators that simply use pipe() to combine existing ones.

This will log “Squared sum w/ custom op = 385”
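A self-contained sketch of such a custom operator, under the same assumptions (a small Observable with pipe(), plus map(), reduce(), and a hypothetical of(), none of which is the article’s exact code). The name sumOfSquares is hypothetical: it is a function that returns an operator, and that operator does nothing but delegate to pipe().

```typescript
// Minimal Observable with pipe(), assumed shape (not the article's exact code).
interface Observer<T> { next: (value: T) => void; error: (err: unknown) => void; complete: () => void; }
interface Subscription { unsubscribe: () => void; }
type ObserveFn<T> = (observer: Observer<T>) => (() => void) | void;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private observe: ObserveFn<T>;
  constructor(observe: ObserveFn<T>) { this.observe = observe; }

  subscribe(partial: Partial<Observer<T>>): Subscription {
    let closed = false; // nothing is delivered after error, complete, or unsubscribe
    const teardown = this.observe({
      next: v => { if (!closed) partial.next?.(v); },
      error: e => { if (!closed) { closed = true; partial.error?.(e); } },
      complete: () => { if (!closed) { closed = true; partial.complete?.(); } },
    });
    return { unsubscribe: () => { closed = true; if (typeof teardown === "function") teardown(); } };
  }

  pipe(...operators: OperatorFunction<any, any>[]): Observable<any> {
    return operators.reduce<Observable<any>>((prev, op) => op(prev), this);
  }
}

function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>(obs => { values.forEach(v => obs.next(v)); obs.complete(); });
}

function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(obs => {
      let index = 0;
      const sub = source.subscribe({
        next: v => obs.next(project(v, index++)),
        error: e => obs.error(e),
        complete: () => obs.complete(),
      });
      return () => sub.unsubscribe();
    });
}

function reduce<T, A>(accumulator: (acc: A, value: T) => A, seed: A): OperatorFunction<T, A> {
  return source =>
    new Observable<A>(obs => {
      let acc = seed;
      const sub = source.subscribe({
        next: v => { acc = accumulator(acc, v); },
        error: e => obs.error(e),
        complete: () => { obs.next(acc); obs.complete(); },
      });
      return () => sub.unsubscribe();
    });
}

// Hypothetical custom operator factory built purely from existing operators.
function sumOfSquares(): OperatorFunction<number, number> {
  return source =>
    source.pipe(
      map((n: number) => n * n),
      reduce((s: number, n: number) => s + n, 0),
    );
}

of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .pipe(sumOfSquares())
  .subscribe({ next: sum => console.log(`Squared sum w/ custom op = ${sum}`) });
```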

Breaking down Operators

Now that we know this works, let’s go back through what we’ve just written and walk through what’s going on.

The first thing we do is define an OperatorFunction type. This is our library’s definition of “operator”. RxJS has more advanced mechanisms for defining how operators function and how they can be created. For our purposes, we distill an operator down to what I feel is its most basic and understandable form: a function that transforms one observable into another observable. RxJS also calls this an OperatorFunction, and its guide on operators presents this as the preferred way to think about them.

So how exactly do we transform one observable into another? Exactly the way we do everything else in RxJS: using the Observable’s observe() constructor function. We create a new observable whose observe() function subscribes to the source observable, and emits a new value based on the value the source Observable emitted. The logic within the observe() function of an Observable created inside an operator is the same as the logic inside an observe() created anywhere else. All we’re doing is stitching observation logic together in order to layer transformations on top of one another.

Notice that we call map() and pass its result as an argument to pipe(). This is because map() returns an operator which, given a source Observable, emits the result of calling the transformation function on each value emitted by the source. reduce() is similar: it returns an operator that consumes values emitted from the source, aggregates them, and, when the source completes, emits the result. map(), reduce(), and functions like them are not themselves operators; rather, they produce operators. When I was just starting out with RxJS, this was a big source of confusion for me, so I wanted to state it explicitly.

Breaking down pipe()

We’ve established that operators are simply functions that take in Observables and return new ones. So how do we chain a bunch of operators together? The exact same way we’d chain any other group of functions together: we compose them into a pipeline. pipe() is kind of like reduce, but for Observables. It starts with the source Observable and then calls each of the given operators in succession, “accumulating” the result. Because each operator returns a new Observable, you’re left with the result of applying all of those operators, in order, to the source Observable.
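The same idea works for any unary functions. The hypothetical pipeFns helper below composes functions left to right with Array.prototype.reduce, which is essentially what pipe() does with operators. It is restricted to a single type T for simplicity, whereas real operators may change the value type at each step.

```typescript
// Hypothetical helper: compose unary functions left to right, the way pipe()
// threads an Observable through a list of operators.
function pipeFns<T>(...fns: Array<(x: T) => T>): (x: T) => T {
  return (x: T) => fns.reduce((acc, fn) => fn(acc), x);
}

const addOne = (n: number): number => n + 1;
const double = (n: number): number => n * 2;

console.log(pipeFns(addOne, double)(3)); // 8: double(addOne(3))
console.log(pipeFns(double, addOne)(3)); // 7: addOne(double(3)); order matters
```

With an empty list, the reduce returns its starting value untouched, which is why piping through zero operators gives you back what you started with.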

Wrapping up the basics

At this point, you should have a pretty solid understanding of the basics of operators. If you’d like, you can pause here and try out some exercises to strengthen your understanding:

  • Write some more of your own operators, like filter(), scan(), catchError(), and debounceTime()
  • Imagine you had an observable that emitted an infinite stream of incoming numbers over a web socket, perhaps something like a real-time stock ticker API. Use your own operators to transform that observable into one that emits the opening, high, low, and closing values for every 30-second window (hint: take a look at bufferTime() to help you accomplish this).

Higher-order mapping operators

NOTE: What I’m about to demonstrate here is radically simplified compared to how it’s implemented in RxJS. However, the core intuition is the same; RxJS just implements it in a more elegant (and advanced) way.

You may have noticed that up to this point, I’ve conveniently glossed over operators like mergeMap and concatMap, which you have probably used if you’ve worked with RxJS in an Angular setting. These are referred to as higher-order mapping operators. That is, they leverage functions which, given an emitted value, return an Observable rather than a new raw value. These are a little bit trickier to wrap your head around, especially if you still haven’t really grasped the intuition around operators, so I’ve saved them for their own separate section. We’re going to build both mergeMap and concatMap, in as simple and straightforward a way as possible, in order to gain a better understanding of how these operators work.

Let’s start with mergeMap, which takes a function producing an observable as an argument. Any time the source observable emits a value, the returned observable calls the given function with that value to produce a new observable, which is subscribed to and begins emitting immediately. All of the emissions from all of those produced observables are merged into the returned observable as they arrive.

Type this code into your editor (bear with me if it looks a bit dense, we’ll go back over it soon enough!):
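Here is a minimal sketch of a mergeMap along these lines, assuming a small Observable class that approximates Part I (repeated so the block runs on its own; the of() helper is hypothetical). It follows the same structure as the walkthrough: an index passed to project, a subscriptions set holding the source subscription and every live inner subscription, and inner values forwarded straight to the output observer. One deliberate difference: this sketch completes the output only once the source and all active inner Observables have completed, which is how RxJS’s mergeMap behaves.

```typescript
// Minimal Observable, assumed to resemble Part I (not the article's exact code).
interface Observer<T> { next: (value: T) => void; error: (err: unknown) => void; complete: () => void; }
interface Subscription { unsubscribe: () => void; }
type ObserveFn<T> = (observer: Observer<T>) => (() => void) | void;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private observe: ObserveFn<T>;
  constructor(observe: ObserveFn<T>) { this.observe = observe; }

  subscribe(partial: Partial<Observer<T>>): Subscription {
    let closed = false; // nothing is delivered after error, complete, or unsubscribe
    const teardown = this.observe({
      next: v => { if (!closed) partial.next?.(v); },
      error: e => { if (!closed) { closed = true; partial.error?.(e); } },
      complete: () => { if (!closed) { closed = true; partial.complete?.(); } },
    });
    return { unsubscribe: () => { closed = true; if (typeof teardown === "function") teardown(); } };
  }
}

function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>(obs => { values.forEach(v => obs.next(v)); obs.complete(); });
}

// Every projected inner Observable is subscribed to immediately, and its values
// are forwarded to the output observer as soon as they arrive.
function mergeMap<T, R>(project: (value: T, index: number) => Observable<R>): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(obs => {
      let index = 0;
      let active = 0; // inner Observables that have not completed yet
      let sourceDone = false;
      const subscriptions = new Set<Subscription>(); // source + live inner subscriptions

      const sourceSub = source.subscribe({
        next: value => {
          active++;
          let innerDone = false;
          let innerSub: Subscription | undefined;
          innerSub = project(value, index++).subscribe({
            next: v => obs.next(v), // merge: forward immediately
            error: e => obs.error(e),
            complete: () => {
              innerDone = true;
              active--;
              if (innerSub) subscriptions.delete(innerSub); // completed: no unsubscribe needed
              if (sourceDone && active === 0) obs.complete();
            },
          });
          if (!innerDone) subscriptions.add(innerSub);
        },
        error: e => obs.error(e), // simplified: live inners are not torn down here
        complete: () => {
          sourceDone = true;
          if (active === 0) obs.complete(); // otherwise the last inner to finish completes
        },
      });
      subscriptions.add(sourceSub);
      return () => subscriptions.forEach(s => s.unsubscribe());
    });
}

const merged: number[] = [];
mergeMap((x: number) => of(x, x * 10))(of(1, 2)).subscribe({
  next: v => { merged.push(v); },
  complete: () => console.log(merged.join(", ")), // 1, 10, 2, 20
});
```

With synchronous inner Observables like these, merging and concatenating look identical; the difference only shows up once inner Observables overlap in time.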

Let’s write a more in-depth test case to prove to ourselves this works. Write the following below the definition of mergeMap:
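A self-contained sketch of that test follows, repeating an assumed minimal Observable and a mergeMap sketch so it runs on its own. To keep it quick, the timings are scaled down tenfold from the ones described in the next paragraph (80ms instead of 800ms, 50ms instead of 500ms, 300ms instead of three seconds). The names outer and createInner come from the text, but their bodies here are assumptions. It should log 10, 10, 30, 10, 30, 30 and then complete.

```typescript
// Minimal Observable, assumed to resemble Part I (not the article's exact code).
interface Observer<T> { next: (value: T) => void; error: (err: unknown) => void; complete: () => void; }
interface Subscription { unsubscribe: () => void; }
type ObserveFn<T> = (observer: Observer<T>) => (() => void) | void;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private observe: ObserveFn<T>;
  constructor(observe: ObserveFn<T>) { this.observe = observe; }

  subscribe(partial: Partial<Observer<T>>): Subscription {
    let closed = false;
    const teardown = this.observe({
      next: v => { if (!closed) partial.next?.(v); },
      error: e => { if (!closed) { closed = true; partial.error?.(e); } },
      complete: () => { if (!closed) { closed = true; partial.complete?.(); } },
    });
    return { unsubscribe: () => { closed = true; if (typeof teardown === "function") teardown(); } };
  }
}

function mergeMap<T, R>(project: (value: T, index: number) => Observable<R>): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(obs => {
      let index = 0;
      let active = 0;
      let sourceDone = false;
      const subscriptions = new Set<Subscription>();
      const sourceSub = source.subscribe({
        next: value => {
          active++;
          let innerDone = false;
          let innerSub: Subscription | undefined;
          innerSub = project(value, index++).subscribe({
            next: v => obs.next(v), // forwarded as soon as it arrives
            error: e => obs.error(e),
            complete: () => {
              innerDone = true;
              active--;
              if (innerSub) subscriptions.delete(innerSub);
              if (sourceDone && active === 0) obs.complete();
            },
          });
          if (!innerDone) subscriptions.add(innerSub);
        },
        error: e => obs.error(e),
        complete: () => { sourceDone = true; if (active === 0) obs.complete(); },
      });
      subscriptions.add(sourceSub);
      return () => subscriptions.forEach(s => s.unsubscribe());
    });
}

// outer: emits 1 immediately, 3 after 80ms, then completes 300ms after that.
const outer = new Observable<number>(obs => {
  obs.next(1);
  const t1 = setTimeout(() => obs.next(3), 80);
  const t2 = setTimeout(() => obs.complete(), 380);
  return () => { clearTimeout(t1); clearTimeout(t2); };
});

// createInner: emits 10 * x three times, 50ms apart, then completes.
function createInner(x: number): Observable<number> {
  return new Observable<number>(obs => {
    obs.next(10 * x);
    const t1 = setTimeout(() => obs.next(10 * x), 50);
    const t2 = setTimeout(() => { obs.next(10 * x); obs.complete(); }, 100);
    return () => { clearTimeout(t1); clearTimeout(t2); };
  });
}

const received: number[] = [];
mergeMap(createInner)(outer).subscribe({
  next: v => { received.push(v); console.log(v); },
  complete: () => console.log("complete"),
});
```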

Here, we have an observable, outer, that emits 1, waits 800ms, emits 3, waits three seconds, and then completes. We pipe() this observable into mergeMap, whose project function, createInner(), returns a new observable that emits three values equal to 10 * x, 500ms apart, and then completes.

If mergeMap works as intended, the final subscription should receive two 10s, then a 30, and only then the third 10. That’s because the 800ms mark falls after the second 10 has been emitted but before the third: at that point outer emits 3, triggering a new call to createInner(). Finally, the third 10 and the remaining two 30s arrive.

The best I could do for a pretty marble diagram 😅

And in fact, this is what we see if we inspect the console output from our mergeMap subscription:

Notice the 30 interleaved between the 2nd and 3rd 10

Let’s go back over this code to get a good sense of what it’s doing.

The first part of this code is similar to our map() function: we keep a running tally of the current index, and pass it to project along with each value emitted by the source Observable. However, the logic that manages subscriptions is a bit more involved.

The first thing you’ll notice is the subscriptions set. We need it to keep track of both the subscription to the top-level Observable and the subscriptions to the inner Observables returned by the project() function. We register every subscription in this set, so that we can unsubscribe from all of them when the returned observable is unsubscribed from.

Whenever we receive a new value within subscribe(), we create a new Observable by calling project() with the emitted value and the current index. We then subscribe to that returned Observable, forwarding its values through to our original observer chain while also accounting for errors and completions. In effect, we merge the output of the created Observables into our output Observable.

Notice that we remove the subscription from our set when an inner Observable completes: per the Observable contract, we no longer have to unsubscribe from a subscription once its Observable has completed. However, we want to keep emitting values from newly projected Observables even after one inner Observable completes, so we specifically do not complete the Observable returned from the operator at that point. If the source Observable completes, on the other hand, we make sure we unsubscribe, so that we get no more emissions from the output Observable (note that unsubscribing from the source Observable itself has no semantic effect here, since it has already completed).

The key takeaways here are twofold:

  • Higher-order mapping operators work by subscribing to returned Observables and forwarding their values through the original Observer chain. In essence, the Observer from the returned Observable acts as a proxy for all of the projected Observables created inside the function.
  • mergeMap and all merge* operators emit values from their inner Observables as soon as they arrive. Notice how, as soon as an inner Observable is created, it is subscribed to and its values begin flowing to the output. Whether previous inner Observables have completed doesn’t matter to this code. This is a key behavior of the merge family of operators.

Hopefully, this begins to lift the veil on how some of these more advanced higher-order mapping operators work. To really solidify our understanding, let’s wrap up by implementing a more complex higher-order mapping operator: concatMap.

As usual, we’ll look at the code first, and then go back over it. Type this out into your editor:
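A minimal sketch of concatMap in the same spirit, again assuming a small Observable class that approximates Part I (repeated so the block runs on its own; of() is hypothetical). A FIFO buffer holds projected Observables until the active one completes. The helper is named subscribeTo after the walkthrough, but its body is an assumption: it takes only the projected Observable because it closes over the output observer, and a single busy flag stands in for checking whether the subscriptions set is empty.

```typescript
// Minimal Observable, assumed to resemble Part I (not the article's exact code).
interface Observer<T> { next: (value: T) => void; error: (err: unknown) => void; complete: () => void; }
interface Subscription { unsubscribe: () => void; }
type ObserveFn<T> = (observer: Observer<T>) => (() => void) | void;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private observe: ObserveFn<T>;
  constructor(observe: ObserveFn<T>) { this.observe = observe; }

  subscribe(partial: Partial<Observer<T>>): Subscription {
    let closed = false; // nothing is delivered after error, complete, or unsubscribe
    const teardown = this.observe({
      next: v => { if (!closed) partial.next?.(v); },
      error: e => { if (!closed) { closed = true; partial.error?.(e); } },
      complete: () => { if (!closed) { closed = true; partial.complete?.(); } },
    });
    return { unsubscribe: () => { closed = true; if (typeof teardown === "function") teardown(); } };
  }
}

function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>(obs => { values.forEach(v => obs.next(v)); obs.complete(); });
}

// Only one inner Observable is subscribed at a time; later ones wait their turn.
function concatMap<T, R>(project: (value: T, index: number) => Observable<R>): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(obs => {
      let index = 0;
      let sourceDone = false;
      let busy = false; // true while an inner Observable is active
      let current: Subscription | undefined;
      const buffer: Observable<R>[] = []; // FIFO queue of projected Observables

      const subscribeTo = (projected: Observable<R>): void => {
        busy = true;
        let done = false;
        const sub = projected.subscribe({
          next: v => obs.next(v),
          error: e => obs.error(e),
          complete: () => {
            done = true;
            busy = false;
            current = undefined;
            const queued = buffer.shift();
            if (queued) subscribeTo(queued); // first in, first out
            else if (sourceDone) obs.complete();
          },
        });
        if (!done) current = sub; // keep it so teardown can unsubscribe
      };

      const sourceSub = source.subscribe({
        next: value => {
          const projected = project(value, index++);
          if (busy) buffer.push(projected); // wait for the active inner to complete
          else subscribeTo(projected);
        },
        error: e => obs.error(e),
        complete: () => {
          sourceDone = true;
          if (!busy) obs.complete(); // the buffer is always empty when nothing is busy
        },
      });
      return () => { sourceSub.unsubscribe(); current?.unsubscribe(); buffer.length = 0; };
    });
}

const concatenated: number[] = [];
concatMap((x: number) => of(x, x * 10))(of(1, 2)).subscribe({
  next: v => { concatenated.push(v); },
  complete: () => console.log(concatenated.join(", ")), // 1, 10, 2, 20
});
```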

Unlike mergeMap, concatMap does not subscribe to (and therefore emit values from) a projected Observable until the currently active one completes. Contrast this with the mergeMap marble diagram above; for concatMap we would have:

Notice here, there is no interleaving between the 10s and 30s; the 30s come strictly after the 10s.

Let’s verify this using the same test we used for mergeMap above. Type the following below the definition of concatMap:
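A self-contained sketch of that test, with an assumed minimal Observable and a concatMap sketch repeated so it runs on its own. As with the mergeMap version, the timings are scaled down tenfold (80ms, 50ms, 300ms), and the bodies of outer and createInner are assumptions. Because the inner Observable for 3 waits in the buffer until the first inner completes, it should log 10, 10, 10, 30, 30, 30 and then complete.

```typescript
// Minimal Observable, assumed to resemble Part I (not the article's exact code).
interface Observer<T> { next: (value: T) => void; error: (err: unknown) => void; complete: () => void; }
interface Subscription { unsubscribe: () => void; }
type ObserveFn<T> = (observer: Observer<T>) => (() => void) | void;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private observe: ObserveFn<T>;
  constructor(observe: ObserveFn<T>) { this.observe = observe; }

  subscribe(partial: Partial<Observer<T>>): Subscription {
    let closed = false;
    const teardown = this.observe({
      next: v => { if (!closed) partial.next?.(v); },
      error: e => { if (!closed) { closed = true; partial.error?.(e); } },
      complete: () => { if (!closed) { closed = true; partial.complete?.(); } },
    });
    return { unsubscribe: () => { closed = true; if (typeof teardown === "function") teardown(); } };
  }
}

function concatMap<T, R>(project: (value: T, index: number) => Observable<R>): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(obs => {
      let index = 0;
      let sourceDone = false;
      let busy = false;
      let current: Subscription | undefined;
      const buffer: Observable<R>[] = []; // waiting projected Observables, FIFO
      const subscribeTo = (projected: Observable<R>): void => {
        busy = true;
        let done = false;
        const sub = projected.subscribe({
          next: v => obs.next(v),
          error: e => obs.error(e),
          complete: () => {
            done = true;
            busy = false;
            current = undefined;
            const queued = buffer.shift();
            if (queued) subscribeTo(queued);
            else if (sourceDone) obs.complete();
          },
        });
        if (!done) current = sub;
      };
      const sourceSub = source.subscribe({
        next: value => {
          const projected = project(value, index++);
          if (busy) buffer.push(projected);
          else subscribeTo(projected);
        },
        error: e => obs.error(e),
        complete: () => { sourceDone = true; if (!busy) obs.complete(); },
      });
      return () => { sourceSub.unsubscribe(); current?.unsubscribe(); buffer.length = 0; };
    });
}

// outer: emits 1 immediately, 3 after 80ms, then completes 300ms after that.
const outer = new Observable<number>(obs => {
  obs.next(1);
  const t1 = setTimeout(() => obs.next(3), 80);
  const t2 = setTimeout(() => obs.complete(), 380);
  return () => { clearTimeout(t1); clearTimeout(t2); };
});

// createInner: emits 10 * x three times, 50ms apart, then completes.
function createInner(x: number): Observable<number> {
  return new Observable<number>(obs => {
    obs.next(10 * x);
    const t1 = setTimeout(() => obs.next(10 * x), 50);
    const t2 = setTimeout(() => { obs.next(10 * x); obs.complete(); }, 100);
    return () => { clearTimeout(t1); clearTimeout(t2); };
  });
}

const received: number[] = [];
concatMap(createInner)(outer).subscribe({
  next: v => { received.push(v); console.log(v); },
  complete: () => console.log("complete"),
});
```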

If we run this, we should indeed see that the projected observables emit strictly in the order in which the source values were emitted:

Let’s go back through the code and analyze how we achieved that.

The first thing to notice is the buffer array we introduced. In concatMap, order matters, so we need a way to control precisely when we subscribe to each Observable. By pushing onto a list and shifting off of it, we get a deterministic first-in, first-out ordering of the Observables we subscribe to, and therefore of their emitted values. That is what the buffer and the subscribeTo function are for: they ensure that Observables are subscribed to in the correct order. When we receive a new value from the source Observable, we create a new inner Observable from that value. Then we check whether we’re currently subscribed to an inner Observable by checking whether our subscriptions set is non-empty. If so, we push the new Observable into the buffer, where it awaits subscription. You can think of this buffer as a work queue: we can only subscribe to one Observable at a time, so later Observables have to wait until the currently subscribed one completes.

If we’re not subscribed to anything, we call subscribeTo(projected, obs, subscriptions), which subscribes to the inner Observable. The logic to focus on in subscribeTo is the inner Observable’s completion handler. When the inner Observable completes, we check whether 1) we no longer have any subscriptions in our subscriptions set (more on this in a bit) and 2) there are any pending Observables awaiting subscription. If both are true, we immediately subscribe to the next Observable waiting in the buffer.

The key takeaway here is that by using a queue-like mechanism to limit concurrent subscriptions, and only subscribing to Observables once a currently subscribed Observable completes, concatMap can guarantee order of emission for its projected observables.

Abstracting higher-order mapping operators

Now at this point, even though everything might make sense to you, you’ll most likely have some questions:

  • Why do we even need a subscriptions set in the first place for concatMap?
  • Can’t we reuse a lot of the logic between mergeMap and concatMap?

You are 100% right in asking these questions. I have chosen to write these higher-order mapping operators in a way that guides you toward understanding how they could be further abstracted, eventually arriving at how RxJS handles it. In RxJS, this buffering and subscribing flow is handled in a much more complex, powerful, and elegant way via merge, concat, and friends. If you’re interested, it’s a great exercise to read through those API references and the source code to get a better sense of how they work. For example, merge and mergeMap in RxJS accept a “concurrency” argument. Looking at concatMap, you may be able to imagine how, instead of checking that our subscriptions set is of size 0, we could check whether it holds fewer than N subscriptions, where N is the number of allowed concurrent subscriptions. You may then better understand why the RxJS docs describe concatMap as equivalent to mergeMap with a concurrency of 1: mergeMap’s concurrency defaults to infinity, and because merge buffers values that are waiting for a free slot, lowering the concurrency to 1 is enough to turn it into concat.
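To make the concurrency idea concrete, here is a sketch, under an assumed minimal Observable and a hypothetical of() helper, of a mergeMap that takes a concurrent limit (defaulting to Infinity) and a concatMap defined as mergeMap with a limit of 1. Unlike a version that buffers projected Observables, this one buffers raw source values and calls project only when a slot frees up. RxJS works along similar lines internally, but this is an illustration, not a port of RxJS. The never-completing hang Observable in the usage is hypothetical and exists only to show the limit at work.

```typescript
// Minimal Observable, assumed to resemble Part I (not the article's exact code).
interface Observer<T> { next: (value: T) => void; error: (err: unknown) => void; complete: () => void; }
interface Subscription { unsubscribe: () => void; }
type ObserveFn<T> = (observer: Observer<T>) => (() => void) | void;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private observe: ObserveFn<T>;
  constructor(observe: ObserveFn<T>) { this.observe = observe; }

  subscribe(partial: Partial<Observer<T>>): Subscription {
    let closed = false; // nothing is delivered after error, complete, or unsubscribe
    const teardown = this.observe({
      next: v => { if (!closed) partial.next?.(v); },
      error: e => { if (!closed) { closed = true; partial.error?.(e); } },
      complete: () => { if (!closed) { closed = true; partial.complete?.(); } },
    });
    return { unsubscribe: () => { closed = true; if (typeof teardown === "function") teardown(); } };
  }
}

function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>(obs => { values.forEach(v => obs.next(v)); obs.complete(); });
}

// At most `concurrent` inner subscriptions at once; extra source values wait in
// a FIFO buffer and are projected only when a slot frees up.
function mergeMap<T, R>(
  project: (value: T, index: number) => Observable<R>,
  concurrent = Infinity,
): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(obs => {
      let index = 0;
      let active = 0;
      let sourceDone = false;
      const buffer: T[] = [];
      const subscriptions = new Set<Subscription>();

      const subscribeInner = (value: T): void => {
        active++;
        let innerDone = false;
        let innerSub: Subscription | undefined;
        innerSub = project(value, index++).subscribe({
          next: v => obs.next(v),
          error: e => obs.error(e),
          complete: () => {
            innerDone = true;
            active--;
            if (innerSub) subscriptions.delete(innerSub);
            if (buffer.length > 0) subscribeInner(buffer.shift() as T); // next in line
            else if (sourceDone && active === 0) obs.complete();
          },
        });
        if (!innerDone) subscriptions.add(innerSub);
      };

      const sourceSub = source.subscribe({
        next: value => {
          if (active < concurrent) subscribeInner(value);
          else buffer.push(value); // all slots busy: wait in line
        },
        error: e => obs.error(e),
        complete: () => {
          sourceDone = true;
          if (active === 0 && buffer.length === 0) obs.complete();
        },
      });
      subscriptions.add(sourceSub);
      return () => subscriptions.forEach(s => s.unsubscribe());
    });
}

// concatMap is then just mergeMap limited to one inner subscription at a time.
function concatMap<T, R>(project: (value: T, index: number) => Observable<R>): OperatorFunction<T, R> {
  return mergeMap(project, 1);
}

// Hypothetical inner Observable that emits its value but never completes.
const hang = (x: number) => new Observable<number>(obs => { obs.next(x); });

const unlimited: number[] = [];
mergeMap(hang)(of(1, 2, 3)).subscribe({ next: v => { unlimited.push(v); } });
const one: number[] = [];
concatMap(hang)(of(1, 2, 3)).subscribe({ next: v => { one.push(v); } });
console.log(unlimited.join(", ")); // 1, 2, 3: every inner runs at once
console.log(one.join(", ")); // 1: values 2 and 3 wait for an inner that never completes
```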

A final optional exercise would be to try to refactor mergeMap and concatMap to align more closely with how they’re actually built in RxJS. Otherwise, if you’re curious but don’t feel like coding it up yourself, Angular University has a comprehensive post on higher-order mapping.

Conclusion

If you’ve gotten this far, give yourself a big pat on the back, and maybe even do a victory dance. You have built a library yourself that can mimic a lot of the functionality of RxJS. You’ve created not only a fully functional Observable class, but also a suite of operators that you can use for powerful manipulation of asynchronous streams of data. By doing so, you have hopefully gained a much deeper understanding of, and intuition for, how one of the core libraries relied upon by Angular, and the library for reactive programming on the web, does what it does.

I hope that these two tutorials have helped demystify RxJS for you :) I’d love to hear your feedback in the comments, so please let me know how this can be improved!

There is one more part — Part III — which talks about schedulers. Schedulers are a more nuanced topic in RxJS that you may not see as much, but if you’re curious to implement them, read on! If not, thank you so much for reading and happy hacking! 🔁

Travis Kaufman

Software engineer specializing in UI / UX development. Proud New Yorker, lifelong learner. ⚡️Gryffindor ⚡️