Rxjs Operators — Throttle and Join

John Au-Yeung
Feb 27 · 4 min read
Photo by Nathan Anderson on Unsplash

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at the throttle and join operators, including the throttle , throttleTime , combineLatest and concatAll .

Filtering Operators

Throttle

The throttle operator emits values from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable, then repeats the process.

It takes up to 2 arguments. The first is the durationSelector , which is a function that takes a value from the source Observable then returns an Observable or Promise that computes the throttling duration for each source value.

The second argument is an optional config object to define the leading and trailing behavior. Default value is defaultThrottleConfig .

It returns an Observable that performs the throttle operation from the source.

For example, we can use it as follows:

import { interval } from "rxjs";
import { throttle } from "rxjs/operators";
const interval$ = interval(1000);
const result = interval$.pipe(throttle(ev => interval(5000)));
result.subscribe(x => console.log(x));

The code above has an interval$ Observable that emits a number every second. The values from it are pipe d into the throttle operator, which takes a function that returns the interval(5000) Observable, which emits a number every 5 seconds.

Therefore, the throttle operator will emit values from interval$ every 5 seconds since our throttle callback function returned interval(5000) .

Then we should get every 5th number logged in the console.log .

throttleTime

throttleTime emits a value from the source Observable then ignores subsequently emitted values for duration milliseconds then repeats the process.

It takes up to 3 arguments. The first is the duration , which is the time to wait before emitting another value after emitting the last value. It’s measured in milliseconds or the time unit of the optional scheduler .

The second argument is the optionalscheduler , which defaults to async . It’s used for setting the timing of the emission.

The last argument is the config , which is optional. It defaults to defaultThrottleConfig . We can pass in an object to define the leading and trailing behavior. Default value is{ leading: true, trailing: false }.

For example, we can use it as follows:

import { interval } from "rxjs";
import { throttleTime } from "rxjs/operators";
const interval$ = interval(1000);
const result = interval$.pipe(throttleTime(5000));
result.subscribe(x => console.log(x));

The above works like our previous throttle example, except that we change throttle(ev => interval(5000)) to throttleTime(5000) , which do the same thing.

We emit numbers from $interval every 5 seconds.

Then we get the same numbers logged as the example above.

Photo by Marius Masalar on Unsplash

Join Operators

combineAll

The combineAll operator flattens Observable of Observables by applying combineLatest when they complete.

It takes one optional argument, which is a project function to map each value emitted to something we want.

Once the outer Observable completes then it subscribes to all collected Observables and combines their values as follows:

  • Every time an inner Observable emits, the output Observable emits
  • When the returned Observable emits and a project function is specified then project is called as the values arrive and it’ll manipulate each value with the project function
  • If there’s no project function then the most recent values is emitted by the returned Observable

For example, we can use it as follows:

import { of } from "rxjs";
import { map, combineAll } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));
const result = higherOrderObservable.pipe(combineAll());
result.subscribe(x => console.log(x));

The code above maps the of$ Observable to child Observables. It returns of(“a”, “b”, “c”) for each value emitted by of$ .

Then we use combineAll to combine the latest values from each of the child Observables in higherOrderObservable into an array. Then each of these arrays is emitted by the result Observable.

Then we get:

["c", "c", "a"]
["c", "c", "b"]
["c", "c", "c"]

as the console.log output. The first 2 Observables completed before the last one, so we get 'c' from them. Then the as the third one emits, the last value in the array is added, so we get 'a' , 'b' and 'c' respectively from them.

concatAll

The concatAll operator converts higher-order Observables by concatenating the inner Observables in order,

It takes no parameters and returns an Observable that emits the values from all inner Observables concatenated.

For example, we can use it as follows:

import { of } from "rxjs";
import { map, concatAll } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));
const result = higherOrderObservable.pipe(concatAll());
result.subscribe(x => console.log(x));

The of$ Observable’s emitted values is pipe d and each value from of$ is mapped to the of(“a”, “b”, “c”) Observable.

Then we pipe the results from the inner Observables in higherOrderObservable which are 3 of(“a”, “b”, “c”) Observables mapped from of$ have their emitted values combined by concatAll .

In the end, we get:

a
b
c
a
b
c
a
b
c

The throttle operator emits values from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable and repeats.

throttleTime emits a value from the source Observable then ignores subsequently emitted values for a given amount of time and repeats.

combineAll flattens Observable of Observables by combining the latest values when they complete.

concatAll operator combines the emitted values of inner Observables in order.

JavaScript in Plain English

Learn the web's most important programming language.

John Au-Yeung

Written by

Web developer. Subscribe to my email list now at http://jauyeung.net/subscribe/ . Follow me on Twitter at https://twitter.com/AuMayeung

JavaScript in Plain English

Learn the web's most important programming language.

More From Medium

More from JavaScript in Plain English

More from JavaScript in Plain English

32 funny Code Comments that people actually wrote

10.2K

More from JavaScript in Plain English

More from JavaScript in Plain English

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade