Observable Operators: for Humans

48.Fourtyeighthours
5 min read · May 7, 2024


Intros to RxJS tend to carry a lot of anti-human complexity that causes many developers to shy away.

Let’s get this fixed now.

If you haven’t yet, please check out Observables: for Mortals first. This article assumes an understanding of Promises, Observables and Subjects.

Photo by Roxana Maria on Unsplash

Why do you need operators at all?

Plain-old JavaScript operators such as + - * / are designed to handle numbers and do that kind of well.

Similarly, array methods such as Array.map(), Array.reduce(), Array.sort(), etc., have been created for arrays and handle those well (or at least, to some extent).

Even Promises have their own operators, like await, and their own utility methods or operator functions, such as .then(), .catch(), Promise.all(), etc.

Similarly, Observable streams have their own operator functions.

Operators vs Operator Functions

We began talking about operators and operator functions. Just to be sure we’re all on the same page, let’s clarify the difference and use a few examples.

Simple operators take one or two operands.

// JavaScript operators
!a
a + b
c ** 3

You’ll most likely have been using operator functions on Arrays and Promises.

// JavaScript operator functions
arr.map(x => x * 2)
arr.sort((a, b) => a - b) // the comparator must return a number, not a boolean
p.then(res => fetch(res.url))

So nothing particularly scary here. Built-in operators like “ / ” have a fixed behaviour you can’t change. Operator functions perform the task of custom operators: you define their behaviour by passing in a function of your own.
With observable streams it’s the same.
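As a small illustration of that distinction in plain JavaScript (the prices array is made up for this sketch): the division operator always does the same thing, whereas sort() changes its result depending on the function you hand it.

```javascript
// Built-in operator: the behaviour of "/" is fixed by the language.
const half = 10 / 2;

// Operator function: sort() does the sorting, but the comparison rule is ours.
const prices = [5, 20, 12]; // illustrative data
const ascending = [...prices].sort((a, b) => a - b);
const descending = [...prices].sort((a, b) => b - a);

console.log(half);       // 5
console.log(ascending);  // [ 5, 12, 20 ]
console.log(descending); // [ 20, 12, 5 ]
```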

Getting started with operators

map(mapFn)

The simplest yet most important RxJS operator, and the one you’ll use most often, is map. It’s similar to Promise.then() in some ways, as both take a value as input and deliver a transformed value as output, asynchronously. The difference is that a Promise.then() callback only runs once, whilst the observable map runs as many times as the source emits.

import { map } from 'rxjs/operators';

// Double the source with Promises
const sourcePromise = somePromise();
const targetPromise = sourcePromise.then(x => x * 2);
// print the result
targetPromise.then(console.log);

// Double the source with Observables
const sourceStream = someObservable();
const targetStream = sourceStream.pipe(
  map(x => x * 2)
);
// print the results
targetStream.subscribe(console.log);

map(), like all RxJS operators, can be chained multiple times in observable streams, in the same way that multiple .then().then().then() calls can be chained on promises.

const stream = source.pipe(
  map(x => x * 2),
  map(x => x + 1),
  map(x => 100 - x),
);

There are similarities and differences between Array.map() and Observable map.

Array.map(fn) applies the provided transformation function to each item of the source array and returns a new transformed array, synchronously.

observable.pipe(map(transformationFn)) applies the provided transformation function to each item emitted by the source observable, item by item, asynchronously.
Whenever the source emits, the transformation function is run and the result is re-emitted. This is the fundamental difference between the two and, more generally, between sync array operators and async observable operators.
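To make the "item by item" behaviour concrete, here is a minimal sketch that contrasts Array.map() with a hand-rolled stream. Note that toyStream and toyMap are names invented for this illustration, not RxJS APIs, and the real RxJS implementation is considerably more involved (completion, errors, unsubscription and so on).

```javascript
// Array.map(): the whole result exists as soon as the call returns.
const doubledArray = [1, 2, 3].map(x => x * 2);

// A toy stream (hypothetical, NOT RxJS): it only knows how to notify listeners.
function toyStream() {
  const listeners = [];
  return {
    subscribe: listener => { listeners.push(listener); },
    emit: value => { listeners.forEach(listener => listener(value)); },
  };
}

// A toy map operator: runs fn on every value the source emits and re-emits the result.
function toyMap(source, fn) {
  const target = toyStream();
  source.subscribe(value => target.emit(fn(value)));
  return target;
}

const source = toyStream();
const received = [];
toyMap(source, x => x * 2).subscribe(value => received.push(value));

source.emit(1); // received is now [2]
source.emit(2); // received is now [2, 4]

console.log(doubledArray); // [ 2, 4, 6 ]
console.log(received);     // [ 2, 4 ]
```

The array version produces everything in one go; the stream version produces one output per emission, whenever that emission happens to arrive.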

filter(filterFn)

Just like Array.filter(), this takes input events and only re-emits them (asynchronously, item by item) if filterFn returns a truthy value.

reduce(reduceFn)

Very similar to Array.reduce(), just not as useful in practice: this performs a reduce operation on the source observable but waits for the whole stream to complete (which might actually take a long time, or never happen at all on some infinite streams) before emitting the result.
It’s listed here mainly to highlight how it differs from the next operator.

scan(reduceFn)

This is like a step-by-step reduce, a progressive equivalent of the reduce operator above, except this is actually more useful in practice and has a less intuitive name. It runs the same reduceFn and behaves the same as reduce, except it doesn’t wait for the source observable to complete, so it emits values as soon as they come in.
Progressive counters are a good use case for this operator.

import { fromEvent } from 'rxjs';
import { scan } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
// this emits the current count every time you click.
const counter = clicks.pipe(
  scan(acc => acc + 1, 0)
);
counter.subscribe(console.log); // 1, 2, 3, ...

merge(...otherStreams)

A simple yet powerful and useful operator is merge. It takes two or more source streams and combines them into one. Whenever any of them emits, the corresponding value is re-emitted. First-in, first-out.

import { merge } from 'rxjs';
const source1 = anObservable();
const source2 = anObservable();
const source3 = anObservable();
const output = merge(source1, source2, source3);
output.subscribe(console.log);

take(n)

As the name suggests, this operator only takes the first n items emitted from the source, then completes.

import { take } from 'rxjs/operators';
const source = someObservable();
const just3 = source.pipe(
  take(3)
);
just3.subscribe(someObserver);

takeUntil(stream2)

This operator takes items from the source observable until stream2, another observable, emits something. At that point the output stream completes.

import { fromEvent, interval } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
const timer = interval(1000);
const click = fromEvent(document, 'click');
const stoppable = timer.pipe(
  takeUntil(click)
);
// count up until you click
stoppable.subscribe(console.log);

switchMap( ()=>otherStream )

If you’ve studied electronics, this operator might remind you of the NPN transistor a little bit.

Every time the source observable emits a value, switchMap drops the previous otherStream (if there is one) and switches to a new one. Values from the source stream are never forwarded, only those coming from otherStream.

import { fromEvent, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';
const source = fromEvent(document, 'click');
const time = interval(1000); // emits every 1s
// Watch the syntax: switchMap needs a function returning a stream, not the stream itself!
const clickTracker = source.pipe(
  switchMap(() => time)
);
clickTracker.subscribe(someObserver);

One of the top use cases for switchMap() is drag’n’drop, one of the most often cited RxJS examples in tutorials.

Using operators for web development

Observables and observable operators turn out to be a great choice for web UI components: all DOM events can be treated as observable streams (especially with libraries like Rimmel, where this is natively supported), so all we have to do is implement our view-models and stitch them to our templates.

import { interval } from 'rxjs';
import { rml } from 'rimmel';
const component = () => {
  const stream = interval(1000);
  return rml`
    Once every second: <span>${stream}</span>
  `;
};
document.body.innerHTML = component();

Online documentation

Now, if you feel comfortable with the basics, the next step will be the online documentation. Some of the most popular RxJS docs, including the official ones, need a considerable amount of effort to understand initially.

The many visual diagrams (called marble diagrams) may turn out to be very useful if you’re a visual learner.

Playgrounds

If you’re just starting out, I can’t recommend https://reactivex.io/learnrx/ enough. It is one of the most useful tutorials for helping you transform your mindset from imperative to functional-reactive in practice.

After that, if you’re doing UI, it’s best to start with a playground. Rimmel.js is undoubtedly the UI library with the best support for Observable streams.

You can play with it using several little examples on Codepen, or for more elaborate ones, there are many on Stackblitz, too.

Happy streaming!
