Angular — Introduction to Reactive Extensions (RxJS)

How to use observable sequences in AngularJS

Pt — A code experiment on point, form, and space.

Reactive Extensions for JavaScript (RxJS) is a reactive streams library that allows you to work with asynchronous data streams. RxJS can be used both in the browser or in the server-side using Node.js.

In this post we are going to introduce RxJS basic concepts and how we can use them with AngularJS.

Follow me on Twitter for latest updates @gerardsans.

What exactly are asynchronous data streams?

Let’s take each word separately and put it into context.

  • Asynchronous, in JavaScript means we can call a function and register a callback to be notified when results are available, so we can continue with execution and avoid the Web Page from being unresponsive. This is used for ajax calls, DOM-events, Promises, WebWorkers and WebSockets.
  • Data, raw information in the form of JavaScript data types as: Number, String, Objects (Arrays, Sets, Maps).
  • Streams, sequences of data made available over time. As an example, opposed to Arrays you don’t need all the information to be present in order to start using them.

Asynchronous data streams are not new. They have been around since Unix systems, and come in different flavours and names: streams (Node.js), pipes (Unix) or async pipes (Angular 2).

Observable sequences

In RxJS, you represent asynchronous data streams using observable sequences or also just called observables. Observables are very flexible and can be used using push or pull patterns.

  • When using the push pattern, we subscribe to the source stream and react to new data as soon as is made available (emitted).
  • When using the pull pattern, we are using the same operations but synchronously. This happens when using Arrays, Generators or Iterables.
🐒 Observables sequences allows us to use both push and pull patterns

Basic example

Let’s start using a simple observable sequence within an Angular Controller. See it running in this Plunker.

In this example, we used an Observable (rx.Observable) followed by a chain of operators ending with a call to subscribe.

The first operator waits for 1 second and emits values (indefinitely) starting with 0 (interval, delay is set in ms). The second operator takes the first 3 items (take). The third operator, a helper method, let’s us set the counter for each value using the current scope (safeApply uses $scope.$apply only when necessary). Finally, a call to subscribe triggers the execution.

We can also use an ASCII Marble Diagram to describe it:

Note from the diagram above that each operator creates a new stream that we could also reference separately.

🐒 Observables programming has two separate stages: setup and execution.

Observables and Operators

RxJS combines Observables and Operators so we can subscribe to streams and react to changes using composable operations. Let’s introduce these concepts in more detail.

Observables/Observers

Observables get their name from the Observer design pattern. The Observable sends notifications while the Observer receives them. Let’s create a simple Observer.

You can pass in your observer when calling subscribe or by passing onNext, onError and onCompleted callbacks. These are their behaviours:

  • onNext, called for each element in the observable sequence.
  • onError, called only once in case of an error.
  • onCompleted, called only once when the stream finishes.

If we want to stop listening to changes, we can unsubscribe by getting a reference and clean up on $destroy.

Operators

We have seen some already. These are the main categories: creation, conversion, combine, functional, mathematical, time, exceptions, miscellaneous, selection and primitives. You can explore them here.

A list of the most common: merge @, concat @, defer, do, map @, flatMapLatest, fromPromise, fromEvent, takeUntil @, throttle, delay @, empty, catch, if, timer, filter, zip @.

We have seen how Observables and Operators are a powerful combination. Let’s see how we can use them in Angular.

Integration with Angular

RxJS plays well with Angular but instead of writing your own helper functions to bridge the two you can use rx.angular.js, a dedicated library for RxJS and AngularJS interoperability.

Let’s see an example of this integration for: scope, promises and DOM-events.

Integration with Scope

Using observeOnScope we can take a $watch expression and turn it into an Observable. Let’s use an example using a search box to query Wikipedia articles.

This example will take changes to $scope.search and emit objects like the following as the user types

The first operator we used was throttle that delays requests so we don’t overload the server as the user types. Then we used map to take only newValue or the empty string if undefined. We don’t want to repeat queries using the same term so we used distinctUntilChanged. After that, we used flatMapLatest so we only take the latest results ignoring out of order and unfinished ajax calls. Finally we got the results into the scope. Try removing some operators using this Plunker.

Promises

Promises are very helpful for one-off asynchronous operations. If you need a quick overview you can read this post.

Since version 2.2, RxJS integrates with Promises using Rx.Observable.fromPromise. See an example below:

This function returns an observable that will emit the result of the promise when available. Used with flatMapLatest results in a observable containing only the latest values ignoring the rest. You can find a nice graphic explanation here.

DOM-events

Another common use case for RxJS are DOM-events. Let’s build a simple idle user feature using RxJS and Angular. In order to use DOM-events we will use Rx.DOM (HTML DOM bindings for RxJS) through rx.angular.

Rx.DOM must be included separately but includes event binding, Ajax requests, Web Sockets, Web Workers, Server-Sent Events and even Geolocation.

In the code above we detect when the user has been idle for a period of 5 seconds. In order to do that we merged the events coming from keystrokes, mouse (clicks, move, scroll) and taps (for mobile users).

Then we buffered all events for 5 seconds (bufferWithTime in ms) and checked when the resulting sequence was empty so we can assume the user has been idle (filter). The logic inside subscribe is a simple dialog asking the user to keep working or quit. You can find a working example using a directive in this Plunker.

Reactive Programming

We just started scratching the surface, Reactive Programming is a paradigm where asynchronous data streams can be used almost everywhere. Everything is a stream. Repeat with me!

Hope you have enough information to continue exploring RxJS on your own. Thanks for reading! Have any questions? Ping me at @gerardsans

More resources