RxJS: In 5 Minutes!

Asynchronous programming was never so easy.

What is Reactive Programming?

An API for asynchronous programming
with observable streams.

What is a stream? What is an observable?

Let’s dive in.

Key concepts —

  1. Stream
  2. Observable
  3. Observer
  4. Subscription
  5. Operator
  6. Subject

Stream

A stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches. — Wikipedia

Alternatively, we can say that a stream is a sequence of ongoing events ordered in time. For example, take the button clicks that happen within one second: all of those clicks can be grouped together as a stream.
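To make the "events ordered in time" idea concrete, here is a minimal sketch in plain JavaScript (no RxJS involved). The sample events and the clicksPerSecond helper are made up for illustration; they only show how clicks could be grouped into one-second windows.

```javascript
// Hypothetical sample data: each event records when it happened (in ms).
const events = [
  { time: 120, type: 'click' },
  { time: 480, type: 'click' },
  { time: 950, type: 'click' },
  { time: 1300, type: 'click' },
  { time: 2750, type: 'click' },
];

// Group the events into 1-second windows and count the clicks in each one.
function clicksPerSecond(stream) {
  const counts = {};
  for (const event of stream) {
    const second = Math.floor(event.time / 1000);
    counts[second] = (counts[second] || 0) + 1;
  }
  return counts;
}

const clickCounts = clicksPerSecond(events);
// 3 clicks in second 0, 1 click in second 1, 1 click in second 2
console.log(clickCounts);
```

A real stream delivers its events over time instead of sitting in an array, but the ordering-in-time idea is the same.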

Streams in RxJS

In RxJS, we can create streams:

  • From one or multiple values
  • From an array of values
  • From an event
  • From a Promise
  • From an iterable or any other data structure
  • From a callback

That’s it! This much understanding of streams is enough to proceed.

Our main goal is to operate on such streams. In other words, the stream is the subject being observed.

Observable

In reactive programming, an observer subscribes to an Observable. The observer then reacts to whatever item or sequence of items the Observable emits. We can turn an array, a promise, or an iterable into an Observable. Let’s see how with examples.

// Example 1. From hand-picked values
// observer.next(x) emits a value to the subscriber
var myObservable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  // The last value arrives asynchronously, one second later
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});
console.log('just before subscribe');
myObservable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

Now you know that:

  • An Observable is a representation of a stream that we can operate on later.
  • We can create a custom Observable using the Rx.Observable.create method, which returns an object that has a subscribe method.
  • The subscribe method takes an object as a parameter, which is nothing but the observer.
  • Observer: an object with next, error and complete methods, or just a next function. We will discuss observers later.
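The points above can be sketched without RxJS at all. The following is a deliberately simplified model, not the RxJS source: createObservable is a hypothetical helper that stores a producer function and only runs it when subscribe is called.

```javascript
// Simplified model (an assumption for illustration, not how RxJS is written):
// an "observable" remembers a producer and runs it on every subscribe call.
function createObservable(producer) {
  return {
    subscribe(observerOrNext) {
      // Accept either a full observer object or just a next function.
      const observer = typeof observerOrNext === 'function'
        ? { next: observerOrNext }
        : observerOrNext;
      producer({
        next: value => observer.next && observer.next(value),
        error: err => observer.error && observer.error(err),
        complete: () => observer.complete && observer.complete(),
      });
    },
  };
}

const log = [];
const numbers = createObservable(observer => {
  log.push('producer started');
  observer.next(1);
  observer.next(2);
  observer.complete();
});

log.push('before subscribe'); // the producer has not run yet
numbers.subscribe({
  next: x => log.push('got value ' + x),
  complete: () => log.push('done'),
});
console.log(log);
```

Note that 'before subscribe' is recorded ahead of 'producer started': nothing happens until someone subscribes.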

The Rx.Observable class has many functions that create Observables from different kinds of data and streams: events, event patterns, arrays, promises, single or multiple values, other data structures, primitive values, and so on.

Examples -

// From event
var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));
// Results in:
// MouseEvent object logged to console every time a click
// occurs on the document.
----------------------------------------------
// Converts an array to an Observable
var array = [10, 20, 30];
var result = Rx.Observable.from(array);
result.subscribe(x => console.log(x));
// Results in the following:
// 10 20 30
----------------------------------------------
// Convert the Promise returned by Fetch to an Observable
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));
----------------------------------------------
// Subscribe with a full observer object
result.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
// Or just a next function
result.subscribe((x) => {
  console.log('got value ' + x);
});
----------------------------------------------------
// See more examples here
// http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html

Observer

An Observer is an object with next, error and complete methods on it, and it can be passed to the observable.subscribe(observer) method.

The Observable calls the Observer’s next(value) method to provide data, as many times as there are values. It calls the Observer’s error(err) or complete() method at most once, and nothing is delivered after that.

/* With an observer object */
var observer = {
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
};
// Or just a next function, like then() in promises
var nextOnly = x => console.log('got value ' + x);

// Example: passing the observer to the subscribe function
myObservable.subscribe(observer);
// Passing only a next function works too
myObservable.subscribe(nextOnly);
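The rule that next can be called many times while error or complete ends the stream once can be illustrated with a small wrapper. This is a sketch in plain JavaScript; toSafeObserver is a hypothetical name, and RxJS does its own version of this internally, so you would not normally write it yourself.

```javascript
// Hypothetical wrapper: forwards calls to the real observer, but ignores
// everything that arrives after the first error() or complete().
function toSafeObserver(observer) {
  let stopped = false;
  return {
    next(value) {
      if (!stopped && observer.next) observer.next(value);
    },
    error(err) {
      if (stopped) return;
      stopped = true;
      if (observer.error) observer.error(err);
    },
    complete() {
      if (stopped) return;
      stopped = true;
      if (observer.complete) observer.complete();
    },
  };
}

const received = [];
const safe = toSafeObserver({
  next: x => received.push('got value ' + x),
  error: err => received.push('error ' + err),
  complete: () => received.push('done'),
});

safe.next(1);
safe.complete();
safe.next(2);       // ignored: the stream already completed
safe.complete();    // ignored: complete is delivered only once
safe.error('late'); // ignored: no error after completion
console.log(received); // only 'got value 1' and 'done' were delivered
```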

Subscription

The “listening” to the stream is called subscribing.

By calling the subscribe method of an Observable, we can listen to the stream.

To invoke the Observable, we need to subscribe to it.

Just creating an Observable does not start listening to the stream; we have to invoke the Observable by calling its subscribe method.

For example —

var clicks = Rx.Observable.fromEvent(document, 'click');
// Subscribe to the observable
// so that we can listen to the stream
var clicksSubscription = clicks.subscribe(x => {
  console.log(x);
});
// Later we can stop listening to the stream
clicksSubscription.unsubscribe();
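What unsubscribe does can be sketched with a hand-rolled event source in plain JavaScript. createClickSource and its emit method are hypothetical stand-ins for the browser's click events, so the example runs outside a page. The object returned from subscribe plays the role of the Subscription.

```javascript
// Hypothetical event source standing in for the document's click events.
function createClickSource() {
  const listeners = new Set();
  return {
    // Deliver an event to everyone who is currently subscribed.
    emit(event) {
      listeners.forEach(listener => listener(event));
    },
    subscribe(next) {
      listeners.add(next);
      // The returned object plays the role of a Subscription.
      return { unsubscribe: () => listeners.delete(next) };
    },
    listenerCount: () => listeners.size,
  };
}

const clickSource = createClickSource();
const seen = [];
const subscription = clickSource.subscribe(x => seen.push(x));

clickSource.emit('click 1');  // delivered
subscription.unsubscribe();   // stop listening
clickSource.emit('click 2');  // nobody is listening any more
console.log(seen);            // only 'click 1' was recorded
```

Unsubscribing matters for long-lived sources such as DOM events: it removes the listener so that it no longer holds on to resources.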

Operators

Now that we have Observables, let’s understand operators.

Read (or recall) the “Operators: Also Just Functions” section from this article by Ben Lesh. He explains very well what an operator is. Read it, then come back here.

Done? Great!

Now you know that:

  • An operator is a pure function that takes one Observable as input and generates another Observable as output.
  • Subscribing to the output Observable will also subscribe to the input Observable.

Think of operators as something like Lodash functions, but for streams.

Let’s create a custom operator function that multiplies each value received from the input Observable by 10:

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    // Returning the inner subscription lets an unsubscribe on the
    // output also unsubscribe from the input.
    return input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));
// Results in:
// 10
// 20
// 30
// 40

There are operators for different purposes, and they may be categorized as: creation, transformation, filtering, combination, multicasting, error handling, utility, etc. Here you will find all of them.

You can visit the official website and choose operators as per your need.

Some examples -

1. Get the maximal value of a series of numbers
Rx.Observable.of(5, 4, 7, 2, 8)
  .max()
  .subscribe(x => console.log(x));
// Results in: 8
2. Map every click to the clientX position of that click
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));
3. Chaining operators
// `node` is assumed to be an <input> element on the page.
// subscribe() returns a Subscription, so name the variable accordingly.
const subscription = Rx.Observable.fromEvent(node, 'input')
  .map(event => event.target.value)
  .filter(value => value.length >= 2)
  .subscribe(value => {
    // use the `value`
  });

Chaining operators

Each operator in the chain modifies the Observable that results from the operation of the previous operator.
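To see why chaining works, here is a sketch with hand-rolled map and filter operators in plain JavaScript. These are simplified, hypothetical versions (error handling and unsubscription are left out); the RxJS operators of the same names do considerably more. Each operator takes an input observable and returns a new one, so the output of one becomes the input of the next.

```javascript
// Hypothetical minimal observable that emits the items of an array.
function fromArray(values) {
  return {
    subscribe(observer) {
      values.forEach(v => observer.next(v));
      observer.complete();
    },
  };
}

// map(fn) returns an operator: a function from observable to observable.
const map = fn => input => ({
  subscribe(observer) {
    input.subscribe({
      next: v => observer.next(fn(v)),
      complete: () => observer.complete(),
    });
  },
});

// filter(predicate) only passes on the values that satisfy the predicate.
const filter = predicate => input => ({
  subscribe(observer) {
    input.subscribe({
      next: v => { if (predicate(v)) observer.next(v); },
      complete: () => observer.complete(),
    });
  },
});

const source = fromArray(['a', 'ab', 'abc']);
const longEnough = filter(value => value.length >= 2)(source);
const upperCased = map(value => value.toUpperCase())(longEnough);

const results = [];
upperCased.subscribe({
  next: value => results.push(value),
  complete: () => results.push('done'),
});
console.log(results); // 'AB', 'ABC', then 'done'
```

Subscribing to upperCased subscribes to longEnough, which in turn subscribes to source. The original source is never modified.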

Subject

In RxJS:

  • Plain Observables are unicast.
  • A Subject is a special type of Observable that allows values to be multicast to many Observers.
  • Subjects maintain a registry of many listeners/subscribers.

Let’s understand unicasting vs. multicasting first!

Unicast: in networking, a transmission sends IP packets to a single recipient on a network.

Example: plain Observable

Explanation: you have to call .subscribe(). If you subscribe multiple times to the original Observable, the subscribers do not share anything. Every call to .subscribe() starts a new, independent execution of the Observable.

Multicast: in networking, a transmission sends IP packets to a group of hosts on a network.

Example: RxJS Subject, Promises

Explanation: you can call .subscribe() as many times as you want on the original source. All subscribers share the same execution and receive the same values.
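The difference can be sketched in plain JavaScript. Both objects below are hypothetical, simplified stand-ins: cold mimics a plain Observable that runs its producer once per subscriber, and createSubject mimics a Subject that keeps a list of subscribers and pushes each value to all of them.

```javascript
// Unicast: every subscribe call triggers its own execution of the producer.
let executions = 0;
const cold = {
  subscribe(next) {
    executions += 1;
    next('value from execution ' + executions);
  },
};

const unicastLog = [];
cold.subscribe(v => unicastLog.push('A: ' + v));
cold.subscribe(v => unicastLog.push('B: ' + v));
// The producer ran twice, once for A and once for B.

// Multicast: a simplified subject keeps a registry of subscribers.
function createSubject() {
  const observers = [];
  return {
    subscribe(next) { observers.push(next); },
    // Push one value to every registered subscriber.
    next(value) { observers.forEach(observer => observer(value)); },
  };
}

const subject = createSubject();
const multicastLog = [];
subject.subscribe(v => multicastLog.push('A: ' + v));
subject.subscribe(v => multicastLog.push('B: ' + v));
subject.next('shared value'); // emitted once, received by both A and B

console.log(unicastLog);
console.log(multicastLog);
```

With the unicast source, A and B see different executions. With the subject, a single next call reaches both.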

Want to dive deeper?

Thanks again to Ben Lesh. Have a look at the “Rx Subjects” section from this article.

That’s it!

I hope you enjoyed this article and it was worth reading!

Also, don’t forget to give me a 👏 so that this article reaches others.

Credits —