Mastering Angular: Dangers and Treasures of RxJS

šŸŖ„ OZ šŸŽ©
6 min readMar 8, 2023

--

ā€œRoad with Cypress and Starā€, Vincent van Gogh, 1890
ā€œRoad with Cypress and Starā€, Vincent van Gogh, 1890

In this article, Iā€™ll show you how to avoid known dangerous pitfalls of RxJS, and will share some useful snippets.

Dangerous Operators šŸ“ā€ā˜ ļø

Some operators can cause infinite loops or make your observable behave not as you expect. Letā€™s start with the most dangerous.

combineLatestWith()

Itā€™s a very handy operator ā€” usually, you want to pass there one or multiple observables and react to them.

// Mocks of the sources, just for illustration
const showNames$: Observable<boolean> = of(false);
const showEmails$: Observable<boolean> = of(false);
const users$ = of([{name: 'a', email: 'a@a'}]);

// Now let's create an observable
// using `combineLatestWith()`

const usersList = users$.pipe(
combineLatestWith(
showNames$,
showEmails$,
),
// when `showNames$` OR `showEmails$` generate a new value,
// we'll get latest values from all of them:
map(([users, showNames, showEmails]) => {
if (showEmails && showNames) {
return users;
}
return users.map((user) => ({
...user,
name: showNames ? user.name : '',
email: showEmails ? user.email : ''
}));
})
);

If at least one of the observables passed to combineLatestWith() emits, youā€™ll get the last emitted values from every observable. But this convenience comes with 2 dangerous pitfalls:

āš ļø #1: It will not emit until every observable produced at least one value.
If you are subscribing to some hot observable, you have no guarantee that you will receive any values at the moment of subscribing.

Every observable passed to combineLatestWith(), should be double-checked ā€” you should be sure that at least one value will be emitted.

If you are not sure about that, use startWith() and pass there some default value that your code will expect and can handle. Otherwise, itā€™s too easy to create an ā€œeternally hangingā€ observable when combineLatestWith() is involved.

const usersList = users$.pipe(
combineLatestWith(
showNames$.pipe(startWith(true)), // šŸ‘ˆ
showEmails$.pipe(startWith(true)),
),
map(([users, showNames, showEmails]) => {
//
})
);

ļøļøāš ļø #2: If your code will trigger one of the observables (passed to combineLatestWith()) to emit a new value, youā€™ll create an infinite loop.

Example:

observable$.pipe(
combineWithLatest(toggle$),
map(([value, currentToggleValue]) => {
if (value && currentToggleValue) {
toggle$.next(true); // infinite loop
}
})
);

distinctUntilChanged()

A very handy operator, what could go wrong?

When you need to detect changes in strings, numbers, and other primitive types, everything is fine. But if you need to detect a change in an object, then this operator might work not as you expect ā€” if the object remains the same, and only some key or sub-key has changed ā€” this operator will not notify you about changes.

In such cases, you can set a ā€œcomparatorā€ argument, or use distinctUntilKeyChanged() operator.

forkJoin()

You mostly will use it to run multiple requests in parallel and get the responses when all of them are complete.

There are 3 dangerous things in this operator:

āš ļø #1: As said in the documentation: ā€œIf any given observable errors at some point, forkJoin will error as well and immediately unsubscribe from the other observablesā€.

In most cases, itā€™s not what you expect, but a workaround is quite easy: add catchError(() => of(someValue)) to every request:

let ob$ = forkJoin([
req1.pipe(catchError(() => of(someValue))),
req2.pipe(catchError(() => of(someValue))),
//...
]);

// or, if you have requests in an array

let ob$ = forkJoin(
requests.map(req => req.pipe(catchError(() => of(someValue))))
);

Although, do not return EMPTY, because of the next point:

āš ļø #2: As said in the documentation: ā€œā€¦whenever any of the given observables completes without emitting any value, forkJoin will complete at that moment as well and it will not emit anything either, even if it already has some last values from other observablesā€.

