Rxjs: In 6 minutes!

Mohan Dere
Apr 19, 2018 · 6 min read

Asynchronous programming was never so easy.

Image for post
Image for post

What is Reactive Programming?

An API for asynchronous programming
with observable streams.

What is stream, observable ?

Let’s dive in.

Key concepts —

Stream

A stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches. — Wikipedia

Alternatively, We can say that the stream is a sequence of ongoing events ordered in time. For example number of button click(s) in 1 second. So all clicks will be grouped as a stream.

Stream in RxJs

In RxJS, we can create streams using —

  • From one or multiple values
  • From array of values
  • From an event
  • From a Promise
  • From iterable/any other data structure
  • From a callback

That’s it! This much understanding of stream is enough to proceed.

Well, our main goal is to operate on such streams. Which means, the stream is the subject which is observed.

Observable

In Reactive programming, observer subscribes to an observable. Then that observer reacts to whatever item or sequence of items the observable emits. Lets see how with examples.

// Example 1. From random data
// observer.next(x) function emits data
var myObservable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
const observer = {
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
};
console.log('just before subscribe');
myObservable.subscribe(observer);
console.log('just after subscribe');
Image for post
Image for post

Rx.Observable class has many functions to create observables from different kind of data/streams such as events, event patterns, arrays, promises, single/multiple value, any kind of data structure/primitive data types and so on.

Examples -

// From event
var clicks = Rx.Observable.fromEvent(document, 'click'); clicks.subscribe(x => console.log(x));
// Results in:
// MouseEvent object logged to console every time a click
// occurs on the document.
----------------------------------------------// Converts an array to an Observable
var array = [10, 20, 30];
var result = Rx.Observable.from(array);
result.subscribe(x => console.log(x));
// Results in the following:
// 10 20 30
----------------------------------------------// Convert the Promise returned by Fetch to an Observable
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));
----------------------------------------------result.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
// Or just next fnresult.subscribe((x) => {
console.log('got value ' + x)
})
----------------------------------------------------
// See more examples here
// http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html

Observer

Observer can be an Object with An object with next, error and complete methods on it and it can be passed to observable.subscribe(observer) method.

Observable will call the Observer’s next(value) method to provide data. And similarly calls Observer’s error(err) or Observer’s complete() methods once.

/* With an observer */
var observer = {
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
};
// Or just an next function like then() in promisesvar observer = (x) => {
console.log('Completed');
}
// example passing Observer to subscribe function

myObservable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});

Subscription

The “listening” to the stream is called subscribing.

Calling subscribe method of Observable we can listen to stream.

To invoke the Observable, we need to subscribe to it.

Just creating Observables will not going to listen streams, we must have to invoke the Observable by calling subscribe method.

For example —

var clicks = Rx.Observable.fromEvent(document, 'click'); // Subscribe to observable 
// so that we can listen stream changes
let clicksSubscription = clicks.subscribe(x => {
console.log(x)
});
// Later we can stop listening to streams
clicksSubscription.unsubscribe();

Operators

Now we have Observables. Lets understand Operator.

Now again read/remember “Operators: Also Just Functions” section from this article by Ben Lesh. He greatly explained about what is Operator. Just read and then come back here.

Done ? Great!

Now hell you know that,

  • A pure function which takes one Observable as input and generates another Observable as output.
  • Subscribing to the output Observable will also subscribe to the input Observable.

Think Operators as an Lodash functions.

Lets create one custom operator function that multiplies each value received from the input Observable by 10

function multiplyByTen(input) {
var output = Rx.Observable.create(function subscribe(observer) {
input.subscribe({
next: (v) => observer.next(10 * v),
error: (err) => observer.error(err),
complete: () => observer.complete()
});
});
return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));
// Result In:
// 10
// 20
// 30
// 40

There are operators for different purposes, and they may be categorized as: creation, transformation, filtering, combination, multicasting, error handling, utility, etc. here you will find all of them.

You can visit official website and choose operators as per your need.

Some examples -

1. Get the maximal value of a series of numberRx.Observable.of(5, 4, 7, 2, 8)   
.max()
.subscribe(x => console.log(x));
// Result In: 8
2. Map every click to the clientX position of that clickvar clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));
3. Chaining operatorsconst input = Rx.Observable.fromEvent(node, 'input')
.map(event => event.target.value)
.filter(value => value.length >= 2)
.subscribe(value => {
// use the `value`
});

Chaining operators

Each operator in the chain modifies the Observable that results from the operation of the previous operator.

Subject

In RxJs-

  • Plain Observables are unicast.
  • Subject is a special type of Observable that allows values to be multicasted to many Observers.
  • Subjects maintain a registry of many listeners/subscriber.

Lets understand unicasting vs multicasting first!

Unicast: A transmission/stream sends IP packets to a single recipient on a network.

Example :- Plain Observable

Explanation:- you have to do .subscribe(). If you subscribe multiple times against the original observable, you are not sharing the original observable. You are creating a new observable every time you subscribe.

Image for post
Image for post

Multicast: A transmission sends IP packets to a group of hosts on a network.

Example :- RxJs Subject, Promises

Explanation:- You have to do .subscribe() as many times as you want against that original observable.

Image for post
Image for post

Wants to dive deeper ?

Again thank you to Ben Lesh. Lets have a look at “Rx Subjects” section from this article.

Thats it!

I hope you enjoyed this article and it was worth reading!

Also don’t forget to give me a 👏 so that it will be the message for others.

Image for post
Image for post

Credits —

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch

Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore

Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store