Understanding RxJS and Redux-Observable

I’ve been trying to learn RxJS and Redux-Observable. It’s taken me quite a bit of time. In this post, I attempt to make sense of some of the key principles using more understandable language.

To start with, these are some key concepts to understand when learning RxJS:

  • Observable: An interface that listens for incoming notifications over a period of time and pushes them to another interface that reacts to them.
  • Subscription: The act of starting an Observable’s work, i.e. listening for notifications and pushing them out. Subscribing also gives you a handle for stopping that work later.
  • Observer: An interface that reacts to data pushed from an Observable.
  • Operators: Functions used to manipulate an Observable’s output, e.g. filter, map, reduce, etc.
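To make these roles concrete, here is a minimal sketch in plain JavaScript. It is not how RxJS is implemented; createObservable is a hypothetical helper I made up for illustration. It shows that the producer does nothing until someone subscribes, and that subscribing hands back an object you can use to stop listening.

```javascript
// Hypothetical stand-in for an Observable: it only stores the producer function.
function createObservable(producer) {
  return {
    // Subscribing is what actually runs the producer.
    subscribe(observer) {
      let active = true;
      producer({
        next: value => { if (active) observer.next(value); },
      });
      // The returned object plays the role of the Subscription.
      return { unsubscribe() { active = false; } };
    },
  };
}

const log = [];
const numbers = createObservable(observer => {
  log.push('producer started');
  observer.next(1);
  observer.next(2);
});

log.push('before subscribe'); // nothing has been produced yet

// The Observer is the object that reacts to each pushed value.
const subscription = numbers.subscribe({
  next: value => log.push(`got ${value}`),
});
subscription.unsubscribe();

console.log(log);
// ['before subscribe', 'producer started', 'got 1', 'got 2']
```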

Here’s a simple example from the ReactiveX docs. Normally you register an event listener like this:

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

In RxJS you do this:

const button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscribe(() => console.log('Clicked!'));

Let’s see what’s in play here:

  • Rx.Observable.fromEvent(button, 'click') is the interface that listens for button click events (Observable).
  • A callback function receives notification of the click event and logs Clicked! to the console (Observer).
  • We call subscribe on our Observable, passing it the Observer callback (Subscription).
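It helped me to picture roughly what happens behind that call. The sketch below is my own simplification, not RxJS's actual code: fromEventSketch is a hypothetical helper, and a bare EventTarget stands in for the button so the snippet also runs outside a browser.

```javascript
// Hypothetical helper, loosely modelled on what fromEvent does.
function fromEventSketch(target, eventName) {
  return {
    subscribe(next) {
      // Subscribing attaches the listener...
      target.addEventListener(eventName, next);
      return {
        // ...and unsubscribing removes it again.
        unsubscribe: () => target.removeEventListener(eventName, next),
      };
    },
  };
}

const fakeButton = new EventTarget(); // stands in for a real <button>
const clicks = [];

const subscription = fromEventSketch(fakeButton, 'click')
  .subscribe(event => clicks.push(event.type));

fakeButton.dispatchEvent(new Event('click'));
fakeButton.dispatchEvent(new Event('click'));
subscription.unsubscribe();
fakeButton.dispatchEvent(new Event('click')); // ignored: listener was removed

console.log(clicks); // ['click', 'click']
```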

Creating Observables

We can create Observables from:

  • One or multiple values:
Rx.Observable.of('foo', 'bar');
  • An array of values:
Rx.Observable.from([1, 2, 3]);
  • An event:
Rx.Observable.fromEvent(document.querySelector('button'), 'click');
  • A promise:
Rx.Observable.fromPromise(fetch('/users'));
  • An AJAX request:
ajax.getJSON(`http://example.com`);
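As a rough mental model for the first two helpers, here is a sketch in plain JavaScript. The names fromSketch and ofSketch are hypothetical and the real RxJS versions do considerably more; the point is only that both push each value to the Observer and then signal completion.

```javascript
// Hypothetical helper: build an "observable" from any iterable.
function fromSketch(iterable) {
  return {
    subscribe(observer) {
      for (const value of iterable) observer.next(value);
      observer.complete();
    },
  };
}

// of(...values) is then just from() applied to its argument list.
const ofSketch = (...values) => fromSketch(values);

const seen = [];
const observer = {
  next: value => seen.push(value),
  complete: () => seen.push('done'),
};

ofSketch('foo', 'bar').subscribe(observer);
fromSketch([1, 2, 3]).subscribe(observer);

console.log(seen); // ['foo', 'bar', 'done', 1, 2, 3, 'done']
```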

We can also create observables using Observable.create:

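const foo = Rx.Observable.create(function (observer) {
  // Values pushed synchronously on subscribe
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300);
    // Complete only after the last value: calling complete() before the
    // timeout fires would mean 300 is never delivered.
    observer.complete();
  }, 1000);
});
foo.subscribe(function (x) {
  console.log(x); // 42, 100, 200, then 300 after one second
});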

Notice observer has next and complete methods. These are what Observables use to interface with Observers. next allows the Observable to push values to the Observer (and allows the Observer to receive values). complete terminates the Observable-Observer interface contract.
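One consequence of complete ending the contract is that anything pushed after it is silently dropped. Here is a minimal sketch of that guard in plain JavaScript; createSketch is a hypothetical helper written for this post, not RxJS's own Observable.create.

```javascript
// Hypothetical helper that wraps the Observer so nothing gets through
// once complete() has been called.
function createSketch(producer) {
  return {
    subscribe(observer) {
      let stopped = false;
      producer({
        next(value) {
          if (!stopped) observer.next(value);
        },
        complete() {
          if (stopped) return;
          stopped = true;
          observer.complete();
        },
      });
    },
  };
}

const received = [];
createSketch(observer => {
  observer.next(42);
  observer.complete();
  observer.next(100); // pushed after complete, so it is dropped
}).subscribe({
  next: value => received.push(value),
  complete: () => received.push('complete'),
});

console.log(received); // [42, 'complete']
```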

mergeMap (flatMap)

Transformation operators allow you to transform the output of Observables. A common one is mergeMap.

mergeMap first transforms each source value into another Observable. It then applies mergeAll to flatten the inner Observables into one Observable.

const source = Rx.Observable.of('Hello');
const example = source.mergeMap(val => Rx.Observable.of(`${val} World!`));
const subscribe = example.subscribe(val => console.log(val)); // Hello World!

You might wonder (as I did) how this is different from just using map. And why first transform the source value to an Observable?

const source = Rx.Observable.of('Hello');
const example = source.map(val => `${val} World!`);
const subscribe = example.subscribe(val => console.log(val)); // Hello World!

Same result! However, this example produces one output value for each input value. If you wanted to produce more than one output value per input value, you would need to map to an Observable:

const source = Rx.Observable.of('Hello');
const example = source.map(val => Rx.Observable.of(`${val} World!`, `${val} John!`));
const subscribe = example.subscribe(val => console.log(val)); // logs an Observable object, not the strings

But this results in an Observable that produces Observables. Not very useful. To get it to produce actual values, we need to join the emitted Observables together into one stream. We do that using mergeAll, and map followed by mergeAll is exactly what mergeMap does in a single step.

const source = Rx.Observable.of('Hello');
const example = source
  .map(val => Rx.Observable.of(`${val} World!`, `${val} John!`))
  .mergeAll();
const subscribe = example.subscribe(val => console.log(val)); // Hello World!, Hello John!
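To convince myself of how the pieces fit, I sketched toy versions of map, mergeAll and mergeMap in plain JavaScript. These are hypothetical stand-ins that only handle synchronous values, nothing like the real operators, but they show mergeMap as map followed by mergeAll.

```javascript
// Toy observable of fixed values; subscribe takes a plain callback.
const of = (...values) => ({
  subscribe(next) { values.forEach(next); },
});

// map: transform each value on its way to the subscriber.
const map = (source, project) => ({
  subscribe(next) { source.subscribe(value => next(project(value))); },
});

// mergeAll: subscribe to every inner observable and forward its values.
const mergeAll = source => ({
  subscribe(next) { source.subscribe(inner => inner.subscribe(next)); },
});

// mergeMap is simply the two combined.
const mergeMap = (source, project) => mergeAll(map(source, project));

const project = val => of(`${val} World!`, `${val} John!`);

const viaMapOnly = [];
map(of('Hello'), project).subscribe(v => viaMapOnly.push(typeof v));

const viaMergeMap = [];
mergeMap(of('Hello'), project).subscribe(v => viaMergeMap.push(v));

console.log(viaMapOnly);  // ['object'] (an inner observable, not a string)
console.log(viaMergeMap); // ['Hello World!', 'Hello John!']
```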

concatMap

How does mergeMap differ from concatMap?

concatMap also first maps each source value to an Observable. The difference is it applies concatAll instead of mergeAll. I think nothing explains it better than the diagrams in this post.

concatAll flattens several Observables into one stream which allows each Observable to finish pushing its values before outputting the values of the next Observable:

-----------------time----------->
{
...{1},
.......{2................................3},
...............{},
........................{4}
}.concatAll()

outputs…

{...1....2................................3..4}

mergeAll flattens several Observables into one stream which outputs values as soon as pushed by any Observable, even if another Observable has not finished pushing its values.

-----------------time----------->
{
...{1},
.......{2................................3},
...............{},
{........................4}
}.mergeAll()

outputs…

{...1....2..................4..............3}
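The two diagrams can also be replayed as a small simulation. This is only a sketch under my own assumptions: the timings are numbers I picked to roughly match the dots above, each inner stream is assumed to start emitting only once it is subscribed to, and mergeAllTimeline and concatAllTimeline are hypothetical helpers, not RxJS functions.

```javascript
// Each inner stream: when the outer Observable emits it (start), the values it
// pushes as [offset, value] pairs, and how long it runs before completing.
const inners = [
  { start: 3,  emits: [[0, 1]],          length: 0 },
  { start: 7,  emits: [[0, 2], [33, 3]], length: 33 },
  { start: 15, emits: [],                length: 0 },
  { start: 24, emits: [[0, 4]],          length: 0 },
];

// mergeAll: subscribe to every inner stream as soon as it arrives.
function mergeAllTimeline(streams) {
  const timeline = [];
  for (const { start, emits } of streams) {
    for (const [offset, value] of emits) {
      timeline.push({ time: start + offset, value });
    }
  }
  return timeline.sort((a, b) => a.time - b.time);
}

// concatAll: subscribe to the next inner stream only after the previous one
// has completed, so later streams may have to wait.
function concatAllTimeline(streams) {
  const timeline = [];
  let previousDone = 0;
  for (const { start, emits, length } of streams) {
    const subscribedAt = Math.max(start, previousDone);
    for (const [offset, value] of emits) {
      timeline.push({ time: subscribedAt + offset, value });
    }
    previousDone = subscribedAt + length;
  }
  return timeline;
}

const merged = mergeAllTimeline(inners).map(e => e.value);
const concatenated = concatAllTimeline(inners).map(e => e.value);

console.log(merged);       // [1, 2, 4, 3]
console.log(concatenated); // [1, 2, 3, 4]
```

In the concatAll run, the value 4 is held back until time 40, when the second stream finishes, which is why it lands right after 3.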

Redux-Observable

Redux-Observable allows us to apply RxJS principles to the realm of Redux. It lets us set up Observables that listen for dispatched actions and push new actions to store.dispatch (the Observer). Actions are not intercepted along the way: each one reaches the reducers first and is only then seen by these Observables. Redux-Observable calls these special Observables Epics. Here’s an example from the Redux-Observable docs:

import { ajax } from 'rxjs/observable/dom/ajax';

// Action types
const FETCH_USER = 'FETCH_USER';
const FETCH_USER_FULFILLED = 'FETCH_USER_FULFILLED';

// Action creators
const fetchUser = username => ({ type: FETCH_USER, payload: username });
const fetchUserFulfilled = payload => ({ type: FETCH_USER_FULFILLED, payload });

// Epic
const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .mergeMap(action =>
      ajax.getJSON(`https://api.github.com/users/${action.payload}`)
        .map(response => fetchUserFulfilled(response))
    );

// Dispatch FETCH_USER action
dispatch(fetchUser('torvalds'));

Here, we have action creators that return plain action objects. An Observable (Epic) named fetchUserEpic is created which listens for the FETCH_USER action.

The Observer is store.dispatch and its subscription to the root epic has been abstracted away. Under the hood, it looks like this:

epic(action$, store).subscribe(store.dispatch)
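To see that wiring in action without any libraries, here is a toy version in plain JavaScript. Everything in it is a hypothetical stand-in (createSubject, the store object, the ofType and map helpers); the real middleware is more involved. It assumes the action reaches the reducer before the epic sees it, and uses a synchronous ping/pong epic instead of an AJAX call.

```javascript
// Tiny subject: an Observable that we can also push values into.
function createSubject() {
  const listeners = [];
  return {
    next: value => listeners.slice().forEach(listener => listener(value)),
    subscribe: listener => { listeners.push(listener); },
  };
}

// Hypothetical stand-ins for the ofType and map operators.
const ofType = (source, type) => ({
  subscribe: next => source.subscribe(action => {
    if (action.type === type) next(action);
  }),
});
const map = (source, project) => ({
  subscribe: next => source.subscribe(action => next(project(action))),
});

// Epic: a stream of actions in, a stream of actions out.
const pingEpic = action$ =>
  map(ofType(action$, 'PING'), () => ({ type: 'PONG' }));

// Toy store whose "reducer" just records every action type it receives.
const action$ = createSubject();
const store = {
  state: [],
  dispatch(action) {
    store.state = [...store.state, action.type]; // the reducer runs first
    action$.next(action);                         // then the epic sees the action
  },
};

// The wiring: whatever the epic emits is dispatched back into the store.
pingEpic(action$).subscribe(store.dispatch);

store.dispatch({ type: 'PING' });
console.log(store.state); // ['PING', 'PONG']
```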

When FETCH_USER is dispatched, fetchUserEpic maps it to another Observable which listens for a JSON response to an AJAX request. Once received, the response is mapped to the FETCH_USER_FULFILLED action.

mergeMap joins all the emitted Observables into one stream using mergeAll, so each response is dispatched as soon as it arrives, even when several requests overlap.

There are a lot more concepts and operators to learn but this much helped me get started.