It happens not so often, but it still might happen (if some observable in the list returned EMPTY or actually didnā€™t complete). And in your app it might create quite a nasty bug ā€” it will look like your app doesnā€™t respond and nothing happens.

To avoid it, you can use defaultIfEmpty():

const ob$ = forkJoin(
requests.map(req => req.pipe(
catchError(() => of(someValue)),
defaultIfEmpty(someValue)
))
);

āš ļø #3: As said in the documentation: ā€œif there is an observable that never completes, forkJoin will never complete eitherā€.

It is pretty much possible for HTTP (XHR) requests to hang. In this case, an easy solution is timeout() operator.

For other (non-HTTP, hot) observables there is a chance that you are subscribing to watch an already emitted event. There is no common solution for such cases.

takeUntil()

The dangers of this operator are easy to avoid: it should be last in the sequence of the operators. With two exceptions: it should be before shareReplay({refCount: false}) and before aggregating operators (list).

It might sound a bit tedious, but you can just delegate control to the linter and forget about it.

More information you can find in this article.

Careless usage of switchMap()

If you read my previous article, you know the difference between switchMap(), concatMap(), mergeMap() and exhaustMap(). But, turns out, not everyone has read that article, and people carelessly use switchMap() in places where it might cause non-desired effects.

It doesnā€™t mean that switchMap() is bad or has some flaws ā€” itā€™s a super-useful and good operator, people just misuse it.

Read my previous article and use the correct operators ;)

Unbound Methods

People are lazy, so we can write code like this:

ob$.pipe(
catchError(this.errHandler)
);

It will work until this.errHandler() does not use this ā€” because this will be something unexpected.

Itā€™s pretty easy to avoid:

ob$.pipe(
catchError(() => this.errHandler())
);

Treasures šŸ’Ž

takeUntil()

This operator will let you avoid memory leaks in Angular Components ā€” just add it (last, as noted) to the list of operators before you subscribe() and the issue is solved! Itā€™s the most simple and most convenient solution.

There is a super-handy linter for that.

You do not need this operator if you subscribe using async pipe.

finalize()

When composing the sequence of operators, if you think ā€œafter all that ends we should do thisā€, then you might want to use finalize().

It is much more reliable than just adding some code to subscribe() or map(), tap(), catchError() ā€” this operator will be executed even if an observable completes without emitting any values. And in some cases, itā€™s pure gold.

ob$.pipe(
tap(() => this.patchState({loading: true})),
finalize(() => this.patchState({loading: false})),
exhaustMap(() => this.api.createNewSomething())
);

first()

Use this operator when you expect that an observable will emit only one value (like an HTTP request), but you canā€™t guarantee it ā€” usually, it happens in functions/methods/effects where you accept some observable as an argument.

Without this operator, very curious side effects and bugs might appear, if an observable will start emitting multiple values when your code is not ready for that.

throttleTime()

If you ever tried to reproduce this logic: ā€œaccept first emitted value, start some side effect, then ignore any consecutive values for N milliseconds, then if there was some new value emitted during this time ā€” emit it tooā€, then throttleTime() would save your day.

It works with similarities to debounceTime(), although its config argument has parameters leading and trailing, and it will help you to create more sophisticated behaviors.

Example for the logic, mentioned above:

input$.pipe(
throttleTime(250, undefined, {leading: true, traling: true}),
distinctUntilChanged(),
switchMap((input) => this.api.loadItems(value))
);

tapResponse()

And the last in this list will be a third-party operator (not from RxJS).
You can take it from ngrx/component-store library.

This operator will help you to donā€™t forget to handle error cases:

ob$.pipe(
tapResponse(
(result) => this.handle(result),
// the next argument is required, so you will not forget it
(err) => console?.error(err)
)
);

If you know any other dangers and treasures of RxJS ā€” share them in the comments!

--

--