Getting to know Observables in JavaScript

Introduction

In this article I will assume that you have a basic understanding of how asynchronous operations are handled in JavaScript and that you know what Promises are and how to use them.

But don’t worry. If you feel like you need to wrap your head around the concepts of asynchronous operations in JavaScript I wrote an article for CodeBuddies a while back trying my best to explain that on a more basic level. The article can be found here: Getting to know asynchronous JavaScript: Callbacks, Promises and Async/Await.

What is an Observable?

An Observable is a representation of any set of values over any amount of time. Let me try to explain that by comparing Observables to Promises:

A Promise is an object that wraps an asynchronous operation and notifies when it’s done.

An Observable is an object that can wrap several asynchronous and synchronous operations and can notify more than once when those operations are done and its results are available.

A Promise represents a single future resolution (value or error) of an operation while an Observable represents zero, one or many future values that can appear at any time.

Disclaimer

At this moment there is no native implementation of Observables in JavaScript. There are a couple of libraries out there that has implemented Observables but my personal preference is RxJS because it’s a powerful and great toolbox for handling most basic and complex cases.

There is also at the time of writing a TC39 proposal at stage 1 to implement Observables in JavaScript.

All examples below are assuming that we use the RxJS implementation of Observables which the TC39 proposal also is based upon.

Comparing Observables to Promises

The comparison between Observables and Promises is probably the best starting point because promises are already widely familiar to the JavaScript community. The comparison comes from that both observables and promises solves similar problems; handling and coordinating asynchronous operations.

With that said, observables are not the next evolution of promises. It should not be a complete replacement of promises.

Promise vs. Observable by example

Here is an example on using a Promise in JavaScript:

someAsyncOperationThatReturnsAPromise()
.then(function(result){
// Do something with the result
})
.catch(function(error){
// Handle error
});

And here is an example on using an Observable:

