RxJS recipes: ‘forkJoin’ with the progress of completion for bulk network requests in Angular

Adding customization to existing RxJS functions.

Alexander Poshtaruk
Angular In Depth
6 min readJun 10, 2019

--

Loadbar - Pixelbay

AngularInDepth is moving away from Medium. This article, its updates and more recent articles are hosted on the new platform inDepth.dev

Introduction

As it often happens to me — this post is just a prolongation of my StackOverflow activity:-).

Question:

Is there a way to gather a list of observables together to get cumulative result like forkJoin does, but get some kind of a progress while they finishes?

A nice task for creating custom RxJS function. Let's implement it!

What is forkJoin?

Let's take a look at official documentation:

forkJoin(…sources: any[]): Observable<any>

Accepts an Array of ObservableInput or a dictionary Object of ObservableInput and returns an Observable that emits either an array of values in the exact same order as the passed array, or a dictionary of values in the same shape as the passed dictionary.

In other words, forkJoin for Observables works in the same way as Promise.all works for Promises.

Here is a marble diagram:

The possible use-case for it is running many parallel networks requests — for example, fetching some usersList details (if API supports only per user fetch).

Here is an example:

snippet link

Here is a codepen link to play with.

It works nice but what if we need to know some intermediate information like how many requests are resolved already? With current forkJoin implementation, it is impossible but we can create our own version of it 🙃.

OK, so brief agenda how this forkJoinWithPercent should work:

Output:

  1. It returns higher-order(HO) Observable ( ) that emits an array of two Observables: [finalResult$, percent$].

2. percent$ — emits percentage of completion (number);

3. finalResult$ — emits a final array of values at the end (or error if some argument observable emits error value).

Preliminarily usage looks like this:

snippet link

Implementation details:

  1. Our forkJointWithProgress function accepts an array of Observables and should return higher-order Observable.
  2. This returned Observable should emit value [finalResult$, percent$] (we will use of([finalResult$, percent$]) for that)
  3. Side effects will be added to each item in an array of argument Observables to calculate the percentage of completion and emit this value with special percent$ (we will use finalize operator for percentage calculation and use Subject as percent$).
  4. finalResult$ provides standard RxJS forkJoin result.
  5. Result Observable should work independently for each subscriber (wrap functionality with RxJS defer function to provide a clean run for each subscriber — you can read more about this case here).
  6. If some of the argument Observables emits error — it will be propagated to finalResult$ subscribers.

Packtpub.com and I prepared a whole RxJS course with many other details of how you can solve your every-day developer’s tasks with this amazing library. It can be interesting for beginners but also contains advanced topics. Take a look!

Implementation

#1–2 We accept an array of Observables and should return higher-order Observable with [finalResult$, percent$].

Ok, just a footprint of our future function:

snippet link

Given we got arrayOfObservables (our ajax observables), so let’s iterate over and add requested logic:

#3 Adding side effects to argument Observables and calculating percentage.

Here is what we’ve got:

snippet link

We will add more functionality to argument observables in array. We iterate over an array and apply finalize operator for each observable.

finalize operator waits till specific observable completes, then calculate the percentage of completed observables and emit value with percent$ Subject.

#4 Let's call forkJoin for getting a final result value

snipper link

Ok, let's go through this code:

  • forkJoin gets an array of argument observables, subscribes to all of them and waits till they are complete.
  • Once a result is ready — tap operator emits last percentage value (100%) and completes percentage$ Subject (to prevent memory leaks).
  • Final results will be emitted to subscribers.

#5 Wrapping all this function in ‘defer’

RxJS defer function provides new Observable instance (a result of its callback function) for each subscriber. This means that each of our subscribers will get clean run (and counter = 0).

snippet link

Lets review how it works:

  1. We get an array of observables (line1).
  2. Wrap result with RxJS defer to provide a clean run for each subscriber (line 3)
  3. Create counter to calculate the percentage of completion and instantiate percent$ Subject to emit percentage value (lines 5–6).
  4. We create a new array by iterating over the original array and adding new functionality with finalize - calculating percentage value and emitting it if some of the observables completes. (lines 8–15)
  5. Call Rx forkJoin and apply tap operator to be able to send percentage 100 when forkJoin gets a final result. Assign result to finalResult$ variable (lines 17–22).
  6. Return higher-order observable that will emit [finalResult$, percent$].

To wrap up

Here is how it works in a codepen:

forkJoinWithPercent

What about using it in Angular?

This function is published as an npm package as well — rxjs-toolbox so you can use it in your projects.

And here is a Stackblitz Angular demo that uses that package to feed percentage value to load bar:

In Angular

Additional reading.

You can read more about RxJS operators uses cases here:

  1. “Retry vs Repeat”
  2. “RxJS: Managing Operator State” by Nicholas Jamieson.
  3. “RxJS ‘repeat’ operator — beginner necromancer guide”
  4. “Throttling notifications from multiple users with RxJS”
  5. rxjs-multi-scan — neat custom RxJS scan implementation from Lars Gyrup Brink Nielsen.
  6. rxjs-toolkit — RxJS Everyday Custom Operators by Jason Awbrey.
  7. backoff-rxjs — A collection of helpful RxJS operators to deal with backoff strategies by Alex Okrushko.
  8. TimeRange — RxJS custom function that emits a set of values in specified timeouts

Like this article? Tweet about it and follow me on Twitter!

Starting from section 4 of my RxJS video course advances staff is reviewed — so if you familiar with RxJS already — you can find something useful for you as well: higher-order observables, anti-patterns, schedulers, unit testing, etc! Give it a try!

*Special thanks to Lars Gyrup Brink Nielsen, Nicholas Jamieson, Tim Deschryver and Michael Karén for reviewing this post and making many valuable remarks to make it better!

--

--

Alexander Poshtaruk
Angular In Depth

Senior Front-End dev in Tonic, 'Hands-on RxJS for web development' video-course author — https://bit.ly/2AzDgQC, codementor.io JS, Angular and RxJS mentor