Rx.js Operators, Part II

Armen Vardanyan
Angular In Depth
Published in
6 min readDec 24, 2019

AngularInDepth is moving away from Medium. More recent articles are hosted on the new platform inDepth.dev. Thanks for being part of indepth movement!

It’s been more than 2 years after I published my first article on Rx.js, and now it’s time to talk more about its operators. Lots of stuff has changed since then; we didn’t even have the .pipe method back when I wrote the first piece.

In the meantime I used Rx.js more and more in the applications I’ve been working on, and the main insight I got from that experience is this:

Developers’ biggest problem with Rx.js is not knowing more operators

The amount of code that was made better just by using one or another (or a combination of) operators has been ridiculous. Of course, expecting someone to know all of them is an overkill (granted there are over a 100 operators in Rx.js), but at least understanding some of the important groups can make coding (especially in Angular) so much easier.

So, without a further ado, let’s dive in

debounceTime vs auditTime

If you ever tried to make an autocomplete input with Rx.js, then you probably heard about debounceTime: this operator, as the documentation says,

Emits a value from the source Observable only after a particular time span has passed without another source emission.

Which means, if we debounceTime(3000), for example, we will be getting notifications after the source has sort-of settled down for at least 3000 milliseconds. This is useful if we want to make a request for information from a server after the user finished typing, and don’t want to hit the server with useless requests before the user has actually finished typing. Here is an example:

If we now remove the debounceTime call and type “Hello” really fast, we will make 5 requests to https://some-url?q=, 4 of which are completely unnecessary. But with debounceTime we will only hit the server after the user stops typing.

So far so good. But what is auditTime then and what use cases does it have?

As the documentations states, auditTime

Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.

This may feel a lot like debounceTime, but it really is not. While both of this functions ignore some of the emissions based on a time interval, there is a significant difference: debounceTime waits after each emission for a given time period, and if there are no new emissions, it will allow that last emission to pass; auditTime does not care about emissions; instead, it comes back every given interval, checks if there have been any emissions in between the previous checking and the current one, and if there are, it lets the last one of them to pass. Imagine having an Observable of Facebook messages coming from all your different friends. If you auditTime(3000) this Observable, and in 3 seconds Anthony and Julia send you a message in that order, then at the top of the checking you will receive a message from Julia, but the message from Anthony will be lost forever.

But what is a use case for this operator? Imagine a situation where we receive emissions from a source that emits very frequently, say, a WebSocket conveying live data on stock exchange rates fluctuations. Every time we receive an emission from that source, a chart representing stock values is being repainted, depending on new values, which is a costly operation. Because this rates can change very fast, and the user won’t notice the smallest fluctuations that take place in <500ms, we can auditTime it to only repaint the chart every 500 milliseconds:

scan vs reduce

Both of this functions are aggregating emitted values, for example, counting the sum of numbers in an Observable of an Array. The main difference is that reduce emits only once — as soon as the source Observable completes. So we can use it to calculate the average age of users, for example:

In this example, we map ages of people to a tuple containing the age and the number 1 (which is the count). We then add up the ages and increment the count separately, still keeping the tuple, and after the observable completes, map the the result to sum / count, which is the average.

scan, on the other hand, emits each aggregated value. Every time it adds up a new number, an emission will fire. Here is a use case: imagine a webpage that displays donations from different people. Every time someone makes a donation, and Observable notifies us of the amount being donated. It does not show us the entire collected money, it only shows the amount of a new donation. So we essentially want to aggregate the donations as they arrive and update the DOM every time a new donation arrives:

distinct vs distinctUntilChange vs distinctUntilKeyChange

The name of the distinct operator speaks for itself — it only allows emissions that haven’t already happened. Here is a quick example:

This example will log 1, 2, 3, 4 , omitting the duplicate emissions of 2 and 4.

What is a good use case for this operator? Mainly, disallowing repeating actions on the same data entry. For example, say we have a Subject which fires a product object every time the user deletes one. Of curse, we can only delete a product once, but user may click on the button twice before our request is fulfilled, so we may want to use only distinct values. In this case we want the distinct operator to work on the user id-s instead of the object references, but Rx.js has got us covered: the distinct operator can accept a mapping function which can help it determine by what part of the emission to make the distinction. Here is an example:

But what if we don’t want two subsequent identical emissions, but two identicals in the overall stream are acceptable? For example, say we have a game in which the player has to answer some questions from different fields of science. One can choose from math, physics, history, geography and literature. The user chooses a field of science and is presented with the question. But we have to ensure that the player must answer question from at least two different areas, so they cannot choose the same field twice in a row. Here is where distinctUntilChanged comes to play:

This will now allow duplicates, but not one after another.

Of course, as with the distinct operator, we may provide a function to check distinction on a nested value rather then the emitted value itself. But Rx.js went as far as to provide a shorthand for this: distinctUntilKeyChanged. Here is how it works:

This code will log “Chair, Table, Chair”.

takeWhile vs takeUntil

Sometimes we just need manual methods of stopping emissions from an Observable source. Actually, there are three possible scenarios:

  1. Take only a fixed amount of emissions
  2. Take emissions while they satisfy a constraint (example: read a list of numbers while they are lower than 5)
  3. Take emissions until some external event happens.

Actually, the names of the methods that Rx.js has for this are exactly descriptive of these scenarios. I am not going to cover the take operator, as it is pretty straightforward. As to the other two, here is a simple example:

This will only log “1, 2, 3”. This behavior may seem similar to filter, but it is very important to understand that takeWhile actually unsubscribes from the source completely — filter on the other hand will just disallow some emissions. It will resume after some other emissions do satisfy the constraint. takeWhile, on the other hand, will cut off forever.

But what if I want to stop emissions based on some external event? Say, I want to log something every second, but only for 10 seconds? So I will need to stop the emissions after that amount of time?

We can actually achieve it by using takeWhile:

This works fine, but looks pretty terrible. As a matter of fact, Rx.js does provide a way of handling things like this: the takeUntil operator accepts an Observable, and will continue reading emissions until the said Observable emits some value. Thus we can use it by passing an Observable to it, which will fire in 10 seconds (using timer). Here is the code:

This pattern is actually quite popular in Angular, when unsubscribing from Observables in our components. I elaborate on it in my article Harnessing the Power of Mixins in Angular.

Conclusion

As I mentioned, Rx.js contains hundreds of operators, each of them accomplishing some fascinating features. I will continue talking about them in my followup articles.

Follow me on Medium and Twitter for more on Angular, Rxjs, React, and Javascript in general.

--

--