A Beginner’s Adventure with RxJS: Part II

Nichole Bates
Def Method
Jan 15, 2018

Welcome back to Implementing RxJS. In Part I of this two-part blog post, I began discussing an Angular2 app I built and some of the RxJS operators I made use of in my main components. As promised, Part II of this blog post will aim to discuss the Rx chain driving the TaggedSongComponent View.

TaggedSongComponent View

One of this app’s features allows a user to choose a song and add tags or comments to that song at different points throughout its duration. These tags can then be displayed in sync with the song whenever it is played.

Essentially, the challenge here is that any time the song begins to play, all of the tags for that song need to be synced up based on the point where playback is started so they show at the correct time. Likewise, any time playback is stopped, tags should be hidden.

Creating Observables From User Events

There are a number of user events that can trigger this syncing of tags. There is the play/pause button for playback control. A user can also scrub to a certain part of the song to begin playback at that point.

At the most basic level, we have a function that accepts an Observable of user events and returns an Observable of UI actions. The logic behind this view needs to be able to handle the different kinds of events and determine what kind of actions should take place.

User Events → UI Actions

The first step in this chain is tracking the actual user events. As discussed in Part I, we can create Observables in a number of ways:

  1. Observable.of()
  2. Observable.from()
  3. Observable.fromEvent()
  4. Creating a Subject and using subject.next()

In this case we first want to create an Observable from a click event on the play/pause button. Therefore, we’ll use the Observable.fromEvent() operator. As we can see in the following code, whenever the button is clicked, we simply emit a string identifying it as a ‘togglePlaybackEvent’.

let togglePlaybackEvents = Observable.fromEvent(this.playButton.nativeElement, 'click')
  .map(_ => 'togglePlaybackEvent');

A user can also scrub to a point on the Wavesurfer. That event is handled by the Wavesurfer plugin itself, but we can be notified when the user has taken this action. To incorporate these events into our Rx chain, we set up seekEvents as a Subject. Just as with the events emitted by the play/pause button, we’ll emit a ‘seekEvent’ through the seekEvents subject whenever our Wavesurfer.on(‘seek’) callback is called.

seekEvents: Subject<any> = new Subject();

this.waveSurfer.on('seek', _ => this.seekEvents.next('seekEvent'));

At this point we have two separate user event streams that we would like to combine into one. To do so, we can use the Observable.merge() operator.

Source: http://reactivex.io/documentation/

The merge operator takes the emissions of multiple streams and combines them into one stream in order of emission. Using it here, these two separate Observables are merged together into one Observable, so as the user interacts with the interface, user events are emitted by a single stream.

let userEvents = Observable.merge(
  togglePlaybackEvents, this.seekEvents);

Once we have the user events organized into a single stream, we will use these events to trigger the syncing of tags. Before doing this, however, there is one side effect we have to deal with. If a user interacts with the play/pause button, signified by the ‘togglePlaybackEvent’ emission, it means they want to toggle the song playback, so we must do that. As we discussed in Part I of this blog post, we can use the Observable.do() operator here, which is generally used for synchronous side effects you might need to perform mid-chain.

*The Wavesurfer plugin handles scrubbing actions by starting playback at the desired position so we don’t need to worry about that particular side-effect.

userEvents
  .do(userEvent => {
    // Only the play/pause button needs us to toggle playback ourselves.
    if (userEvent === 'togglePlaybackEvent') {
      togglePlayback();
    }
  })

We next need to figure out whether we should start or stop showing tags. If the Wavesurfer is playing, we need to start showing tags and sync them based on the current position of the Wavesurfer. Otherwise we need to stop showing tags. We have a helper function called playbackStats() that returns the playback status of the Wavesurfer.

userEvents
  .do(...)
  .map(_ => {
    let stats = playbackStats();
    return {
      currentTime: stats.currentTime,
      command: stats.playbackState
        ? 'startTagsPlayback' : 'stopTagsPlayback'
    }
  });
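
The mapping step can also be pulled out into a plain function, which makes it easy to check in isolation. This is only a sketch: the post does not show what playbackStats() returns, so the PlaybackStats and TagCommand shapes below are assumptions, as is the name toTagCommand.

```typescript
// Hypothetical shapes: the post does not show what playbackStats() returns.
interface PlaybackStats {
  currentTime: number;    // assumed: seconds into the song
  playbackState: boolean; // assumed: true while the Wavesurfer is playing
}

interface TagCommand {
  currentTime: number;
  command: 'startTagsPlayback' | 'stopTagsPlayback';
}

// Same logic as the map() callback above, written as a standalone function.
const toTagCommand = (stats: PlaybackStats): TagCommand => ({
  currentTime: stats.currentTime,
  command: stats.playbackState ? 'startTagsPlayback' : 'stopTagsPlayback',
});

const whilePlaying = toTagCommand({ currentTime: 12.5, playbackState: true });
const whilePaused = toTagCommand({ currentTime: 12.5, playbackState: false });
```

Here whilePlaying carries the ‘startTagsPlayback’ command and whilePaused carries ‘stopTagsPlayback’, both with the same currentTime.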

Syncing Tags

At this point we have a mapping from user events to objects that describe what we should do with the tags. For the sake of this post, let’s call these objects tag commands. From the above function we can see that we have two types of commands:

  1. ‘startTagsPlayback’
  2. ‘stopTagsPlayback’

When we receive a ‘startTagsPlayback’ command as an emission, we know that we want to start showing the tags as the song is playing. But in order to do this, we first need to calculate the subset of all the tags that come at or after the playback start point.

let createShowTagsActions = (currentTime, tags) => {
  // Keep only the tags that come at or after the playback start point.
  return tags.filter(tag => tag.time >= currentTime);
}

Here, we take the tags array and return an array that includes only the tags whose show time is greater than or equal to the current playback time. We won’t be displaying tags that have already passed, so we have no reason to deal with them.
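
As a quick check of the filtering rule, here is a small sketch with made-up tag data. The SongTag type and the upcomingTags name are hypothetical, and tag times are assumed to be in seconds.

```typescript
// Hypothetical tag shape; only `time` matters for the filter.
interface SongTag {
  text: string;
  time: number; // assumed: seconds into the song
}

// Keep the tags that come at or after the current playback position.
const upcomingTags = (currentTime: number, tags: SongTag[]): SongTag[] =>
  tags.filter(tag => tag.time >= currentTime);

const sampleTags: SongTag[] = [
  { text: 'intro', time: 1.06 },
  { text: 'first chorus', time: 10 },
  { text: 'bridge', time: 42 },
];

// Starting playback at exactly second ten keeps the tag at second ten.
const fromTenSeconds = upcomingTags(10, sampleTags).map(tag => tag.text);
// fromTenSeconds is ['first chorus', 'bridge']
```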

Great, so we have the tags we want to show, but how do we actually display these tags? Well, displaying a tag is actually made up of two actions — a show action and a hide action. Just as we want to display the tags at the correct time, we also want them to disappear a certain period of time after showing in order to create a pop up effect. So how can we do that?

Concatenating Observables

To achieve the desired effect, we need to create an observable that will emit the show action followed by the hide action. Each of the actions will have a reference to the tag that is being shown/hidden. To do so, we can use the concat operator which allows us to concatenate two Observables together. The following function builds the observable that emits a show action followed by a hide one.

let createTagActionsObservable = (tag, currentTime) => {
  let showAction = {'tag': tag, 'action': 'show'};
  let showTagObservable = Observable.of(showAction);
  let hideAction = {'tag': tag, 'action': 'hide'};
  let hideTagObservable = Observable.of(hideAction);

  return Observable.concat(showTagObservable, hideTagObservable);
}
Source: http://reactivex.io/documentation/

Unlike the merge operator which combines emissions in the order that they are emitted, the concat operator combines all of the emissions from the first Observable followed by all of the emissions of the second Observable. So currently, we can subscribe to one of these concatenated Observables for a single tag and we’ll get two emissions back to back, one to show the tag followed by one to hide it.
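
The difference in ordering can be illustrated in plain TypeScript, without RxJS. The sketch below models each stream as a list of timestamped emissions (a hypothetical Emission type with made-up timestamps) and imitates the two orderings with array operations. It is an analogy only: a real concat subscribes to the second Observable after the first one completes, which this model does not capture.

```typescript
// Hypothetical model of an emission: a value plus the time it was emitted.
interface Emission {
  at: number; // milliseconds since the start of the example
  value: string;
}

const clicksModel: Emission[] = [
  { at: 100, value: 'togglePlaybackEvent' },
  { at: 900, value: 'togglePlaybackEvent' },
];
const seeksModel: Emission[] = [{ at: 400, value: 'seekEvent' }];

// merge-like ordering: everything interleaved by emission time.
const mergedOrder = [...clicksModel, ...seeksModel]
  .sort((a, b) => a.at - b.at)
  .map(e => e.value);

// concat-like ordering: all of the first stream, then all of the second.
const concatOrder = [...clicksModel, ...seeksModel].map(e => e.value);
```

In this model mergedOrder places the ‘seekEvent’ between the two toggle events, while concatOrder places it after both.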

Emitting Observables With Delays

Now that the show and hide emissions for a tag are emitted in the correct order, we have to handle the timing of these emissions, since we don’t want to show the tag and immediately hide it. We want to give users a chance to read the comment, so we wait two seconds before hiding the comment pop-up. Returning to our function that creates this Observable for a single tag, we can do this by adding the delay() operator when we create the hideTagObservable.

let createTagActionsObservable = (tag, currentTime) => {
  let showAction = {'tag': tag, 'action': 'show'};
  let showTagObservable = Observable.of(showAction);
  let hideAction = {'tag': tag, 'action': 'hide'};
  let hideTagObservable = Observable.of(hideAction).delay(2000);

  return Observable.concat(showTagObservable, hideTagObservable);
}
Source: http://reactivex.io/documentation/

The delay operator takes a measure of time and returns an Observable that delays all the emissions of its upstream. In our case, we’re applying a two-second delay to the hideAction observable. So now we can subscribe to this concatenated observable: we will get the emitted action to show this tag, and two seconds later we will get the emitted action to hide it.

Almost there, but we have one additional layer of complexity here. We also don’t want to show the tag right away. Instead, we know that we need to show the tag at a specific time in the song’s playback, indicated by the tag object’s time value. To do this, we can use the delay operator again when creating the showTagObservable.

let createTagActionsObservable = (tag, currentTime) => {
  let showAction = {'tag': tag, 'action': 'show'};
  // Seconds remaining until the tag's time, converted to milliseconds.
  let timeToShowTag = (tag.time - currentTime) * 1000;
  let showTagObservable = Observable.of(showAction)
    .delay(timeToShowTag);
  let hideAction = {'tag': tag, 'action': 'hide'};
  let hideTagObservable = Observable.of(hideAction).delay(2000);

  return Observable.concat(showTagObservable, hideTagObservable);
}

We calculate the delay relative to the playback position at the moment the playback action was taken. If we play from the beginning of the song (time 0) and the tag is to be shown at second ten, then we emit that tag’s show action with a delay of ten seconds.
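
The arithmetic is small enough to check by hand. In this sketch, showDelayMs is a hypothetical helper name, and both times are assumed to be in seconds.

```typescript
// Delay in milliseconds before a tag's show action should fire:
// the seconds remaining until the tag's time, converted to milliseconds.
const showDelayMs = (tagTime: number, currentTime: number): number =>
  (tagTime - currentTime) * 1000;

// Playing from the start: a tag at second ten is shown after ten seconds.
const delayFromStart = showDelayMs(10, 0); // 10000

// Scrubbing to 7.5 seconds first: the same tag is shown after 2.5 seconds.
const delayAfterSeek = showDelayMs(10, 7.5); // 2500
```

Because only tags at or after the current position are kept, the delay is never negative.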

So now, for each individual tag, we will receive the show action at the correct time in the song’s playback to show the tag and two seconds later we will receive the hide action to hide it again.

We can apply the function to every tag that we intend to show. This will result in a tag actions Observable for each tag. But again, our goal here is to flatten this all into one stream so we can go ahead and merge these Observables together.

let createShowTagsActions = (currentTime, tags) => {
  let allActions = Observable.empty();
  tags
    .filter(tag => tag.time >= currentTime)
    .forEach(tag => {
      let actions = createTagActionsObservable(tag, currentTime);
      allActions = Observable.merge(actions, allActions);
    })
  return allActions;
}

Once we create an actions observable for a tag, using the merge operator again, we can merge each of those Observables into a single Observable, initialized by using the Observable.empty() operator. Now we can subscribe to this Observable and we will receive the emission for the first tag to show at the calculated time, followed two seconds later by its associated hide action, followed then by the show tag emission of the next tag at the calculated time, followed two seconds later by its hide action and so on.
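
To make the resulting order of emissions concrete, the sketch below computes the expected timeline in plain TypeScript, without RxJS. It is a model for reasoning about the stream, not part of the app: TimedTag, ScheduledAction and expectedTimeline are hypothetical names, and the tag data is made up.

```typescript
interface TimedTag {
  text: string;
  time: number; // assumed: seconds into the song
}

interface ScheduledAction {
  atMs: number; // milliseconds after playback starts
  action: 'show' | 'hide';
  text: string;
}

const HIDE_AFTER_MS = 2000; // the two-second pop-up duration from the post

// For each upcoming tag: show at its delay, hide two seconds after that.
const expectedTimeline = (currentTime: number, tags: TimedTag[]): ScheduledAction[] => {
  const timeline: ScheduledAction[] = [];
  tags
    .filter(tag => tag.time >= currentTime)
    .forEach(tag => {
      const showAt = (tag.time - currentTime) * 1000;
      timeline.push({ atMs: showAt, action: 'show', text: tag.text });
      timeline.push({ atMs: showAt + HIDE_AFTER_MS, action: 'hide', text: tag.text });
    });
  // Like merge, order everything by the time it would be emitted.
  return timeline.sort((a, b) => a.atMs - b.atMs);
};

const timelineFromFive = expectedTimeline(5, [
  { text: 'intro', time: 1 },
  { text: 'verse', time: 6 },
  { text: 'chorus', time: 10 },
]);
// show verse at 1000, hide verse at 3000, show chorus at 5000, hide chorus at 7000
```

Note that if two tags are less than two seconds apart, the second tag’s show action arrives before the first tag’s hide action, since merge interleaves emissions by time.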

Now that we have handled the more complex logic of what to do when we receive the ‘startTagsPlayback’ command, we can easily handle the logic for the ‘stopTagsPlayback’ command. When this happens we simply want to emit a hide all tags action because we don’t want to show any tags.

let hideAll = Observable.of({tag: null, action: 'hideAll'});

Now that we have created Observables for each command we need to tell Rx which one to use.

From User Events to Actions

At this point, we have translated user events into commands which we can further map into actions that the UI can handle. We are mapping a value (‘startTagsPlayback’ or ‘stopTagsPlayback’) to an actions stream which will be flattened into the UIActions stream. For this we have chosen to use the switchMap() operator as the mapping operator in order to cancel any outstanding actions that haven’t been emitted yet. To learn more about how switchMap() works, please refer to Part I of this blog post.

let uiActions = userEvents
  .do(...)
  .map(_ => {
    let stats = playbackStats();
    return {
      currentTime: stats.currentTime,
      command: stats.playbackState
        ? 'startTagsPlayback' : 'stopTagsPlayback'
    }
  })
  .switchMap(cmd => {
    let hideAll = Observable.of({tag: null, action: 'hideAll'});
    if (cmd.command === 'startTagsPlayback') {
      return Observable.concat(hideAll,
        createShowTagsActions(cmd.currentTime, savedTags));
    } else {
      return hideAll;
    }
  });

If we get a ‘startTagsPlayback’ command, we return the chain with show/hide pairs. In this case, we need to take one more step and prepend a ‘hideAll’ action to this Observable using the concat() operator again. This is just some basic cleanup that ensures any tags which might still be showing from a previous event are hidden and we start with a blank slate.

If we receive a ‘stopTagsPlayback’ command, we simply hide all tags by emitting a ‘hideAll’ action.
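
The cancelling behaviour of switchMap() can be pictured with a small plain-TypeScript model, without RxJS. In the sketch below, each command carries the actions it would schedule, and any action that has not fired by the time the next command arrives is dropped. CommandModel, PendingAction and survivingActions are hypothetical names, the timings are made up, and the model deliberately ignores what happens when an action and a command land on exactly the same millisecond.

```typescript
interface PendingAction {
  atMs: number; // offset from the command (input) or absolute time (output)
  action: string;
}

interface CommandModel {
  atMs: number; // when the command arrives
  pending: PendingAction[]; // actions it schedules, relative to its arrival
}

// Keep only the actions that fire before the next command replaces the stream.
const survivingActions = (commands: CommandModel[]): PendingAction[] => {
  const result: PendingAction[] = [];
  commands.forEach((cmd, i) => {
    const next = commands[i + 1];
    const cutoff = next ? next.atMs : Infinity;
    cmd.pending.forEach(p => {
      const absolute = cmd.atMs + p.atMs;
      if (absolute < cutoff) {
        result.push({ atMs: absolute, action: p.action });
      }
    });
  });
  return result;
};

// Play at 0 ms (one tag due after a second), then pause at 2000 ms.
const survived = survivingActions([
  { atMs: 0, pending: [
    { atMs: 0, action: 'hideAll' },
    { atMs: 1000, action: 'show' },
    { atMs: 3000, action: 'hide' },
  ] },
  { atMs: 2000, pending: [{ atMs: 0, action: 'hideAll' }] },
]).map(a => a.atMs + ':' + a.action);
// survived is ['0:hideAll', '1000:show', '2000:hideAll']
```

The pending ‘hide’ at 3000 ms never fires, which is why the ‘hideAll’ emitted on pause is needed to clear the tag that is still showing.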

Handling Tag Actions

Finally, the TaggedSongComponent subscribes to this Observable and handles all incoming actions.

uiActions.subscribe(tagAction => {
  if (tagAction['action'] === 'hideAll') {
    this.hideTags();
  } else if (tagAction['action'] === 'show') {
    this.showTag(tagAction['tag']);
  } else {
    this.hideTag(tagAction['tag']);
  }
}, error => console.log(error));

As we know, the tag action is an object that has a few key pieces of information on it: the individual tag’s key information and the action that the UI should take.

const TagAction = {
  "tag": {
    "position": 12.5,
    "text": "This song is awesome!",
    "time": 1.06
  },
  "action": "show"
}

Each tag action is emitted according to its delay. When the action is ‘show’, we find the tag in the UI based on its tag.position value and change its display property to ‘block’. Two seconds later, the same tag’s ‘hide’ action is emitted, at which point we find the tag in the UI and switch its display property back to ‘none’. Likewise, if the action is ‘hideAll’, we hide all of the tags by switching their display property to ‘none’. So now the UI can react in different ways based on the actions.
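
One way to reason about the effect of these actions is to model the UI state as the list of tag positions that are currently visible. The sketch below is not the component’s code: applyTagAction is a hypothetical stand-in for the showTag(), hideTag() and hideTags() methods, which in the app change the display property of DOM elements.

```typescript
interface UiTag {
  position: number;
  text: string;
  time: number;
}

type UiTagAction =
  | { tag: UiTag; action: 'show' | 'hide' }
  | { tag: null; action: 'hideAll' };

// Returns the positions of the tags that are visible after applying one action.
const applyTagAction = (visible: number[], tagAction: UiTagAction): number[] => {
  if (tagAction.action === 'hideAll') {
    return [];
  }
  const position = tagAction.tag.position;
  const others = visible.filter(p => p !== position);
  return tagAction.action === 'show' ? [...others, position] : others;
};

const sampleTag: UiTag = { position: 12.5, text: 'This song is awesome!', time: 1.06 };

// A 'hideAll' followed by a 'show' leaves just this tag visible.
const afterShow = [
  { tag: null, action: 'hideAll' } as UiTagAction,
  { tag: sampleTag, action: 'show' } as UiTagAction,
].reduce(applyTagAction, [] as number[]);

// The matching 'hide' two seconds later clears it again.
const afterHide = applyTagAction(afterShow, { tag: sampleTag, action: 'hide' });
```

After the ‘show’ action, afterShow contains only position 12.5, and afterHide is empty again.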

So there you have it, an example of using RxJS to drive asynchronous, time-based content. This of course is only scratching the surface of what RxJS is capable of, but it’s been a great way for me to become acquainted with the library as a beginner. I hope it’s been useful for you as a reader. In the future, I hope to follow up this post with another that discusses how I was able to test this chain — so check back soon!
