RxJS: nosy combineLatest vs selfish withLatestFrom

Wojciech Trawiński
Rx way
Published in
5 min readJul 10, 2018

When I first came across object-oriented programming I was told that everything is a class. I’ve been using RxJS for a couple of months and a similar cliche comes to my mind, namely that in RxJS everything is a stream. Treating your application’s data as a bunch of observables usually requires some sort of interaction between the data sources. In this article I would like to you to get familiar with combineLatest and withLatestFrom operators by analysing a real case I had to tackle in my job.

Problem description

On a regular basis I work as a JavaScript developer in one of the most popular Polish publishing house. One day, as a change from everyday duties, migrating Flash applications to Angular, we were asked to develop application enabling rendering a graph of quadratic function based on provided coefficients. One of the requirements was to render immediately the resulting equation, whereas a corresponding graph was expected to be rendered after clicking a submit button. Let’s see how brilliantly RxJS tackled the aforementioned problem.

Operators in the limelight

As the article’s title suggests the two mighty operators that were used in the application were combineLatest and withLatestFrom.

I called combineLatest a nosy operator, since when any observable that the resulting stream is composed of emits a value, it emits the latest value from each. You can compare it to the news presenter sitting in a room who has to prepare a short news service every hour. He has an access to several data streams including: politics$, economy$, sport$, weather$ and his job is to pick the most recent news from each area. We can conclude that he creates the following stream: combineLatest(politics$, economy$, sport$, weather$).

On the contrary, withLatestFrom can be perceived as a selfish operator, as the resulting stream emits a value only when the source observable emits one and just append the latest value from the stream passed to the withLatestFrom operator. Imagine a speedway journalist who has to write a paper before every match weekend. He has an access to two data streams: speedwayNews$ and weather$. Everytime a new notification from the source stream (speedwayNews$) is sent he also needs some information on the weather forecast (weather$). That’s why we can say that he creates the following stream: speedwayNews$.pipe(withLatestFrom(weather$)).

Implementation

Below you will find the source code of the example. I will go step by step and explain each part of the code in details.

The live example is available here: https://jsfiddle.net/axo4kjL3/57/

First, let’s have a look at the template. It is just for demonstration purpose, so it lacks styling and you can see placeholders for rendering both equation and graph. However, the main aim of the article is to get familiar with RxJS operators so please forgive me this simple solution. On the left, there is a form with three inputs for providing a quadratic function’s coefficients and a button enabling graph rendering. In addition, there is a live preview of the resulting equation. On the right side, there is a placeholder for the function’s graph which is only updated after clicking the render button.

Without further ado, let’s dive into JavaScript code. I attached RxJS library using CDN and a I exposed global rxjs object to import from. Next, I defined some constants and added helper function to imitate equation and graph rendering. Finally, let’s have a look at RxJS part of the code. There is always one question that need to be answered when you develop your application using the reactive library, namely which streams of data you will need to implement application logic. Take your time to analyse the problem and try to come up with a list of streams you expect to encounter.

I ended up with the following data streams:

  • graphBtn$ which emits when a user clicks the render button,
  • equation$ which emits a quadric function’s coefficients when a user interacts with the form,
  • graph$ which provide a quadric function’s coefficients when a user clicks the render button.

In the next step you need to create the above-mentioned observables.

The first stream is quite easy to compose, since you can use fromEvent operator, which ships with RxJS library, and indicate which event on which target element will trigger value emission. In addition, I used debounceTime operator to avoid unnecessary rendering everytime an incessant user clicks the render button. As a result, I was only interested in an event after which there was a pristine period of time defined in DEBOUNCE_TIME constant.

The second stream makes use of the first celebrity of the article: combineLatest operator. Each coefficient input in the form is a source of values that together define a quadric function’s coefficients. So I had to somehow subscribe to all these streams and get the resulting value in a form of array. How do I got an observable for each input? The magic happens in getCoeffObservable function. It creates an observable for each input using a built-in fromEvent operator and applies two operators via pipe. The first one is a custom operator that doesn’t ship with RxJS library and was defined above the function. Let’s see what the uniqueSettledInputValue operator does. Firstly, it waits specified time after a user interacts with an input to emit new value. Next, InputEvent object is mapped to the value it contains which is crucial to proper usage of distinctUntilChanged operator, which stop value propagation, if it is the same as the last emitted one. If the order was swapped and distinctUntilChanged was called with a stream of InputEvent objects, every input change would be considered as a new value, since object references would be compared instead of values provided by a user. That’s it. Custom, reusable operator which can be exported and used everytime you need to deal with stream coming from an input. The startWith operator is a workaround for combineLatest peculiarity, namely that the stream returned from combineLatest will not emit a value until each stream it is composed of has emitted one.

Last but not least, graph$ observable is a stream defined with the second rock star of the paper: withLatestFrom operator. It simply says that everytime a graphBtn$ emits a value, which means a user has clicked the render button, I want to emit the value and attach the latest emitted value from equations$ stream, so that I have access to a quadric function’s parameters.

Conclusions

I hope you enjoyed the article and got familiar with the two operators. They are extremely powerful and widely used especially in applications which use ngrx/store to manage state.

Feel free to ask any questions.

If you would like me to cover any RxJS related topic in next articles just let me know!

--

--