Intro to Reactive Extensions with RxJS
RxJS is a reactive extensions library that allows for asynchronous/event based programs by utilizing observable streams that uses the observer pattern (object/subject that maintains a list of dependent observers, and notifies them automatically of state changes using callbacks) and provides an Observable type that allows for async handling of events as a collection.
Observables, Subscriptions (Streams)
Each observable contains satellite types:
- Observer (collection of methods that listens to the Observable values emitted)
- Subscription (represents connecting to the observable, and can be used to cancel the execution of an observable),
- Operators (built in pure functions that provide ways of dealing with collections: map, filter, reduce, every, etc),
- Subjects (allows for casting to multiple Observers), and Schedulers (handles when subscriptions starts and notifications are delivered).
The observable stream acts like an event emitter and delivers three events: next, error, and complete which can be consumed by an observer (an object that contains 3 callbacks for each of the notification from an Observable). To view the values pushed by an Observable you must subscribe to it (you can also pass in the observer object or the next callback directly to the subscription).
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
},
});
console.log('just after subscribe');
/*
When subscribing to an observer you can also just pass the next()
callback directly as an argument:
*/
observable.subscribe(x => console.log('Observer got a next value: ' + x));
The subscription refers to an Observable execution when calling .subscribe() on the observer, and can be cancelled with .unsubscribe() to ensure that you are cleanly executing/disposing the Observable resource.
import { interval } from 'rxjs';
const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();
The RxJS library offers a variety of handy functions like the fromEventPattern that creates an Observable to register event handlers:
import { fromEventPattern } from 'rxjs';
function addClickHandler(handler) {
document.addEventListener('click', handler);
}
function removeClickHandler(handler) {
document.removeEventListener('click', handler);
}
const clicks = fromEventPattern(
addClickHandler,
removeClickHandler
);
clicks.subscribe(x => console.log(x));
// Whenever you click anywhere in the browser, DOM MouseEvent
// object will be logged.
Operators & Flow Control
Another powerful function of RxJS is the flow control operators offered. The pipe() function can be called on an observable instance and provides Pipeable Operators that can be called on the observable instance and returns a new instance based on the original observable.
Pipeable Operators include functions (many of which shares the same functionality of Array methods) like map(), filter(), mergeMap() that can offer the ability to control the flow of events through observables and create a new Observable.
A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified.
import { of, map } from 'rxjs';
const source$ = of(1, 2, 3);
const mapped = source$.pipe(map(x => 10 * x));
/*
Logs:
10
20
30
*/
mapped.subscribe(x => console.log(x));
import { of, filter } from 'rxjs';
const source$ = of(0, 1, 2, 3, 4);
const filtered = source$.pipe(filter(x => x % 2 === 1));
/*
Logs:
1
3
*/
filtered.subscribe(x => console.log(x));
Flow control operators give you granularity on how events are handled/emitted, and allows you to transform the values being passed through the observable in a safe way that preserves the original Observable.
Some of the most commonly used flow control (filter) operators are take(), takeUntil(), takeWhile(), takeLast(), etc that allow you to control how (or when) events are grabbed from the Observable.
take() takes one parameter — the
count
(number) and will only emit the firstcount
values emitted by the Observable.
import { interval, take } from 'rxjs';
const intervalCount = interval(1000);
const takeFive = intervalCount.pipe(take(5));
takeFive.subscribe(x => console.log(x));
// Logs:
// 0
// 1
// 2
// 3
// 4
takeUntil() lets values pass until a second Observable,
notifier
, emits a value. Then, it completes.
import { interval, fromEvent, takeUntil } from 'rxjs';
const source = interval(1000);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe(x => console.log(x));
takeWhile() will emit values until provided expression returns false
// RxJS v6+
import { of } from 'rxjs';
import { takeWhile } from 'rxjs/operators';
//emit 1,2,3,4,5
const source$ = of(1, 3, 6, 4, 7, 2);
//allow values until value from source is greater than 4, then complete
source$
.pipe(takeWhile(x => x < 5))
// log: 1,3
.subscribe(x => console.log(x));
takeLast() will emit the last n emitted values before completion
// RxJS v6+
import { of } from 'rxjs';
import { takeLast } from 'rxjs/operators';
const source = of('Ignore', 'Ignore', 'Hello', 'World!');
// take the last 2 emitted values
const example = source.pipe(takeLast(2));
// Hello, World!
const subscribe = example.subscribe(val => console.log(val));
Some additional flow control operators that are very handy are distinct() or distinctUntilChanged() which allows you to control when the Observable will emit — either only distinct values, or if it is distinct compared to the previous emitted value respectively.
distinct() returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
import { of, distinct } from 'rxjs';
of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
.pipe(distinct())
.subscribe(x => console.log(x));
// Outputs
// 1
// 2
// 3
// 4
distinctUntilChanged() returns a result
Observable
that emits all values pushed by the source observable if they are distinct in comparison to the last value the result observable emitted.
import { of, distinctUntilChanged } from 'rxjs';
of(1, 1, 1, 2, 2, 2, 1, 1, 3, 3)
.pipe(distinctUntilChanged())
.subscribe(console.log);
// Logs: 1, 2, 1, 3
RxJS also provides a way to create new observables with Creation Operators that are used as a standalone function to create Observables with predefined behaviors, or join other Observables.
of() creates an observable sequence based on the arguments it receives — each argument becomes a next notification in sequence
import { of } from 'rxjs';
of(10, 20, 30)
.subscribe({
next: value => console.log('next:', value),
error: err => console.log('error:', err),
complete: () => console.log('the end'),
});
// Outputs
// next: 10
// next: 20
// next: 30
// the end
Similar to of(), from() also creates an observable from objects, promises, arrays, iterable objects or any Observable-like object.
import { from } from 'rxjs';
const array = [10, 20, 30];
const result = from(array);
result.subscribe(x => console.log(x));
// Logs:
// 10
// 20
// 30
Getting Started
To get started using RxJS install via npm:
npm install rxjs
If using NPM, importing RxJS functions (operators) and other modules can be done individually or as an entire set:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(map((x) => x + '!!!')); // etc
import * as rxjs from 'rxjs';
import * as operators from 'rxjs';
rxjs.of(1, 2, 3).pipe(operators.map((x) => x + '!!!')); // etc;
The RxJS website also contains great reference(s) like the API list and glossary of terms, in addition to a operator decision tree tool that helps you identify what operator you would need depending on the behavior you wish to achieve: