Learn how to use Observable with ReactiveX — Javascript ⚾️
In the JavaScript world, handling asynchronous events is a big issue. Promises and async functions are the first tools that come to mind, but sometimes you need to coordinate a whole series of asynchronous tasks, and that is hard to do well. For example, there is no built-in way to stop a promise once it has started executing. Workarounds exist, but they make the code harder to read and maintain.
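To make the cancellation problem concrete, here is a minimal sketch in plain JavaScript. The `delay` helper and the `ignored` flag are hypothetical names used only for illustration; the point is that a flag lets you ignore a result, but the work behind the promise still runs.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));

const log = [];
let ignored = false; // workaround flag: we can ignore the result, not stop the work

const task = delay(20, "payload").then(value => {
  log.push("work finished"); // still runs: the promise itself cannot be stopped
  if (!ignored) log.push(`handled ${value}`);
});

// "Cancel" right away: only the flag changes, the timer keeps going.
ignored = true;

task.then(() => console.log(log)); // [ 'work finished' ]
```

Every consumer of the promise has to remember to check the flag, which is the kind of bookkeeping that makes this style hard to maintain.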
ReactiveX provides a good pattern for controlling asynchronous events. It combines the Observer pattern with the Iterator pattern and with functional programming over collections, so a sequence of events can be treated like an array and transformed with operators, much like working with Lodash. The topics below cover the good parts of ReactiveX:
- What is the difference between the “rx” and “rxjs” npm packages?
- Create a sample observable and subscribe to it!
- Introduce some of the core operators.
- How to use it in a React project?
What is the difference between the “rx” and “rxjs” npm packages?
First, let’s figure out the difference between “rx” and “rxjs”. In short, “rxjs” is a rewrite of “rx” and is the latest production-ready version of RxJS. It has better performance, better modularity, and more debuggable call stacks, while staying mostly backward compatible.
Create a sample observable and subscribe to it!
Before creating an observable, let’s review the essential concepts in RxJS:
- Observable: represents the idea of an invokable collection of future values or events.
- Observer: is a collection of callbacks that knows how to listen to values delivered by the Observable.
- Subscription: represents the execution of an Observable, is primarily useful for canceling the execution.
- Operators: are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, flatMap, etc.
Reference: http://reactivex.io/rxjs/manual/tutorial.html
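To see how the four concepts fit together, here is a deliberately tiny sketch in plain JavaScript. It is not RxJS and skips error handling and teardown logic; `TinyObservable` and everything inside it are hypothetical names made up for this illustration.

```javascript
// Minimal sketch of the four concepts; NOT RxJS itself.
// Observable: wraps a producer function that is invoked once per subscriber.
class TinyObservable {
  constructor(producer) {
    this.producer = producer;
  }

  // Subscribing runs the producer and returns a Subscription.
  subscribe(observer) {
    let closed = false;
    const safeObserver = {
      next: value => { if (!closed) observer.next(value); },
      complete: () => {
        if (!closed) {
          closed = true;
          if (observer.complete) observer.complete();
        }
      },
    };
    this.producer(safeObserver);
    // Subscription: its main job is to cancel further delivery.
    return { unsubscribe: () => { closed = true; } };
  }

  // Operator: a pure function from one observable to a new one.
  map(project) {
    return new TinyObservable(observer =>
      this.subscribe({
        next: value => observer.next(project(value)),
        complete: () => observer.complete(),
      })
    );
  }
}

const received = [];
const greetings = new TinyObservable(observer => {
  observer.next("hi");
  observer.next("hello");
  observer.complete();
});

// Observer: the object holding the callbacks.
const subscription = greetings
  .map(text => text.toUpperCase())
  .subscribe({
    next: value => received.push(value),
    complete: () => received.push("done"),
  });
subscription.unsubscribe(); // nothing left to cancel: the source was synchronous

console.log(received); // [ 'HI', 'HELLO', 'done' ]
```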
Let’s create a source observable with the create operator, which produces new events internally, and then subscribe to it. (I used the previous version of Rx.js, which imports all of the operators, so that we get autocompletion.)
// create.js
import Rx from "rx";

// The function passed to create() runs once for each subscriber.
const source = Rx.Observable.create(observer => {
  observer.onNext("yingray say hi!");
  observer.onNext("frank say hi!");
  observer.onCompleted();
});

// subscribe() takes the next, error, and completed callbacks, in that order.
const subscription = source.subscribe(
  value => console.log(`Next: ${value}`),
  error => console.log(`Error: ${error}`),
  () => console.log("Completed!")
);
Introduce some of the core operators.
Let’s look at the core operators. The examples are listed below:
range.js
interval.js
map.js
scan.js
FlatMap vs ConcatMap
baseball.js: https://github.com/yingray/rx-demo/blob/master/baseball.js
flatMap result
.
.
.
1006 ms: 120: H
.
.
.
2008 ms: 103: H
.
.
.
3008 ms: Ball landing
3009 ms: 151: swing and miss
.
.
.
.
4010 ms: Ball landing
4010 ms: 98: H
.
.
.
.
.
.
6011 ms: Ball landing
concatMap result
.
.
.
1004 ms: 120: H
.
.
.
.
.
.
3006 ms: Ball landing
3007 ms: 103: H
.
.
.
.
.
.
.
5009 ms: Ball landing
5009 ms: 151: swing and miss
5009 ms: 98: H
.
.
.
.
.
.
.
7011 ms: Ball landing
How to use it in a React project?
Let’s go back to the web application and see how to make good use of ReactiveX in a React project. Here is an example that calls a RESTful API to get data from another service or a third party.
const getData = () => fetch('https://yingray-services/data');
In this case, once the component has mounted, it sends a request to the service. The request is an asynchronous event, and it returns a promise object.
class MyPage extends React.Component {
  componentDidMount() {
    // Fetch once after mounting, then store the parsed JSON in state.
    getData()
      .then(response => response.json())
      .then(data => this.setState({ data }));
  }
}
Let’s convert the promise object into an observable. In the latest version of Rx.js, we use the ‘from’ operator for this. Next, subscribe to the observable, and don’t forget to tell it how to handle both the success case and the failure case.
import { from } from 'rxjs';

class MyPage extends React.Component {
  componentDidMount() {
    const observable = from(
      getData()
        .then(response => response.json())
    );
    this.subscription = observable.subscribe(
      (data) => this.setState({ data }),
      (err) => console.error(err),
    );
  }
}
Finally, to avoid React’s warning about calling setState or forceUpdate on an unmounted component, unsubscribe before the component is unmounted. Pay attention to how much work you put inside the promise, because unsubscribing only stops watching the observable; it does not stop the promise from executing. (This means response.json() may still run after you have unsubscribed.)
  componentWillUnmount() {
    // `closed` is the public flag on a Subscription; it is true once unsubscribed or finished.
    if (this.subscription && !this.subscription.closed) {
      this.subscription.unsubscribe();
    }
  }
References
- Tutorial: http://reactivex.io/rxjs/manual/overview.html
- Documentation: http://reactivex.io/documentation/operators.html
- Node module: https://github.com/ReactiveX/rxjs
- React integration: http://reactivex.io/rxjs/manual/tutorial.html#react
- Pipeable Operators: https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md
- The introduction to Reactive Programming you’ve been missing: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
- Techbridge: https://blog.techbridge.cc/2017/12/08/rxjs/