Observables: for Humans

48.Fourtyeighthours
Dec 4, 2023

Introductions to RxJS and Observables tend to carry a lot of needless, human-unfriendly complexity that makes many developers run away.

Let’s fix this now.

Warning: if you master Observables, streams, and RxJS, there is no way back. They will change your life and way of thinking forever. You’ll feel your skills have jumped 100x all of a sudden, and you’ll blame the world for not showing you all this sooner.

Observable

So, in short: what is an Observable?

It’s just like a promise that can resolve more than once. A promise “resolves”, an observable “emits”.

A promise has a .then() method, an observable has a .subscribe() method. Otherwise, they work conceptually the same way.

// Promises
aPromise.then(value => doSomething(value))

// Observables
anObservable.subscribe(value => doSomething(value))

So the only difference here is that the callback passed to .then() only ever runs once, while the one passed to .subscribe() runs every time the observable emits a new value.

Subject

A Subject is an Observable you can “pump” data into.

Remember promises? Since they implement the revealing constructor pattern, they can only resolve from within:

const p = new Promise((resolve, reject) => {
  // can only call resolve from within
  resolve(result);
});

So, before the recent advent of Promise.withResolvers(), Promises could not be resolved from the outside, which is sometimes a limitation. The way you got around this was by exposing the resolve and reject functions:

// a regular function, so it works with or without `new`
function ResolvablePromise() {
  let resolve, reject

  const p = new Promise((resolve2, reject2) => {
    resolve = resolve2
    reject = reject2
  })

  return {
    then: p.then.bind(p),
    catch: p.catch.bind(p),
    finally: p.finally.bind(p),
    // and here we add the final missing methods
    resolve,
    reject,
  }
}

So using the above you could create a promise that can be resolved from the outside:

const p = ResolvablePromise()

// and then ...
p.then(doSomething)

// and somewhere else:
p.resolve(value)

Great, so, Promises had this limitation, but Subjects do not. The equivalent of the above with Subjects is just a different method call. You call .next() instead of .resolve(). The difference, again, is that you can do this as many times as you want.

import { Subject } from 'rxjs'

const stream = new Subject()

// then you can subscribe to it:
stream.subscribe(console.log)

// then, when you feel ready:
stream.next('new-value')

// and again:
stream.next('another-value')
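
If you wonder what happens behind the scenes, here is a minimal hand-rolled sketch in plain JavaScript. It is not how RxJS implements Subject: the TinySubject class is a hypothetical name made up for illustration, and it ignores errors, completion and unsubscription. It only shows the core idea, which is a list of subscribers that .next() loops over.

```javascript
// A toy Subject: NOT the RxJS implementation, just the core idea.
// `TinySubject` is a hypothetical name made up for this sketch.
class TinySubject {
  constructor() {
    this.listeners = [];
  }

  // Register a callback to run on every future value.
  subscribe(listener) {
    this.listeners.push(listener);
  }

  // "Pump" a value in from the outside: every subscriber receives it.
  next(value) {
    this.listeners.forEach(listener => listener(value));
  }
}

const received = [];
const tinyStream = new TinySubject();

tinyStream.subscribe(value => received.push(`A got ${value}`));
tinyStream.subscribe(value => received.push(`B got ${value}`));

tinyStream.next('new-value');
tinyStream.next('another-value');

console.log(received);
```

Every subscriber sees every value, in the order the values were pumped in.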

BehaviorSubject

Names are horrendous, sometimes scary, but the concept is still trivial: a BehaviorSubject is a Subject that you give an initial value and that exposes a .value property.

import { BehaviorSubject } from 'rxjs'

const stream = new BehaviorSubject(initial)

console.log(`current value: ${stream.value}`)

stream.subscribe(newValue => doSomething(newValue))

Yes, you got it right: Subjects don’t have an initial value, they stay in a “limbo” until someone calls .next() on them, which is perfectly fine in some cases. It’s the same with promises, which don’t expose a .value anywhere.
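
One more thing worth knowing: in RxJS a BehaviorSubject also hands its current value to every new subscriber right away. The sketch below illustrates the idea in plain JavaScript. It is a toy, not the RxJS implementation, and TinyBehaviorSubject is a hypothetical name made up for illustration.

```javascript
// A toy BehaviorSubject: NOT the RxJS implementation.
// `TinyBehaviorSubject` is a hypothetical name made up for this sketch.
class TinyBehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue; // always holds the latest value
    this.listeners = [];
  }

  subscribe(listener) {
    this.listeners.push(listener);
    listener(this.value); // new subscribers get the current value right away
  }

  next(value) {
    this.value = value;
    this.listeners.forEach(listener => listener(value));
  }
}

const seen = [];
const counter = new TinyBehaviorSubject(0);

counter.subscribe(value => seen.push(value)); // receives 0 immediately
counter.next(1);
counter.next(2);

console.log(seen);          // [ 0, 1, 2 ]
console.log(counter.value); // 2
```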

If you’re OK with the above, then you’ve learnt the fundamentals. Don’t forget them: there are proposals to make Observable a native part of JavaScript and the web platform. In today’s world, where everything is async, Observables are likely to play an even bigger role than Promises, because a Promise can only resolve once, but users will not just stop clicking, will they?

Creating Observables

Creating a new Observable from scratch is quite similar to creating new Promises:

import { Observable } from 'rxjs';

const p = new Promise<string>(resolve => {
  setTimeout(() => resolve('hello world'), 1000);
});

const o = new Observable<string>(observer => {
  setInterval(() => observer.next('hello world'), 1000);
});

// except for one thing: observables are
// lazily evaluated, so they don't
// actually do anything until
// there is an observer watching, so we do:
o.subscribe(data => console.log(data));
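
To make the "lazy" part concrete, here is a small sketch in plain JavaScript. TinyObservable is a hypothetical toy class made up for illustration, not the RxJS Observable, but it behaves the same way in this one respect: the function you pass in is only stored at creation time and runs once per subscription, while a Promise runs its function immediately.

```javascript
// A toy Observable: NOT the RxJS implementation.
// `TinyObservable` is a hypothetical name made up for this sketch.
class TinyObservable {
  constructor(producer) {
    this.producer = producer; // stored, not executed
  }

  subscribe(next) {
    this.producer({ next }); // the producer only runs here
  }
}

const log = [];

// A Promise is eager: its function runs as soon as it is created.
const eager = new Promise(resolve => {
  log.push('promise executor ran');
  resolve('hello world');
});

// The toy Observable is lazy: nothing runs yet.
const lazy = new TinyObservable(observer => {
  log.push('producer started');
  observer.next('hello world');
});

log.push('both created');

// Each subscription runs the producer again, from scratch.
lazy.subscribe(value => log.push(`first got ${value}`));
lazy.subscribe(value => log.push(`second got ${value}`));

console.log(log);
```

The log shows the promise executor running before "both created", and the producer starting only after it, once for each subscriber.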

Unlike Promises, you rarely create raw Observables from scratch, though.
Most of the time you just consume standard event sources, such as mouse clicks, mouse movements, window resizes and websocket data, and RxJS has dedicated creation functions, like fromEvent, that turn those into Observables for you.

A simple way to create an observable stream from mouse clicks and print it to the console looks as follows:

import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

const mouseClicks = fromEvent(document, 'click');

const mouseCoords = mouseClicks.pipe(
  map(event => `click: ${event.clientX},${event.clientY}`)
);

mouseCoords.subscribe(console.log);
// Please note, in this case, mouseCoords is subscribed to mouseClicks
// and console.log is subscribed to mouseCoords. This way the whole chain
// has a subscription path, and can work.

Even better, when creating components for your web applications, some templating libraries like Rimmel.js create these streams for you, as they expose ordinary HTML events directly as Observables you can subscribe to.

import { rml } from 'rimmel';
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';

const component = () => {
  const mousemove = new Subject();

  const mouseCoords = mousemove.pipe(
    map(event => `mousemove: ${event.clientX},${event.clientY}`)
  );

  return rml`
    <div onmousemove="${mousemove}"></div>

    <div>Mouse coords: <span>${mouseCoords}</span></div>
  `;
};

Completion

When a Promise resolves, it’s resolved, so you don’t expect anything else to happen. With an Observable, data may come through indefinitely.

Sometimes it may be desirable to notify observers that the stream is finished and no more data will be emitted. This is known as completion, and it is important because some operators wait for it before taking any action.

import { Observable } from 'rxjs';

const stream = new Observable(observer => {
  setTimeout(() => observer.next('hello'), 1000);
  setTimeout(() => observer.next('world'), 2000);

  // here we declare the stream as terminated.
  setTimeout(() => observer.complete(), 5000);
});

In web development you’re not going to create many observables this way, so this example is here just to give an understanding of how it works behind the scenes.
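
To see why completion matters to some operators, here is a sketch in plain JavaScript. Both TinyObservable and collect are hypothetical names made up for illustration, not RxJS code; collect is loosely modeled on the RxJS toArray operator, which gathers all values and emits them as one array once the source completes.

```javascript
// A toy Observable with completion: NOT the RxJS implementation.
// `TinyObservable` and `collect` are hypothetical names made up for this sketch.
class TinyObservable {
  constructor(producer) {
    this.producer = producer;
  }

  subscribe(observer) {
    this.producer(observer);
  }
}

// Buffers every value and emits them as one array, but only on completion.
const collect = source => new TinyObservable(observer => {
  const buffer = [];
  source.subscribe({
    next: value => buffer.push(value),
    complete: () => {
      observer.next(buffer);
      observer.complete();
    },
  });
});

const words = new TinyObservable(observer => {
  observer.next('hello');
  observer.next('world');
  observer.complete(); // without this line, `collect` would never emit
});

const output = [];
collect(words).subscribe({
  next: all => output.push(all),
  complete: () => output.push('done'),
});

console.log(output);
```

If the source never completes, an operator like this stays silent forever, which is a common source of confusion.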

Error handling

Just like Promises, which can reject and be handled with .catch(), Observables have an error handling mechanism, too.

Great error handling is never trivial, even with simple Promises, but at a basic level, this is how it starts.

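import { Observable } from 'rxjs';

const stream = new Observable(observer => {
  // an error thrown here is delivered to the error callback below
  throw new Error('failed');
});

// subscribe() takes an observer object with optional
// next, error and complete callbacks
const subscription = stream.subscribe({
  next: (data) => console.log('new data', data),
  error: (error) => console.error('caught', error),
  complete: () => console.log('complete'),
});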
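
How does a thrown error end up in the error callback? Roughly, the subscribe step wraps the producer in a try/catch. The sketch below shows that idea in plain JavaScript; TinyObservable is a hypothetical toy class made up for illustration, not the RxJS implementation, which handles many more cases.

```javascript
// A toy Observable with error routing: NOT the RxJS implementation.
// `TinyObservable` is a hypothetical name made up for this sketch.
class TinyObservable {
  constructor(producer) {
    this.producer = producer;
  }

  subscribe(observer) {
    try {
      this.producer(observer);
    } catch (err) {
      observer.error(err); // a thrown error reaches the error callback
    }
  }
}

const results = [];

const risky = new TinyObservable(observer => {
  observer.next(1);
  throw new Error('failed');
});

risky.subscribe({
  next: value => results.push(`value ${value}`),
  error: err => results.push(`caught ${err.message}`),
});

console.log(results);
```

The value emitted before the failure still arrives; the error comes right after it.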

If you’re ready for the really juicy stuff, read more about Operators in the next article.

In the meantime… what are your thoughts? Do Observables make sense to you now?
