RxJS recipes: ‘forkJoin’ with the progress of completion for bulk network requests in Angular
Adding customization to existing RxJS functions.
As it often happens to me — this post is just a prolongation of my StackOverflow activity:-).
Is there a way to gather a list of observables together to get cumulative result like
forkJoindoes, but get some kind of a progress while they finishes?
A nice task for creating custom RxJS function. Let's implement it!
What is forkJoin?
Let's take a look at official documentation:
forkJoin(…sources: any): Observable<any>
ObservableInputor a dictionary
ObservableInputand returns an
Observablethat emits either an array of values in the exact same order as the passed array, or a dictionary of values in the same shape as the passed dictionary.
In other words, forkJoin for Observables works in the same way as Promise.all works for Promises.
Here is a marble diagram:
The possible use-case for it is running many parallel networks requests — for example, fetching some usersList details (if API supports only per user fetch).
Here is an example:
Here is a codepen link to play with.
It works nice but what if we need to know some intermediate information like how many requests are resolved already? With current forkJoin implementation, it is impossible but we can create our own version of it 🙃.
OK, so brief agenda how this forkJoinWithPercent should work:
- It returns higher-order(HO) Observable ( ) that emits an array of two Observables: [finalResult$, percent$].
*Higher-order (HO) Observable - observable that emits other observables, so data flow should be handled with special flattening operators like mergeMap, switchMap, concatMap, etc... you can read more about it here and here.
2. percent$ — emits percentage of completion (number);
3. finalResult$ — emits a final array of values at the end (or error if some argument observable emits error value).
Preliminarily usage looks like this:
- Our forkJointWithProgress function accepts an array of Observables and should return higher-order Observable.
- This returned Observable should emit value [finalResult$, percent$] (we will use of([finalResult$, percent$]) for that)
- Side effects will be added to each item in an array of argument Observables to calculate the percentage of completion and emit this value with special percent$ (we will use finalize operator for percentage calculation and use Subject as percent$).
- finalResult$ provides standard RxJS forkJoin result.
- Result Observable should work independently for each subscriber (wrap functionality with RxJS defer function to provide a clean run for each subscriber — you can read more about this case here).
- If some of the argument Observables emits error — it will be propagated to finalResult$ subscribers.
#1–2 We accept an array of Observables and should return higher-order Observable with [finalResult$, percent$].
Ok, just a footprint of our future function:
Given we got arrayOfObservables (our ajax observables), so let’s iterate over and add requested logic:
#3 Adding side effects to argument Observables and calculating percentage.
Here is what we’ve got:
We will add more functionality to argument observables in array. We iterate over an array and apply finalize operator for each observable.
finalize operator waits till specific observable completes, then calculate the percentage of completed observables and emit value with percent$ Subject.
#4 Let's call forkJoin for getting a final result value
Ok, let's go through this code:
- forkJoin gets an array of argument observables, subscribes to all of them and waits till they are complete.
- Once a result is ready — tap operator emits last percentage value (100%) and completes percentage$ Subject (to prevent memory leaks).
- Final results will be emitted to subscribers.
#5 Wrapping all this function in ‘defer’
RxJS defer function provides new Observable instance (a result of its callback function) for each subscriber. This means that each of our subscribers will get clean run (and counter = 0).
Lets review how it works:
- We get an array of observables (line1).
- Wrap result with RxJS defer to provide a clean run for each subscriber (line 3)
- Create counter to calculate the percentage of completion and instantiate percent$ Subject to emit percentage value (lines 5–6).
- We create a new array by iterating over the original array and adding new functionality with finalize - calculating percentage value and emitting it if some of the observables completes. (lines 8–15)
- Call Rx forkJoin and apply tap operator to be able to send percentage 100 when forkJoin gets a final result. Assign result to finalResult$ variable (lines 17–22).
- Return higher-order observable that will emit [finalResult$, percent$].
To wrap up
Here is how it works in a codepen:
You can read more about RxJS operators uses cases here:
- “Retry vs Repeat”
- “RxJS: Managing Operator State” by Nicholas Jamieson.
- “RxJS ‘repeat’ operator — beginner necromancer guide”
- “Throttling notifications from multiple users with RxJS”
- rxjs-multi-scan — neat custom RxJS scan implementation from Lars Gyrup Brink Nielsen.
- rxjs-toolkit — RxJS Everyday Custom Operators by Jason Awbrey.
- backoff-rxjs — A collection of helpful RxJS operators to deal with backoff strategies by Alex Okrushko.
- TimeRange — RxJS custom function that emits a set of values in specified timeouts
Starting from section 4 of my RxJS video course advances staff is reviewed — so if you familiar with RxJS already — you can find something useful for you as well: higher-order observables, anti-patterns, schedulers, unit testing, etc! Give it a try!