A Beginner’s Adventure with RxJS: Part I

Something that has been all the rage lately in the programming world is a paradigm known as functional reactive program. Exactly as it sounds, FRP is a powerful combination of reactive programming and functional programming. My goal of this blog post is not to explain what FRP is, but instead to walk through some of the ways I have implemented FRP using the library RxJS. Before continuing, if you’d like to gain more of a general understanding of the paradigm, I suggest reading this blog post. It breaks down what reactive programming is, what functional programming is and how functional reactive programming marries the two concepts.

What is RxJS?

There are many tools and libraries that exist that enable developers to easily implement FRP patterns, perhaps one of the most common being Reactive Extensions. Originally developed by Microsoft as Rx.NET, Rx has been adapted to many languages, including Javascript.

RxJS is a library that makes use of an Observable, which is a stream or a collection of items over time. It follows the Observer Pattern where an Observable is subscribed to by an Observer, to which it actively pushes its emissions. Most importantly with any FRP implementation, in order for an Observable to begin emitting items, a subscription needs to be established by an Observer. The source backing the Observable will not run until this is done — this concept is known as lazy loading.

Once an Observable emits an item, it can undergo a series of transformations before it reaches the Observer. The RxJS library contains a number of operators such as Observable.map(), Observable.delay(), Observable.concat(), and Observable.zip() that can be applied to the Observable in a chain-like fashion, making these transformations easy to do.

Source: http://reactivex.io/documentation/

To help explain RxJS and its operators, I have included a tool commonly used to illustrate the operators called Marble diagrams. The line represents time, whereas the circles represent emissions that the Observable would emit to its subscribers. The end state of the Observable can either be a successful completion (demonstrated by a vertical line) or an error (demonstrated by an X).

Implementing RxJS

The project I chose to work on is one that I knew would introduce interesting challenges I could solve with RxJS while also being something I personally would use. The feature is something I would like to see integrated into Spotify which allows a user to search for a song and annotate it with comments, which are then saved.

SongComponent View

Users of the feature can view a stream of tagged songs and can playback the songs with all users’ tags for that song made visible. Because I couldn’t actually make this feature a part of the Spotify application, I made an Angular2/Node.js app which makes use of the Spotify Web API and the 30 second song previews to illustrate my idea.

TaggedSongComponent View

I made use of RxJS in varying degrees throughout the application, with the most complex implementation being the component which syncs displaying tags with music playback. The scope of this blogpost will cover some of the basic ways I utilized some operators to achieve things like data loading and transformation, and capturing user input. There will be another part to this post where I will explore some of the more complex chains that I built in this project.

Observable and Observer

The first component I would like to discuss is the public feed which loads a feed of all of the publicly tagged songs. This is a simple and common use case of RxJS in Angular2. When the component initializes, it uses a service that prepares an Observable that will make a backend call to fetch the songs from the database when it is subscribed to by an observer.

Here is what the service code looks like:

@Injectable()
export class TaggedSongsService {
constructor(private http: Http) {}
  getAllTaggedSongs() {
return this.http.get(‘/api/tagged_songs’);
}
}

The http.get function is an Observable factory which will create our Observable. That Observable will make the GET request once an Observer is present and will emit the response returned from the backend. There are two important things to note here:

  1. Being an observable, this GET request won’t run, until something subscribes to it.
  2. If we subscribe to the same observable multiple times, it’ll make a backend call for every subscription, unlike a Promise that resolves only once in its lifetime.

If we look at the tagged songs list component which makes use of this service, upon initialization of the component is where I have subscribed to the observable.

ngOnInit(): void {
this.taggedSongsService.getAllTaggedSongs().subscribe(songs => {
this.songs = songs.tracks;
});
}

When a user navigates to the public feed view, the component will get initialized and in turn will subscribe to the Observable created by the TaggedSongsService.

Simple Data Transformation

Ok, so the component initializes and subscribes to the Observable which will make a backend call and emit the response. However, I really only want the JSON body from the response, so how would I do this transformation? Luckily, there is another commonly used RxJS operator called map() that takes a mapping function. It transforms emissions from the upstream Observable by applying the mapping function and forwarding the result downstream. If we examine the below marble diagram we’ll see that the upstream Observable is emitting values 1, 2 and 3. The mapping function we pass is multiplying each emission by 10. As a result our Observer (downstream from the map) receives 10, 20 and 30.

Source: http://reactivex.io/documentation/

In that sense we can use the map() operator to map a response to a JSON object. Here, I am taking the emission and extracting just the JSON to be passed along downstream. (i.e. I’m mapping a Response to a JSON object).

@Injectable()
export class TaggedSongsService {
constructor(private http: Http) { }
  getAllTaggedSongs() {
return this.http.get(‘/api/tagged_songs’)
.map(res => res.json());
}
}

Side-effects & Syncing Observables’ Emissions

So, we’ve discussed the idea of subscribing to an Observable and synchronously transforming an Observable‘s emissions using Observable.map(). Moving onto something a little more complicated, we can look at the component that allows a user to tag a song with comments. In order for a tag to be successfully added, the action is two-fold:

  1. When the song has reached the place where a user wants to place a tag she will first click the Add Tag button.
  2. A text box then pops up where she can enter the text followed by clicking a Submit Text button.

These two actions will be the subject of our next discussion.

Breaking it down to focus on the first step (clicking the Add Tag button), I have the following code.

