Intro to Reactive Extensions with RxJS

Anthony Mai
Digital Products Tech Tales
7 min read · Aug 18, 2023

RxJS is a reactive extensions library for writing asynchronous, event-based programs with observable streams. It builds on the observer pattern, in which an object (the subject) maintains a list of dependent observers and notifies them automatically of state changes through callbacks. It also provides an Observable type that lets you handle asynchronous events as a collection.
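
The observer pattern described above can be sketched in a few lines of plain JavaScript. This is only an illustration of the idea, not how RxJS is implemented: the SimpleSubject class and the temperature example are hypothetical names made up for this sketch, and no RxJS import is involved.

```javascript
// Hypothetical class for illustration only; this is not the RxJS Subject.
class SimpleSubject {
  constructor() {
    this.observers = []; // the list of dependent observers
  }

  // Register a callback that should be told about every new value.
  subscribe(observer) {
    this.observers.push(observer);
  }

  // Notify every registered observer, in subscription order.
  next(value) {
    for (const observer of this.observers) {
      observer(value);
    }
  }
}

const temperature = new SimpleSubject();
const log = [];

temperature.subscribe((value) => log.push(`display: ${value}`));
temperature.subscribe((value) => log.push(`alarm: ${value}`));

temperature.next(21);
temperature.next(35);

console.log(log);
// [ 'display: 21', 'alarm: 21', 'display: 35', 'alarm: 35' ]
```

The subject never needs to know what its observers do with a value. It only keeps the list and calls each callback, which is the part RxJS generalizes with Observables and Observers.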

Observables, Subscriptions (Streams)

Around the Observable, RxJS provides a set of satellite types:

  • Observer (a collection of callbacks that listens to the values emitted by the Observable)
  • Subscription (represents the execution of an Observable, and can be used to cancel that execution)
  • Operators (built-in pure functions for dealing with collections: map, filter, reduce, every, etc.)
  • Subjects (allow a value to be multicast to multiple Observers)
  • Schedulers (control when a subscription starts and when notifications are delivered)

The observable stream acts like an event emitter and delivers three kinds of notification: next, error, and complete. These are consumed by an observer, an object with one callback for each notification type. To receive the values pushed by an Observable you must subscribe to it, passing either an observer object or just the next callback.

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next(x) {
    console.log('got value ' + x);
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
    console.log('done');
  },
});
console.log('just after subscribe');

/*
When subscribing to an Observable you can also just pass the next()
callback directly as an argument:
*/

observable.subscribe(x => console.log('Observer got a next value: ' + x));

A Subscription represents an ongoing Observable execution, created by calling .subscribe() on the Observable. Calling .unsubscribe() on it cancels the execution and releases the resources the Observable was using.

import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();
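
To make the cleanup step concrete, here is a minimal sketch in plain JavaScript of what unsubscribing amounts to. It assumes a hypothetical fakeButton object standing in for an event source and a hypothetical subscribeToClicks helper; neither is part of RxJS, and the real library does considerably more.

```javascript
// Hypothetical stand-in for an event source, so the sketch runs anywhere.
const fakeButton = {
  handlers: new Set(),
  addHandler(handler) {
    this.handlers.add(handler);
  },
  removeHandler(handler) {
    this.handlers.delete(handler);
  },
  click(event) {
    this.handlers.forEach((handler) => handler(event));
  },
};

// Hypothetical helper: registers a handler and returns a subscription-like
// object whose unsubscribe() releases that handler again.
function subscribeToClicks(target, next) {
  let closed = false;
  target.addHandler(next);
  return {
    unsubscribe() {
      if (closed) return; // calling unsubscribe twice is harmless
      closed = true;
      target.removeHandler(next);
    },
  };
}

const seen = [];
const clickSubscription = subscribeToClicks(fakeButton, (event) => seen.push(event));

fakeButton.click('first');       // delivered to the handler
clickSubscription.unsubscribe(); // the handler is removed
fakeButton.click('second');      // nobody is listening anymore

console.log(seen);                     // [ 'first' ]
console.log(fakeButton.handlers.size); // 0
```

Without the unsubscribe call, the handler would stay registered on the source for as long as the source lives, which is the kind of leak that unsubscribing is meant to prevent.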

The RxJS library offers a variety of handy creation functions, such as fromEventPattern, which creates an Observable from a pair of functions that register and unregister an event handler:

import { fromEventPattern } from 'rxjs';

function addClickHandler(handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}

const clicks = fromEventPattern(
  addClickHandler,
  removeClickHandler
);
clicks.subscribe(x => console.log(x));

// Whenever you click anywhere in the browser, DOM MouseEvent
// object will be logged.

Operators & Flow Control

Another powerful feature of RxJS is its flow control operators. Calling pipe() on an observable instance with one or more Pipeable Operators returns a new Observable derived from the original.

Pipeable Operators include functions such as map(), filter(), and mergeMap(), many of which mirror Array methods. They control the flow of events through an observable and produce a new Observable.

A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified.
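
The "function in, new Observable out" idea can be sketched without RxJS at all. In the simplified model below, which is an assumption made for illustration and not the library's implementation, an "observable" is just a function that accepts a next callback. The map, filter, and pipe helpers are hand-written stand-ins for the RxJS ones.

```javascript
// Simplified model: a source is a function that pushes values into `next`.
const source = (next) => {
  for (const value of [1, 2, 3]) next(value);
};

// An operator takes a source and returns a NEW source; the input is untouched.
const map = (project) => (input) => (next) =>
  input((value) => next(project(value)));

const filter = (predicate) => (input) => (next) =>
  input((value) => {
    if (predicate(value)) next(value);
  });

// pipe applies the operators left to right, in the spirit of observable.pipe().
const pipe = (input, ...operators) =>
  operators.reduce((current, operator) => operator(current), input);

const oddTimesTen = pipe(
  source,
  filter((x) => x % 2 === 1),
  map((x) => x * 10)
);

const piped = [];
oddTimesTen((value) => piped.push(value));

const original = [];
source((value) => original.push(value));

console.log(piped);    // [ 10, 30 ]
console.log(original); // [ 1, 2, 3 ]
```

Subscribing to the original source after piping still yields 1, 2, 3, which is what "the previous Observable stays unmodified" means in practice.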

Marble Diagram for map() operator
import { of, map } from 'rxjs';

const source$ = of(1, 2, 3);

const mapped = source$.pipe(map(x => 10 * x));

mapped.subscribe(x => console.log(x));

/*
Logs:
10
20
30
*/

Marble Diagram for filter() operator
import { of, filter } from 'rxjs';

const source$ = of(0, 1, 2, 3, 4);

const filtered = source$.pipe(filter(x => x % 2 === 1));

/*
Logs:
1
3
*/

filtered.subscribe(x => console.log(x));

Flow control operators give you fine-grained control over how events are handled and emitted, and let you transform the values passing through an observable safely, leaving the original Observable untouched.

Some of the most commonly used flow control (filtering) operators are take(), takeUntil(), takeWhile(), and takeLast(), which control how many values are taken from the Observable, or when.

take() takes one parameter, count (a number), and emits only the first count values emitted by the Observable before completing.

Marble Diagram for take() operator
import { interval, take } from 'rxjs';

const intervalCount = interval(1000);
const takeFive = intervalCount.pipe(take(5));
takeFive.subscribe(x => console.log(x));

// Logs:
// 0
// 1
// 2
// 3
// 4

takeUntil() lets values pass until a second Observable, the notifier, emits a value. Then it completes.

Marble Diagram for takeUntil() operator
import { interval, fromEvent, takeUntil } from 'rxjs';

const source = interval(1000);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe(x => console.log(x));

takeWhile() emits values for as long as the provided predicate returns true, and completes as soon as it returns false.

Marble Diagram for takeWhile() operator
// RxJS v6+
import { of } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

// emits 1, 3, 6, 4, 7, 2
const source$ = of(1, 3, 6, 4, 7, 2);

// pass values through while they are less than 5, then complete
source$
  .pipe(takeWhile(x => x < 5))
  // logs: 1, 3 (6 fails the predicate, so the stream completes there)
  .subscribe(x => console.log(x));
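
It is easy to confuse takeWhile() with filter(), since both take a predicate. The difference is that takeWhile() stops for good at the first value that fails, while filter() keeps testing every value. The sketch below mimics both rules over a plain array using the same input as the example above. The takeWhileArray helper is a hypothetical name written for this comparison, not an RxJS function.

```javascript
// Hypothetical array helper that mirrors takeWhile's stopping rule.
function takeWhileArray(values, predicate) {
  const result = [];
  for (const value of values) {
    if (!predicate(value)) break; // the first failure ends the sequence
    result.push(value);
  }
  return result;
}

const values = [1, 3, 6, 4, 7, 2];
const isSmall = (x) => x < 5;

const taken = takeWhileArray(values, isSmall);
const kept = values.filter(isSmall);

console.log(taken); // [ 1, 3 ]
console.log(kept);  // [ 1, 3, 4, 2 ]
```

The values 4 and 2 satisfy the predicate but never appear in the takeWhile result, because the sequence already ended at 6.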

takeLast() emits the last n values from the source. Because it cannot know which values are last until the source completes, it emits them only upon completion.

Marble Diagram for takeLast() operator
// RxJS v6+
import { of } from 'rxjs';
import { takeLast } from 'rxjs/operators';

const source = of('Ignore', 'Ignore', 'Hello', 'World!');
// take the last 2 emitted values
const example = source.pipe(takeLast(2));
// logs: Hello, then World!
const subscribe = example.subscribe(val => console.log(val));

Two more handy flow control operators are distinct() and distinctUntilChanged(). They control which values the Observable emits: only values not seen before, or only values that differ from the previously emitted value, respectively.

distinct() returns an Observable that emits only those items from the source Observable that are distinct from all previously emitted items.

Marble Diagram for distinct() operator
import { of, distinct } from 'rxjs';

of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
  .pipe(distinct())
  .subscribe(x => console.log(x));
// Outputs
// 1
// 2
// 3
// 4

distinctUntilChanged() returns a result Observable that emits all values pushed by the source observable if they are distinct in comparison to the last value the result observable emitted.

Marble Diagram for distinctUntilChanged() operator
import { of, distinctUntilChanged } from 'rxjs';

of(1, 1, 1, 2, 2, 2, 1, 1, 3, 3)
  .pipe(distinctUntilChanged())
  .subscribe(console.log);
// Logs: 1, 2, 1, 3
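
Running both rules over the same input makes the difference between distinct() and distinctUntilChanged() easier to see. The sketch below mimics them over a plain array with default equality. The distinctArray and distinctUntilChangedArray helpers are hypothetical names written for this comparison, not RxJS functions.

```javascript
// Mimics distinct(): drop any value that has been seen at any earlier point.
function distinctArray(values) {
  const seen = new Set();
  return values.filter((value) => {
    if (seen.has(value)) return false;
    seen.add(value);
    return true;
  });
}

// Mimics distinctUntilChanged(): drop a value only if it repeats its
// immediate predecessor.
function distinctUntilChangedArray(values) {
  return values.filter(
    (value, index) => index === 0 || value !== values[index - 1]
  );
}

const input = [1, 1, 2, 2, 1, 3, 3, 1];

const everDistinct = distinctArray(input);
const changedOnly = distinctUntilChangedArray(input);

console.log(everDistinct); // [ 1, 2, 3 ]
console.log(changedOnly);  // [ 1, 2, 1, 3, 1 ]
```

The value 1 comes back in the second result whenever it follows a different value, whereas the first result remembers every value it has ever let through. That memory is also why distinct() needs more care on long-running streams.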

RxJS also provides Creation Operators: standalone functions that create new Observables with predefined behavior or join other Observables.

of() creates an observable sequence from the arguments it receives: each argument becomes a next notification, in order, followed by a complete notification.

Marble Diagram for of() operator
import { of } from 'rxjs';

of(10, 20, 30)
  .subscribe({
    next: value => console.log('next:', value),
    error: err => console.log('error:', err),
    complete: () => console.log('the end'),
  });

// Outputs
// next: 10
// next: 20
// next: 30
// the end

Similar to of(), from() creates an observable, but from a single source: an array, an array-like object, a Promise, an iterable object, or an Observable-like object.

Marble Diagram for from() operator
import { from } from 'rxjs';

const array = [10, 20, 30];
const result = from(array);

result.subscribe(x => console.log(x));

// Logs:
// 10
// 20
// 30

Getting Started

To get started using RxJS install via npm:

npm install rxjs

If using npm, RxJS functions (operators) and other modules can be imported individually or as an entire set:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3).pipe(map((x) => x + '!!!')); // etc

import * as rxjs from 'rxjs';
import * as operators from 'rxjs/operators';

rxjs.of(1, 2, 3).pipe(operators.map((x) => x + '!!!')); // etc

The RxJS website also contains great references, such as the API list and a glossary of terms, as well as an operator decision tree tool that helps you identify which operator you need for the behavior you want to achieve:

Operator Decision Tree

RxJS Examples

RxMarbles Diagrams
