RxJS in-depth: GitHub repo ‘utils’ directory review (part 1)

Alexander Poshtaruk
Published in Angular In Depth
6 min read · Jan 22, 2020

Curiosity leads to discoveries.

RxJS is quite a popular library in the Angular world, so for me it is always interesting to find out something new about its internals. Today I want to dig into the utils folder of its GitHub repo. This article is written not for practical use but for reverse-engineering enthusiasts and to satisfy my own curiosity.

Here is a link to the RxJS 6.5 /src/internal/util/ directory. The folder is full of files, so let's review them one by one. Some of them may reveal something interesting 🔍.

#1 ArgumentOutOfRangeError.ts

The main code is as follows:

function ArgumentOutOfRangeErrorImpl(this: any) {
  Error.call(this);
  this.message = 'argument out of range';
  this.name = 'ArgumentOutOfRangeError';
  return this;
}

ArgumentOutOfRangeErrorImpl.prototype = Object.create(Error.prototype);

export const ArgumentOutOfRangeError: ArgumentOutOfRangeErrorCtor = ArgumentOutOfRangeErrorImpl as any;

What is interesting here?

a) We inherit from the built-in Error class, so we can throw an instance of this class with the message we need.

b) The superclass constructor is called explicitly: Error.call(this);
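
To see points a) and b) in isolation, here is a minimal standalone sketch of the same pattern. The names DemoError, DemoErrorCtor and DemoErrorImpl are made up for illustration and do not exist in RxJS; only the shape of the code follows the excerpt above.

```typescript
// Hypothetical names, for illustration only (not part of RxJS).
interface DemoError extends Error {}
interface DemoErrorCtor {
  new (): DemoError;
}

function DemoErrorImpl(this: any) {
  Error.call(this); // explicit call of the superclass constructor
  this.message = 'demo failure';
  this.name = 'DemoError';
  return this;
}

// Wire the prototype chain by hand so that instanceof checks work.
DemoErrorImpl.prototype = Object.create(Error.prototype);

const DemoError: DemoErrorCtor = DemoErrorImpl as any;

const err = new DemoError();
const isError = err instanceof Error;       // true
const isOwnType = err instanceof DemoError; // true
console.log(err.name, err.message, isError, isOwnType);
```

Because the prototype chain points at Error.prototype, a caller can catch the error and tell it apart with a plain instanceof check.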

Where is it used in the RxJS source code?

Let's take a look at the takeLast(count) operator: it should emit the last count values after the source observable completes. Of course, it cannot accept a negative count, so:

* @throws {ArgumentOutOfRangeError} When using `takeLast(i)`, it delivers an
* ArgumentOutOrRangeError to the Observer's `error` callback if `i < 0`

at line 63:

if (this.total < 0) {
  throw new ArgumentOutOfRangeError;
}

Why should I know this?

Reading the source code helps you understand how the library behaves in edge cases. Sometimes it is the only way to understand why it works the way it does.

#2 EmptyError.ts

It looks very similar to ArgumentOutOfRangeError:

function EmptyErrorImpl(this: any) {
  Error.call(this);
  this.message = 'no elements in sequence';
  this.name = 'EmptyError';
  return this;
}

EmptyErrorImpl.prototype = Object.create(Error.prototype);

export const EmptyError: EmptyErrorCtor = EmptyErrorImpl as any;

But the application scope is different.

Where is it used in the code?

Let's review the first operator: it should emit only the first value (or the first value that meets some condition) emitted by the source Observable.

But what if the sequence is empty? We can find the answer at line 69 of rxjs/src/internal/operators/first.ts

* @throws {EmptyError} Delivers an EmptyError to the Observer's `error`
* callback if the Observable completes before any `next` notification was sent.
*

And this happens in lines 86–90:

return (source: Observable<T>) => source.pipe(
  predicate ? filter((v, i) => predicate(v, i, source)) : identity,
  take(1),
  hasDefaultValue ? defaultIfEmpty<T | D>(defaultValue) : throwIfEmpty(() => new EmptyError()),
);

*Interesting remark: you can use take(1) instead of first() with almost the same result:

source$.pipe(take(1))
vs
source$.pipe(first())

The only difference is that the take operator will not emit an EmptyError if source$ completes before producing a value.
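
The difference can be modelled without RxJS at all. The sketch below uses a tiny hypothetical push model (MiniObserver, MiniSource, takeOne and firstOrThrow are invented names, not RxJS APIs) and mirrors the pipeline quoted above: a first-like operator is a take-one step followed by a throw-if-empty step.

```typescript
// Hypothetical mini push model, for illustration only (not RxJS code).
interface MiniObserver<T> {
  next(value: T): void;
  error(err: Error): void;
  complete(): void;
}
type MiniSource<T> = (observer: MiniObserver<T>) => void;

// Stand-in for RxJS's EmptyError.
function makeEmptyError(): Error {
  const e = new Error('no elements in sequence');
  e.name = 'EmptyError';
  return e;
}

// take(1)-like: forward the first value, then complete.
// An empty source simply completes.
function takeOne<T>(source: MiniSource<T>): MiniSource<T> {
  return (observer) => {
    let done = false;
    source({
      next(value) {
        if (done) return;
        done = true;
        observer.next(value);
        observer.complete();
      },
      error(err) { if (!done) { done = true; observer.error(err); } },
      complete() { if (!done) { done = true; observer.complete(); } },
    });
  };
}

// first()-like: takeOne plus "completion without a value becomes an error".
function firstOrThrow<T>(source: MiniSource<T>): MiniSource<T> {
  return (observer) => {
    let seen = false;
    takeOne(source)({
      next(value) { seen = true; observer.next(value); },
      error(err) { observer.error(err); },
      complete() {
        if (seen) observer.complete();
        else observer.error(makeEmptyError());
      },
    });
  };
}

const emptySource: MiniSource<number> = (observer) => observer.complete();

// Collects the notifications a source produces as readable strings.
function record(source: MiniSource<number>): string[] {
  const events: string[] = [];
  source({
    next: (v) => { events.push(`next ${v}`); },
    error: (e) => { events.push(`error ${e.name}`); },
    complete: () => { events.push('complete'); },
  });
  return events;
}

const takeEvents = record(takeOne(emptySource));       // ['complete']
const firstEvents = record(firstOrThrow(emptySource)); // ['error EmptyError']
console.log(takeEvents, firstEvents);
```

For a source that emits at least one value, both variants in this model produce the same notifications: the first value followed by completion.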

Why should I know this?
Since the first operator uses EmptyError, the final code bundle must contain the EmptyError.ts file too. So when you use the first operator, the final bundle will be a bit bigger. You can read more about it here.

#3 Immediate.ts

export const Immediate = {
  setImmediate(cb: () => void): number {
    const handle = nextHandle++;
    tasksByHandle[handle] = cb;
    Promise.resolve().then(() => runIfPresent(handle));
    return handle;
  },

  clearImmediate(handle: number): void {
    delete tasksByHandle[handle];
  },
};

This is just a wrapper for putting tasks into the microtask queue. It is used in AsapAction, which in turn is used by the RxJS AsapScheduler. Schedulers are a big separate topic; you can read more about them in my article here. For this case, it is enough to know that it schedules data to be emitted in a microtask, right after the current macrotask in the browser's event loop queue.

It is done by this code:

Promise.resolve().then(() => runIfPresent(handle));
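
The following standalone sketch shows the resulting ordering. The excerpt above does not show nextHandle, tasksByHandle or runIfPresent, so their definitions here are my assumption about what such helpers might look like, and ImmediateSketch is a hypothetical name rather than the RxJS export.

```typescript
// Assumed helper state; the RxJS excerpt does not show these definitions.
let nextHandle = 1;
const tasksByHandle: { [handle: number]: () => void } = {};

// Runs the task only if it has not been cleared in the meantime.
function runIfPresent(handle: number): void {
  const cb = tasksByHandle[handle];
  if (cb) {
    delete tasksByHandle[handle];
    cb();
  }
}

// Hypothetical stand-in that follows the shape of the Immediate object.
const ImmediateSketch = {
  setImmediate(cb: () => void): number {
    const handle = nextHandle++;
    tasksByHandle[handle] = cb;
    // Queue a microtask: it runs right after the current synchronous code.
    Promise.resolve().then(() => runIfPresent(handle));
    return handle;
  },
  clearImmediate(handle: number): void {
    delete tasksByHandle[handle];
  },
};

const order: string[] = [];

setTimeout(() => { order.push('macrotask (setTimeout 0)'); }, 0);
ImmediateSketch.setImmediate(() => { order.push('microtask (Immediate)'); });

// A cleared task is skipped when its microtask finally runs.
const cancelled = ImmediateSketch.setImmediate(() => { order.push('cancelled task'); });
ImmediateSketch.clearImmediate(cancelled);

order.push('synchronous code');

// Once everything has run, order is:
// ['synchronous code', 'microtask (Immediate)', 'macrotask (setTimeout 0)']
```

The microtask runs before the setTimeout callback even though the timer was registered first, which is the behaviour the asap scheduler relies on.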

Here is a usage example in rxjs/src/internal/scheduler/AsapAction.ts:

return scheduler.scheduled || (scheduler.scheduled = Immediate.setImmediate(
  scheduler.flush.bind(scheduler, null)
));

Why should I know this?
Knowing how RxJS schedules tasks (by means of a special entity, the Scheduler) is a key part of understanding how to write unit tests for RxJS code in Angular applications.

I released a FREE video course about the main unit testing tools for RxJS code. You can get it here!

If you are curious about Schedulers topic in RxJS take a look at:

  1. “So how does RxJS QueueScheduler actually work?”, which also contains a general introduction to Schedulers.
  2. RxJS: applying asyncScheduler as an argument vs with observeOn operator
  3. Videos by Michael Hladky:
    RxJS schedulers in depth
    RxJS Schedulers from outer space

Do you like the article? You can find more interesting RxJS stuff in my video course “Hands-on RxJS”. It may be interesting for beginners (Sections 1–3) as well as experienced RxJS developers (Sections 4–7). Buy it, watch it, and leave comments and ratings!

#4 ObjectUnsubscribedError.ts

function ObjectUnsubscribedErrorImpl(this: any) {
  Error.call(this);
  this.message = 'object unsubscribed';
  this.name = 'ObjectUnsubscribedError';
  return this;
}

ObjectUnsubscribedErrorImpl.prototype = Object.create(Error.prototype);

...
export const ObjectUnsubscribedError: ObjectUnsubscribedErrorCtor = ObjectUnsubscribedErrorImpl as any;

This error is thrown when someone tries to emit through a closed Subject.

Where is it used in the RxJS source code?

For example, in rxjs/src/internal/Subject.ts this error is thrown when someone tries to emit using a Subject that has forcefully dropped all of its observers through the special Subject.unsubscribe method.

import { Subject } from 'rxjs';

const subj = new Subject<string>();
const subscription = subj.subscribe(console.log);
subj.next('Before force unsubscription');
subj.unsubscribe(); // drops all observers and closes the Subject
subj.next('After force unsubscription'); // Throws ObjectUnsubscribedError here!

You can check this behavior in a codepen playground.

I asked Nicholas Jamieson (RxJS core team) about possible use cases for the Subject.unsubscribe method, and he was kind enough to answer:

There isn’t a usecase that I would consider valid. IMO, it should be removed. All it does is drop references to all subscribers and ensure it can’t emit further notifications. It just confuses people.
AFAICT, in .NET dispose is the terminology - instead of unsubscribe - and that makes a little more sense. With the method named unsubscribe, it's misleading.

#5 TimeoutError.ts

function TimeoutErrorImpl(this: any) {
  Error.call(this);
  this.message = 'Timeout has occurred';
  this.name = 'TimeoutError';
  return this;
}

TimeoutErrorImpl.prototype = Object.create(Error.prototype);

...
export const TimeoutError: TimeoutErrorCtor = TimeoutErrorImpl as any;

If we search for TimeoutError in the RxJS GitHub repo, we find that it is used in the timeout operator implementation:

export function timeout<T>(due: number | Date,
                           scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T> {
  return timeoutWith(due, throwError(new TimeoutError()), scheduler);
}

Pay attention to the interesting timeoutWith operator. We can find a description of it in its source code:

`timeoutWith` is a variation of `timeout` operator. It behaves exactly the same, still accepting as a first argument either a number or a Date, which control — respectively — when values of source Observable should be emitted or when it should complete.

The only difference is that it accepts a second, required parameter. This parameter should be an Observable which will be subscribed when source Observable fails any timeout check.

From that description, we can draw two interesting conclusions:
1. throwError(new TimeoutError()) is an Observable that errors immediately on subscription, so timeout fails with a TimeoutError if no source$ emissions happen during the specified timeout.

2. By using the timeoutWith operator we can provide our own Observable that will be subscribed to in case no source$ emissions happen.
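
Both conclusions can be illustrated with a small model that does not depend on RxJS. Everything below (MiniObserver, MiniSource, timeoutWithSketch, timeoutSketch) is a hypothetical simplification that only handles a numeric timeout; it is not how RxJS implements these operators.

```typescript
// Hypothetical mini push model, for illustration only (not RxJS code).
interface MiniObserver<T> {
  next(value: T): void;
  error(err: Error): void;
  complete(): void;
}
// Subscribing returns a teardown function.
type MiniSource<T> = (observer: MiniObserver<T>) => () => void;

// timeoutWith-like: if `source` stays silent for `dueMs`,
// drop it and subscribe to `fallback` instead.
function timeoutWithSketch<T>(
  source: MiniSource<T>,
  dueMs: number,
  fallback: MiniSource<T>,
): MiniSource<T> {
  return (observer) => {
    let timer: ReturnType<typeof setTimeout> | undefined;
    let teardown: () => void = () => {};

    // (Re)start the countdown; called on subscribe and on every value.
    const arm = (): void => {
      clearTimeout(timer);
      timer = setTimeout(() => {
        teardown();                    // stop listening to the silent source
        teardown = fallback(observer); // switch to the fallback
      }, dueMs);
    };

    arm();
    teardown = source({
      next(value) { arm(); observer.next(value); },
      error(err) { clearTimeout(timer); observer.error(err); },
      complete() { clearTimeout(timer); observer.complete(); },
    });
    return () => { clearTimeout(timer); teardown(); };
  };
}

// timeout-like: timeoutWith whose fallback fails immediately.
function timeoutSketch<T>(source: MiniSource<T>, dueMs: number): MiniSource<T> {
  const failing: MiniSource<T> = (observer) => {
    const err = new Error('Timeout has occurred');
    err.name = 'TimeoutError';
    observer.error(err);
    return () => {};
  };
  return timeoutWithSketch(source, dueMs, failing);
}

// A source that never emits, and a fallback that emits a single value.
const silent: MiniSource<string> = () => () => {};
const cached: MiniSource<string> = (observer) => {
  observer.next('cached value');
  observer.complete();
  return () => {};
};

const timeline: string[] = [];
const recordAs = (label: string): MiniObserver<string> => ({
  next: (v) => { timeline.push(`${label}: next ${v}`); },
  error: (e) => { timeline.push(`${label}: error ${e.name}`); },
  complete: () => { timeline.push(`${label}: complete`); },
});

timeoutSketch(silent, 10)(recordAs('timeout'));
timeoutWithSketch(silent, 20, cached)(recordAs('timeoutWith'));
// After both timers have fired, timeline holds:
// ['timeout: error TimeoutError',
//  'timeoutWith: next cached value', 'timeoutWith: complete']
```

In this model the timeout variant is nothing more than the timeoutWith variant with a fallback that errors on subscription, which matches the one-line implementation quoted above.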

Quite interesting!

Takeaways

Let's wrap up what we've learned today:

  1. takeLast(count) will throw on negative values.
  2. There is a difference between the take(1) and first() operators: first() errors with EmptyError on an empty source, while take(1) just completes and produces a smaller bundle.
  3. RxJS uses Promise.resolve().then for so-called Immediate value emission (used by AsapScheduler).
  4. You can drop all subscribers with the Subject.unsubscribe method. If you try to emit after that, the Subject will throw an error.
  5. There is an interesting timeoutWith operator that lets you provide an observable to be subscribed to if source$ does not emit within the specified timeout.

I hope you enjoyed the article and that it will encourage you to make (and blog about :) your own discoveries!

If you like it — plz clap. If you want part 2 — clap twice :-)

Let’s keep in touch on Twitter.

Cheers!

Alexander Poshtaruk
Senior Front-End dev in Tonic, 'Hands-on RxJS for web development' video-course author — https://bit.ly/2AzDgQC, codementor.io JS, Angular and RxJS mentor