this.position = Observable.fromEvent(this.button.nativeElement, ‘click’)
.do(_ => {
this.render.TextBox();
})
.map(_ => this.toCurrentPosition());
SongComponent View

I start by creating an Observable that emits click events for my Add Tag button. Now every time the user clicks on that button, my Observable will emit an event. I don’t care so much about the actual emission, but I do want to do a couple of things any time this happens. First I use the Observable.do() operator to pop up a text box on the UI. The Observable.do() operator is generally used for synchronous side-effects you might need to do mid-chain. I then use the familiar Observable.map() to obtain two important pieces of information: the current time in the song and the position the tag needs to be on the Wavesurfer view. I return the two key pieces of information provided by other entities within the component, as we can see below:

toCurrentPosition() {
return {
time: waveSurfer.getCurrentTime(),
currentPosition: getTagPosition()
};
}
getTagPosition() {
let currentPostion = getPosition();
let waveWidth = getWaveWidth();
return (currentPostion / waveWidth) * 100;
}

The second part of the tagging process is the actual tag text that is submitted via the textbox. I could have taken a similar approach and made onClick events on the Submit Text button an Observable. However, because what I am really interested in is the text in the textbox, I would have had to take an extra step to map the click event to the input text. Therefore, I’ve taken another approach where I instead use a regular onClick event which passes the text as an argument in the onClick function defined in the component. Once received in the component, the text is emitted through a Subject.

Note: We should usually avoid subjects since they can be tricky to deal with. Especially when building bigger chains. For this particular case though, I chose to use a Subject to demonstrate its uses.

A Subject is an Observable and an Observer, meaning you can subscribe to a Subject. For example, if you call subject.next(“hello”), then your Observer will receive “hello” as an emission. If you call subject.completed(), your Observer will receive the completed event, and the same goes when calling with an error.

Using a Subject like this is a way to make an Observable out of something that is not one. Note that in the SongComponent I have defined a global variable tagTextSubject and set its type to be a Subject.

export class SongComponent implements OnInit {
clicks: Observable<any>;
tagTextSubject:Subject<any> = new Subject();
}

When the submitText onClick function is executed with the input text, I call tagTextSubject.next(text), allowing the text value to be emitted from this Observable.

submitText(text: string): void {
this.tagTextSubject.next(text);
}

Great, so now the text is the second Observable associated with a single tag. In order to create the tag object from the first Observable, emitting time and position, to the second Observable, emitting the user’s input, I can apply the Observable.zip() operator to sync the two observable emissions.

Source: http://reactivex.io/documentation/

As illustrated below, the Observable.zip() operator takes the two Observables as the first two arguments and combine function as the third argument.

Observable.zip(
this.position,
this.tagTextSubject,
(position, tagText) => {
return {
position: position,
text: tagText
}
}
).subscribe(val => {
this.renderAndStoreTag(val)
});

Observable.zip() joins the last two emissions from each Observable together. So the most recently emitted position item (with the current time and position of the tag) is zipped together with the most recently emitted tagTextSubject item (the text of the tag).

The third argument in Observable.zip() determines exactly how I want the emissions from the Observables joined. Here, I combine the first object containing the position and time of the tag with the text of the tag into a single tag object. Observable.zip() emits objects containing the tag info which are passed along to the renderAndStoreTag function. This function adds each tag object to an array of tags (which will be passed to the backend to save all tags upon clicking the save button) and also creates and shows the UI for the tag.

Asynchronous Data Transformation

Great, so we’ve just mastered a small RxJS chain — we’re nearly experts! At this point I would like to wrap up part one of this blog post by taking a step back to look at one more operator which is a bit more complicated. To do that we must backtrack a bit to the initialization of the SongComponent we were just in. Whenever the component is initialized, we create the following chain and subscribe to it:

this.route.params
.switchMap(params => {
return this.songService.getSong(params[‘track_id’]);
})
.subscribe(song => {
this.song = song.name;
this.image = song.album.images[1].url;
this.artist = song.artists[0].name;
this.waveSurfer.load(song.preview_url);
}, error => console.log(error));

A user navigates to this view by clicking on the song title in the list of tagged songs in the Feed view. When this is done, the user is redirected to the song view and the song’s Spotify ID is passed along as a parameter of this url. However, we have no other information about the song at this point. Therefore, we need to map the id to a Spotify song object — but that means making an async call to the Spotify API to retrieve the song object by ID.

In the past we’ve used Observable.map() to synchronously map an emission to some other type. Here, since we don’t have the song object yet and have to retrieve it asynchronously, we can’t use Observable.map(). But fear not, there are a few operators that allow for asynchronous mapping in RxJS. They all behave in the same way but have semantic differences (a potential topic for another blog post). The general pattern is, we provide a mapping function that maps upstream emissions to an Observable. Whatever Observable we return, RxJS will subscribe to and forward its emissions downstream. For our purposes here, I have used Observable.switchMap().

Source: http://reactivex.io/documentation/

Now, whenever the params Observable emits a song ID, I map that to an Observable created by the service that fetches the song details from backend. As a result, my Observer receives full song objects instead of just IDs thus allowing me to perform the proper initialization of the component with the pertinent song information.

That concludes my introduction to some basic implementations of RxJS. I hope you’ve learned a little bit about how easy it can be to implement FRP with a helpful library such as this. Please check back soon to read Part II of my journey in applying RxJS where I will focus on the complex chain that is the driving force behind the TaggedSongComponent view.