someAsyncOperationThatReturnsAnObservable()
.subscribe(
(value) => { // Do something with each emitted value },
(error) => { // Handle error },
() => { // Called once the Observable is done emitting values}
);

Once a Promise has resolved and provided a result it can no longer be used. That is not the case with Observables. Each value emitted from an Observable will end up in the callback function that is the first argument of the .subscribe(..) call.

Creating Promises vs. creating Observables by example

Here is an example of how to create a new Promise:

function getAsyncData(someValue){
return new Promise(function(resolve, reject){
getData(someValue, function(error, result){
if(error){
reject(error);
}
else{
resolve(result);
}
})
});
}

And here is an example on how to create a new Observable:

function getAsyncDataObservable(someValue){
return new Observable(function(observer){
getData(someValue, function(error, result){
if(error){
observer.onError(error);
}
else{
observer.onNext(result);
}
})
});
}

A Promise can result in one of two outcomes: resolve or reject. It can do so once and only once.

An Observable on the other hand can call its observer.onNext() multiple times. This will result in the function passed as the first argument to .subscribe(..) being called once for each time observer.onNext() is called within that Observable.

To make this even more clearer take a look at the following example:

function getAsyncDataObservable(someValue, someOtherValue){
return new Observable(function(observer){
getData(someValue, function(error, result){
if(error){
observer.onError(error);
}
else{
observer.onNext(result);
}
});
getSomeOtherData(someOtherValue, function(error, result){
if(error){
observer.onError(error);
}
else{
observer.onNext(result);
}
});
});
}

Here we are doing two asynchronous operations in parallell. Given that no errors occurs, the first one that finishes will call observer.onNext first and then the second one will call observer.onNext when it finishes. This will result in the function passed as the first argument to .subscribe(..) being called twice; once for each of the outcomes of getSomeOtherData and getData respectively in any order depending on what operation finishes first.

A note on error handling

If an error occurs, that is when observer.onError is called, the Observable is interrupted and stops emitting values. Take a look at the example above again. If the first operation to finish has an error, resulting in observer.onError being called, the Observable will not emit the result when the second operation finishes. In fact it will not emit any further values at all.

A note on subscriptions

An Observable is just a function that takes an observer object. That function will not be called until the Observable is subscribed to. An Observable is subscribed to when you call .subscribe(..) on it.

A Promise on the other hand will run its asynchronous operation at the moment it is created.

The key difference here is that Promises are executedeagerly while Observables are executed lazily which I demonstrate with the following example:

// The async operation will be executed here and a Promise is returned
const promise = someAsyncOperationThatReturnsAPromise();
// Returns an Observable but the async operation is not executed at this point.
const observable = someAsyncOperationThatReturnsAnObservable();
// Instead the async operation within the Observable is executed here.
observable.subscribe(
(value) => { // Do something with the result },
(error) => { // Handle error }
);

Hot and cold observables

An Observable emits values from one or more data sources. A data source can be everything from a single primitive value or an array to an asynchronous call to a database or REST-service.

If a data source starts producing values only once an Observable is subscribed to it is called a cold observable and we can be sure that we do not miss out on any values from the data source prior to subscribing.

If a data source produces values independent of the creation of an Observable it is called a hot observable and we might miss out on some values prior to subscribing.

All the examples above are cold observables. Data is not pulled from the data source until we subscribe.

Examples of hot observables would be the kind that emits values from a WebSocket connection or from mouse events in the browser. In those scenarios the data source is the event submission from a WebSocket or the user clicking the mouse which can happen prior to Observable creation and subscription.

Some examples of cold and hot observables:

// Cold Observable since it will only start pulling values from the array when subscribed to.
const someArray = [1,2,3,4,5,6];
function createObservable(){
return new Observable(function(observer){
try{
someArray.forEach(value => observer.onNext(value))
} catch(error){
observer.onError(error);
}
});
}
// Hot Observable since clicks can occur at any time and we can miss out on clicks up until we subscribe.
const button = document.getElementById("button");
function createMouseEventObservable(){
return new Observable(function(observer){
button.addEventListener('click', () => {
observer.onNext():
});
return () => { button.removeEventListener('click'); } });
}
// Cold Observable since the data wont be fetched until we subscribe.
function createFetchObservable(){
return new Observable(function(observer){
fetch('someUrl')
.then(response => observer.onNext(response))
.catch(error => observer.onError(error))
});
}
// Hot Observable since messages can be emitted via the WebSocket prior to subscription.
const socket = new WebSocket('ws://localhost:8080');
socket.addEventListener('open', function (event) { socket.send('Hello Server!')}; );
function createWebSocketObservable(){
return new Observable(function(observer){
socket.addEventListener('message', function (event) {
observer.onNext(event.data);
});
});
}

When to use observables

It might be a bit overkill to use a library like RxJS to just do single asynchronous requests to a REST-service. In that case you might as well be using a promise based API. Especially since most http modules like fetch or axios are using promises.

If, on the other hand, you have a case where coordination and control flow between several asynchronous operations are needed then observables and a library like RxJS is probably something you should get familiar with.

Another case where using observables would be a good idea is when you have a data source that emits several events asynchronously over time. WebSockets, DOM-events, data streams etc.

Operators in RxJS

RxJS true power comes from its operators. Operators operate on an Observable and returns a new Observable thus making them chainable or pipeable.

As stated before an Observable can emit zero, one or many values as the result of one or more asynchronous operations. An operator can operate on each and one of those values emitted by such Observable and create new observables that in itself can emit zero, one or many values as the result of even more asynchronous operations. This means that we can create an escalation of asynchronous calls like this:

const { from } = require('rxjs');
const { mergeMap } = require('rxjs/operators');
from(fetch('api.com/products')).pipe(
mergeMap(listOfProductIds => from(listOfProductIds)),
mergeMap(productId =>
from(fetch(`api.com/products/${productId}`))
)
).subscribe((productDetails) => console.log(productDetails));

The are a multitude of operators in RxJS. One good way to start learning them is to just grab two or three of the most common one and just start using them.

Here we use mergeMap and from which is two really good operators to get you started when combining several asynchronous calls using e.g fetch(..) .

from is a so called creation operator that is used to create an Observable from a data source like an array or a Promise. Since fetch() returns a Promise we can pass that to the from operator in order to create an Observable out of a Promise.

mergeMap maps a new Observable to the first one for each value emitted. In this case the value emitted to the first mergeMap is the response from the fetch('api.com/products') call. That mergeMap then returns a new Observable which is created with another from using the response (which in this scenario is an array of product id’s) as a data source. When passing an array to from it will return an Observable that emits all the values from that array one at a time.

So the result of the first mergeMap is an Observable that emits zero, one or more product id’s which will go through the next operator in line one at a time. In this case the next operator in line is another mergeMap that takes each product id and returns a new Observable for each and one of them based on another fetch call.

What will end up in the callback of the subscription is the response of each fetch(`api.com/products/${productId}`).

Thank you for reading

It was not easy for me to wrap my head around all this when I first started learning RxJS and the concept of Observables. I hope this has improved your mental model over what Observables are. I will write more articles about specific use cases where RxJS has helped me a lot in the wild.

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade