Demystifying RxJS, Part III: Building our own Schedulers

Travis Kaufman
6 min read · Nov 27, 2019


This is the third and final part in a series called Demystifying RxJS, where we build our own miniature version of RxJS in order to gain a deep, fundamental understanding of how it works. If you’ve read Part II, you can continue where you left off. Or, you can start here and follow along by forking the CodeSandbox containing the completed code for Part II. Or, check out this CodeSandbox containing the complete miniature library.

Over the last two parts of this series, we’ve built our own miniature version of RxJS, complete with Observables and operators. In this final part, we’re going to dive a bit deeper and talk about schedulers. This is “closer to the metal”, so to speak, within RxJS than Observables and operators. Especially if you’re mostly using RxJS through Angular, you may be able to get all the understanding you need just by reading Part I and Part II. However, if you’d like even more insight into how RxJS works, or you’re struggling with something in your code you think has to do with the precise time at which an Observable emits a value, read on.

Schedulers are a mechanism that gives you fine-grained control over exactly when values from an Observable are emitted, without the observer code itself having to know the details of that timing. I have found this to be particularly important when designing APIs that return Observables, where you need to make sure you don’t inadvertently trigger the release of a demonic hell spawn bent on bringing humanity to its knees and orchestrating the demise of life in this universe as we know it.

Let me explain.

Consider this example Observable-based API service, which fetches data from an endpoint, and caches that data in-memory after it is returned, using its URI as a key:

import { Observable, from, of } from 'rxjs';
import { shareReplay, tap } from 'rxjs/operators';

const cache = new Map<string, any>();

function get<T>(endpoint: string): Observable<T> {
  if (cache.has(endpoint)) {
    // Cache hit: hand back the stored response
    return of(cache.get(endpoint) as T);
  }
  // Cache miss: request the data and store it once it arrives
  return from(fetch(endpoint).then(res => res.json() as Promise<T>)).pipe(
    tap(data => {
      cache.set(endpoint, data);
    }),
    shareReplay(1), // So we only ever set to the cache once
  );
}

A service like this might be useful if you’re designing an analytics application where a client might make multiple changes to a page that would in theory trigger multiple API requests, but the data being returned is unlikely to change within a single session.

Now let’s say we use the API like so:

// Named fetchData so it doesn't shadow the global fetch() used by get()
function fetchData() {
  console.log('[fetch] Before subscribe');
  get('/data').subscribe(data => console.log('[fetch] Response:', data));
  console.log('[fetch] After subscribe');
}

console.log('Initiating first fetch');
fetchData();

// Then, sometime later...
console.log('Initiating second fetch');
fetchData();

Here’s what the code will output the first time it is used:

Initiating first fetch
[fetch] Before subscribe
[fetch] After subscribe
[fetch] Response: ...

But what about the second time? You’d think it would be the same as the first, right? Alas:

Initiating second fetch
[fetch] Before subscribe
[fetch] Response: ...
[fetch] After subscribe

In the second call, the subscriber callback fires in between the before/after console logs, whereas in the first call it fires after both of them.

This makes sense if we recall how we implemented of() in Part I. Notice how, if we don’t have a response in our cache, we call from(fetch(...)). fetch returns a Promise, and a Promise’s then() callbacks always run asynchronously, after the currently executing call stack has finished. However, when we subscribe to an Observable created by of(), it simply iterates over all of the given values and calls next() with each of them synchronously. Thus, while from(), when given a Promise, always emits the Promise’s value asynchronously, of() always emits its values synchronously.
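To see the two timings side by side, here is a minimal sketch in the spirit of the Part I library. The Observer/Observable shapes and the of()/fromPromise() helpers below are simplified stand-ins written for this example (assumptions, not RxJS’s own API); each call is traced by logging before and after subscribe().

```typescript
// Simplified stand-ins for the Part I types (assumptions, not RxJS's API)
interface Observer<T> {
  next(value: T): void;
  complete(): void;
}

class Observable<T> {
  private producer: (observer: Observer<T>) => void;

  constructor(producer: (observer: Observer<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }
}

// of(): loops over its values and calls next() on the current call stack
function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>(observer => {
    for (const value of values) {
      observer.next(value);
    }
    observer.complete();
  });
}

// fromPromise(): next() runs inside then(), which is always deferred
function fromPromise<T>(promise: Promise<T>): Observable<T> {
  return new Observable<T>(observer => {
    promise.then(value => {
      observer.next(value);
      observer.complete();
    });
  });
}

// Records the order of events around each subscribe() call
const log: string[] = [];

function trace(name: string, source: Observable<string>): void {
  log.push(`${name}: before subscribe`);
  source.subscribe({
    next: value => log.push(`${name}: ${value}`),
    complete: () => {},
  });
  log.push(`${name}: after subscribe`);
}

trace('of', of('cached'));
trace('promise', fromPromise(Promise.resolve('fresh')));
```

The of() value lands between its two log entries, while the Promise value only arrives once the current call stack has unwound, after both entries. That is the same split we saw between the first and second fetches.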

In my opinion, this presents a major problem in terms of API consistency. If you’ve read Isaac Schlueter’s Designing APIs for Asynchrony article (and if you haven’t, and you’re designing JS APIs, I highly recommend you do), then you may recall one of the most important — if not the most important — passages in that article:

If you have an API which takes a callback,
and sometimes that callback is called immediately,
and other times that callback is called at some point in the future,
then you will render any code using this API impossible to reason about, and cause the release of Zalgo.

Unfortunately, given the way our Observables work right now, the callbacks we pass to subscribe() are sometimes called immediately, and sometimes not. In other words, our API is releasing Zalgo. And I think we can all agree that this is not an ideal situation.

Definitely not ideal. Image Credit: https://creepypastavillains.fandom.com/wiki/Zalgo

So how can we prevent this? One way is to always use from and wrap our cached responses in a Promise. That’s okay, but it’s pretty clunky. What we really need is a way to give our Observable class better control over when emissions of data are scheduled. Hence, the need for schedulers arises.
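Stripped of Observables, the Promise-wrapping workaround looks like the sketch below. getCached() and the simulated loadFromServer() are hypothetical helpers standing in for the get() service; with RxJS, the equivalent change would be returning from(Promise.resolve(cache.get(endpoint))) on a cache hit.

```typescript
// Hypothetical in-memory cache standing in for the one used by get()
const cache = new Map<string, string>();

// Hypothetical network call: resolves after a short delay, like a real request
function loadFromServer(endpoint: string): Promise<string> {
  return new Promise<string>(resolve => {
    setTimeout(() => resolve(`data for ${endpoint}`), 10);
  });
}

// Both branches return a Promise, so callers never get the value synchronously
function getCached(endpoint: string): Promise<string> {
  if (cache.has(endpoint)) {
    // Cache hit: wrapping the value defers delivery until the current stack finishes
    return Promise.resolve(cache.get(endpoint) as string);
  }
  return loadFromServer(endpoint).then(data => {
    cache.set(endpoint, data);
    return data;
  });
}

// Records the order of events around each call
const order: string[] = [];

function fetchData(label: string): Promise<void> {
  order.push(`${label}: before`);
  const done = getCached('/data').then(data => {
    order.push(`${label}: ${data}`);
  });
  order.push(`${label}: after`);
  return done;
}

// The second call runs once the first has filled the cache
fetchData('first').then(() => fetchData('second'));
```

Both calls now record before, then after, then the data, whether or not the cache was hit. It works, but it means manufacturing a Promise purely to get the timing right.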

To continue down the path of building our own miniature version of RxJS, let’s implement a basic version of schedulers ourselves. It will include:

  • Two schedulers: one that emits values immediately, and one that emits values asynchronously using setTimeout.
  • Support for schedulers within our Observable class.
  • An observeOn operator, similar to the one found in RxJS, which creates a new Observable equivalent to the source Observable but using the specified scheduler.

Building this will show how schedulers let us easily control the timing of emissions from Observables, and how they can help us fix the inconsistency in our API code above (keeping Zalgo where it belongs: far, far away from our universe).

Scheduler interface and instances

Let’s start by creating a Scheduler interface similar to the one in RxJS but, like most of what we’ve written in this series, a lot simpler. It goes just above the Observable class definition.

Our scheduler interface will simply have a single function, schedule(), that accepts a work callback to be executed at the scheduler’s discretion.
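A minimal sketch of such an interface, assuming the shape described above (the names Scheduler, schedule() and work follow the prose; the exact signature is an assumption). The countingScheduler is a hypothetical example showing that any object with a matching schedule() method satisfies it.

```typescript
// A scheduler decides *when* a unit of work runs; the work itself is opaque to it
interface Scheduler {
  schedule(work: () => void): void;
}

// Hypothetical scheduler that counts the work it is handed, then runs it immediately
let scheduledCount = 0;
const countingScheduler: Scheduler = {
  schedule(work) {
    scheduledCount += 1;
    work();
  },
};

countingScheduler.schedule(() => console.log('work ran'));
```

RxJS’s real SchedulerLike is richer: its schedule() also accepts a delay and a state value and returns a Subscription, and the interface exposes now().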

Now, let’s make two concrete instances of this interface: a syncScheduler, which simply executes work immediately, and an asyncScheduler, which defers the execution of work via a setTimeout() call. Both go right below the scheduler interface definition.
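A sketch of the two schedulers against the interface assumed above. The setTimeout(work, 0) call is one reasonable way to implement the deferral described in the text; the events array is only there to show the resulting order.

```typescript
interface Scheduler {
  schedule(work: () => void): void;
}

// Runs work right away, on the current call stack
const syncScheduler: Scheduler = {
  schedule(work) {
    work();
  },
};

// Defers work to a later turn of the event loop
const asyncScheduler: Scheduler = {
  schedule(work) {
    setTimeout(work, 0);
  },
};

// Shows the timing difference: async work runs after the script finishes
const events: string[] = [];
asyncScheduler.schedule(() => events.push('async work'));
syncScheduler.schedule(() => events.push('sync work'));
events.push('end of script');
```

Even though the async work is scheduled first, it runs last: by the time its timer fires, the sync work and the rest of the script have already run.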

If you’re confused about how setTimeout causes the execution of JavaScript code to be deferred, I highly recommend reading John Resig’s article on JS Timers, which has stood the test of time as one of the best articles out there on how JavaScript timing functions work.

Observable modifications

Now that we have our schedulers, we need to make use of them in our Observables. To do that, we change the Observable constructor so that it accepts a scheduler, and change the subscribe() method so that it routes every notification through that scheduler.

Notice that when subscribe() creates our observer, it actually creates a proxy observer: each of its interface methods calls this.scheduler.schedule(), passing along a function that invokes the corresponding callback on the real observer. This adds an intermediary layer in which the observer is still responsible for signaling events on the asynchronous data stream, but the scheduler is responsible for determining precisely when those events reach listeners.
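A minimal sketch of the proxy-observer idea. The Observer shape, the producer-function constructor, and defaulting to syncScheduler when no scheduler is passed are assumptions made for this example, not necessarily how the series’ code is written.

```typescript
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

interface Scheduler {
  schedule(work: () => void): void;
}

const syncScheduler: Scheduler = { schedule: work => work() };
const asyncScheduler: Scheduler = { schedule: work => { setTimeout(work, 0); } };

class Observable<T> {
  private producer: (observer: Observer<T>) => void;
  private scheduler: Scheduler;

  // Assumed default: without a scheduler, notifications stay synchronous
  constructor(producer: (observer: Observer<T>) => void, scheduler: Scheduler = syncScheduler) {
    this.producer = producer;
    this.scheduler = scheduler;
  }

  subscribe(observer: Partial<Observer<T>>): void {
    // Proxy observer: the producer still decides *what* to signal,
    // while the scheduler decides *when* each signal reaches the real observer
    const proxy: Observer<T> = {
      next: value => this.scheduler.schedule(() => observer.next?.(value)),
      error: err => this.scheduler.schedule(() => observer.error?.(err)),
      complete: () => this.scheduler.schedule(() => observer.complete?.()),
    };
    this.producer(proxy);
  }
}

const received: number[] = [];
const numbers = new Observable<number>(observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
}, asyncScheduler);

numbers.subscribe({ next: value => received.push(value) });
// With asyncScheduler, nothing has been delivered yet at this point
const deliveredSynchronously = received.length;
```

The producer calls next() synchronously, yet the listener sees nothing until the scheduled timers fire. The producer never had to know about the timing.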

We could now start using our different schedulers within our Observables, but doing so would require us to always use the Observable constructor, which is a bit clunky. Instead, let’s create a nicer API by adding an observeOn() operator, which we can pass to .pipe() to make the resulting Observable use the scheduler we specify. It goes at the very bottom of your code file.
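A sketch of observeOn() on top of the scheduler-aware Observable assumed above. The single-operator pipe() is a simplification of the Part II version. Rather than copying the source’s producer, this observeOn() subscribes to the source and re-delivers each notification through the new scheduler, which is closer to how RxJS’s own observeOn behaves.

```typescript
interface Observer<T> {
  next(value: T): void;
  complete(): void;
}

interface Scheduler {
  schedule(work: () => void): void;
}

const syncScheduler: Scheduler = { schedule: work => work() };
const asyncScheduler: Scheduler = { schedule: work => { setTimeout(work, 0); } };

type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private producer: (observer: Observer<T>) => void;
  private scheduler: Scheduler;

  constructor(producer: (observer: Observer<T>) => void, scheduler: Scheduler = syncScheduler) {
    this.producer = producer;
    this.scheduler = scheduler;
  }

  // Every notification is routed through this Observable's scheduler
  subscribe(observer: Observer<T>): void {
    const scheduler = this.scheduler;
    this.producer({
      next: value => scheduler.schedule(() => observer.next(value)),
      complete: () => scheduler.schedule(() => observer.complete()),
    });
  }

  // Simplified single-operator pipe(), enough to show observeOn in use
  pipe<R>(operator: OperatorFunction<T, R>): Observable<R> {
    return operator(this);
  }
}

// observeOn(): a new Observable that subscribes to the source and
// re-delivers each of its notifications through the given scheduler
function observeOn<T>(scheduler: Scheduler): OperatorFunction<T, T> {
  return source => new Observable<T>(observer => source.subscribe(observer), scheduler);
}

const seen: string[] = [];
const source = new Observable<string>(observer => {
  observer.next('hello');
  observer.complete();
});

source.pipe(observeOn(asyncScheduler)).subscribe({
  next: value => seen.push(value),
  complete: () => seen.push('done'),
});
seen.push('after subscribe');
```

Because observeOn() wraps the source instead of replacing its producer, it also composes with sources that already have their own scheduler: the outer scheduler simply adds another hop.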

We can check that this works, as we did for our operators in Part II, by writing a simple test program that subscribes to an Observable piped through observeOn() and logs a message just after subscribing.
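A self-contained sketch of such a test program, reusing the simplified library pieces assumed in the earlier sketches (Observer, Scheduler, a scheduler-aware Observable, of() and observeOn()). The record() helper is hypothetical; it logs each line and also keeps it so the order can be checked.

```typescript
interface Observer<T> {
  next(value: T): void;
  complete(): void;
}

interface Scheduler {
  schedule(work: () => void): void;
}

const syncScheduler: Scheduler = { schedule: work => work() };
const asyncScheduler: Scheduler = { schedule: work => { setTimeout(work, 0); } };

class Observable<T> {
  private producer: (observer: Observer<T>) => void;
  private scheduler: Scheduler;

  constructor(producer: (observer: Observer<T>) => void, scheduler: Scheduler = syncScheduler) {
    this.producer = producer;
    this.scheduler = scheduler;
  }

  subscribe(observer: Observer<T>): void {
    const scheduler = this.scheduler;
    this.producer({
      next: value => scheduler.schedule(() => observer.next(value)),
      complete: () => scheduler.schedule(() => observer.complete()),
    });
  }

  pipe<R>(operator: (source: Observable<T>) => Observable<R>): Observable<R> {
    return operator(this);
  }
}

// of(): emits each value synchronously on subscribe
function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>(observer => {
    values.forEach(value => observer.next(value));
    observer.complete();
  });
}

// observeOn(): re-delivers the source's notifications through `scheduler`
function observeOn<T>(scheduler: Scheduler) {
  return (source: Observable<T>) =>
    new Observable<T>(observer => source.subscribe(observer), scheduler);
}

// Hypothetical helper: print a line and remember it
const lines: string[] = [];
const record = (line: string): void => {
  lines.push(line);
  console.log(line);
};

record('just before subscribe');
of(1, 2, 3)
  .pipe(observeOn(asyncScheduler))
  .subscribe({
    next: value => record(`value: ${value}`),
    complete: () => record('complete'),
  });
record('just after subscribe');
```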

When we run it, we can see that the Observable’s values are always emitted after the “just after subscribe” console log.

The key takeaway is that schedulers give us fine-grained control over when an Observable’s notifications are delivered, which is useful whenever we need to specify exactly when an event should arrive. In our example RxJS API code, we can make the subscription timing consistent by passing asapScheduler, which schedules work as a microtask (the same queue that Promise callbacks run on), as the final argument to of() in the cache-hit branch:

return of(cache.get(endpoint) as T, asapScheduler);
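The same fix can be sketched in our miniature library. Here an asap-style scheduler queues work with Promise.resolve().then(). The names asapScheduler, the two-argument of(values, scheduler), and the simulated request are hypothetical stand-ins (RxJS 6’s real of() takes the scheduler as its last variadic argument).

```typescript
interface Observer<T> {
  next(value: T): void;
  complete(): void;
}

interface Scheduler {
  schedule(work: () => void): void;
}

const syncScheduler: Scheduler = { schedule: work => work() };

// asap-style scheduler: queues work as a microtask, the same queue
// Promise callbacks use, so it runs right after the current call stack
const asapScheduler: Scheduler = {
  schedule: work => {
    Promise.resolve().then(work);
  },
};

class Observable<T> {
  private producer: (observer: Observer<T>) => void;
  private scheduler: Scheduler;

  constructor(producer: (observer: Observer<T>) => void, scheduler: Scheduler = syncScheduler) {
    this.producer = producer;
    this.scheduler = scheduler;
  }

  // Every notification is routed through this Observable's scheduler
  subscribe(observer: Observer<T>): void {
    const scheduler = this.scheduler;
    this.producer({
      next: value => scheduler.schedule(() => observer.next(value)),
      complete: () => scheduler.schedule(() => observer.complete()),
    });
  }
}

// Hypothetical of(): emits the given values through the given scheduler
function of<T>(values: T[], scheduler: Scheduler = syncScheduler): Observable<T> {
  return new Observable<T>(observer => {
    values.forEach(value => observer.next(value));
    observer.complete();
  }, scheduler);
}

const cache = new Map<string, string>();

function get(endpoint: string): Observable<string> {
  if (cache.has(endpoint)) {
    // Cache hit: emit asynchronously, just like the Promise-based branch
    return of([cache.get(endpoint) as string], asapScheduler);
  }
  // Cache miss: simulated request whose value arrives in a then() callback
  return new Observable<string>(observer => {
    Promise.resolve(`data for ${endpoint}`).then(data => {
      cache.set(endpoint, data);
      observer.next(data);
      observer.complete();
    });
  });
}

const timeline: string[] = [];

function fetchData(label: string): void {
  timeline.push(`${label}: before subscribe`);
  get('/data').subscribe({
    next: data => timeline.push(`${label}: ${data}`),
    complete: () => {},
  });
  timeline.push(`${label}: after subscribe`);
}

fetchData('first');
// Sometime later, once the cache has been filled
setTimeout(() => fetchData('second'), 0);
```

Both calls now record “before subscribe”, “after subscribe”, then the data, so callers see the same ordering whether or not the cache was hit. If you are on RxJS 7, note that the scheduler argument to of() is deprecated there; scheduled([cache.get(endpoint)], asapScheduler) expresses the same intent.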

Conclusion

By now, you’ve built pretty much what constitutes the core of RxJS. I hope that, by doing so, you have developed a foundational understanding of how all of the primitive mechanisms in RxJS work. Above all, I hope you’ll now have no problem using the library effectively.

To explore further, I suggest checking out https://rxjs.dev/guide/subject and the dev guide on schedulers. Also, if you’ve enjoyed working on this version of RxJS and want to work on the real version, I’m sure they would happily accept contributions :)
