The magic of ReactiveX: debouncing inputs to async functions

Sam Parkinson
Mettle Adventures
Published in
7 min readApr 6, 2018

When a function is “expensive” we want to try to reduce how many times we perform that operation. When the input varies quickly a common way to reduce the number of function calls is to “debounce” the input. However, when the function is async this creates some difficulties in providing an efficient solution and also a good user experience. This blog post provides our solution to this problem using the fantastic stream library ReactiveX.

We recently created a product which contained many tables of data. Each of these tables was paginated, sortable, and filterable by searching for text contained in any of the columns displayed in the table. Each time a change was made to the pagination, sorting, or filtering, a REST API call was required to fetch the new data. We needed a solution to this problem that was simple, easily testable, robust, and efficient.

In this case the operation was expensive due to a requirement for network and server resources on each API call. Some other examples of where we may want to debounce the inputs to an “expensive” operation include:

  • Zooming, scrolling on a map or large tiled image triggering fetching of tiles from an API and rendering them
  • Rendering of images or maps in scrolling views
  • Autocomplete search / input boxes that require API calls

We can solve this with ReactiveX!

Enter my current favourite software library: ReactiveX. ReactiveX is a library for functional programming with asynchronous data streams.

ReactiveX provides tools for creating, manipulating, combining, splitting and filtering these streams of information, and allows a programmer to work with these asynchronous streams functionally. With ReactiveX, anything can become a stream and the functional programming approach often removes the need for any state, which generally leads to simpler, and more robust code.

Building functional pipelines of stream creation, filtering and manipulation takes some serious thought, but the results are worth it. In our experience, code handling streams of async data is more stable and is easier to debug once it has been created with ReactiveX.

Also brilliantly, ReactiveX is implemented in most of the popular coding languages. At Mettle we work cross-platform and commonly use Java, Kotlin, Swift, Python and JS. All of the above have implementations of ReactiveX, and so anything we learn can be applied across all of our projects!

I am about to get into the meat of the problem here. This article assumes you have a decent knowledge of Observables and the ReactiveX framework. If you are new to Observables or ReactiveX, I highly recommend checking out this link https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 and having a play first.

A working example

I have created an example project that demonstrates the techniques described in this article, and it is available for you to look at here.

This example project renders a simple list view component that connects to the awesome Star Wars API. It has controls for pagination, changing the data source, and searching in the API. All of these inputs create streams for our ReactiveX pipeline.

We use Marble Diagrams to visualise inputs and desired outputs when using ReactiveX

The marble diagram

I always start my ReactiveX implementations with a marble digram of the input streams, and the desired output. Our marble diagram of inputs and desired outputs looks something like this:

This would be ideal. How do we create an RX pipeline to do this?

Some important features of this debounced async function:

  • We do not want to miss any user inputs: The most recent set of inputs must always produce a result
  • We only want to show current and relevant results: Current results should always be related to the inputs as they are right now, or be null.
  • We want to reduce the number of calls to our async function: Rapid changes in inputs should not result in rapid calls of our async function
  • We would like abandoned calls of the async function to cancel gracefully and efficiently: Ideally, any background processes or network calls that are abandoned will exit with the least use of further resources.
  • We want to obtain responses as quickly as possible: We would like to use this function for all input events (which may be user interaction with pagination controls as well as a search box as in our example code), and we would prefer it if only rapid changes in input are debounced.

So what is our approach to get us from our input events to our output?

Step 1: Combining throttle, debounce and distinctUntilChanged to produce a rapid response, debounced input stream.

A commonly documented approach to the problem of debouncing an input stream, but retaining a rapid response to individual changes in input, is to combine the throttle and debounce operators. Read a blog post that describes this approach here.

Our nicely debounced synchronous input stream

This has significantly reduced the rate of change of input values. For non-async problems this is often a solid approach to solving this problem. However, if we add an async process on the end of this some problems appear.

Step 2: Use map and switch to perform the async operations and always obtain the latest output.

We first need to map our inputs to a set of observables that are managing the async function call. The output of the observables will be the async response to a set of input values. Then we need to do something with the output from these mapped observables. We will first investigate performing a flatmap of the debounced input values to observables.

Throw in an async function and we hit some problems!

This approach has three problems:

  1. Async function calls may take different amounts of time to complete, which could cause outputs to be received in the wrong order (results 3 & 4 are in the wrong order).
  2. All async operations have been allowed to run to completion, even if the results are not needed.
  3. Many of the responses do not relate to the inputs as they are right now, (e.g. response 2 arrives after input 3) which will often lead to a misleading user experience.

We can tackle the first issue, and to some extent the second, by changing our flatmap for a map and switch in series as shown below:

This got rid of a nasty bug!

Another great property of using the switch is that dropped observables will be automatically unobserved and will call any cancelling procedures defined in the observable. We can use this to cancel incomplete network requests or background tasks, perfect! We use this in the demo code to cancel axios HTTP requests that are not needed anymore.

So we are getting close, but we still have some problems. Reviewing the input and output streams from above:

Getting close… but as a user we are still see some strange results

The output from (2) is received after the input has changed to (3). This will cause a confusing user experience in many cases. (2) and (3) may relate to searches for ‘Luke’ and ‘Anakin’ respectively. Showing responses for ‘Luke’ after the user had searched for ‘Anakin’ would lead to some confusion!

Step 3: Create a cancelling stream from the original input stream to cancel redundant and outdated requests.

Finally, to clear up the issues above, we add a cancelling stream to our function. In the description of this below:

  • we delay our debounced/throttled input stream by 1 ms before mapping it to our async function call,
  • we map each change in input to an observable which returns a single null value immediately,
  • we merge the single null value observables with the async function call observables before our switch,
  • and finally, we add another distinctUntilChanged to tidy up.

Why? All will become clear below!

Looks like we might be there!

Our cancelling observables slide into the stream of async function observables immediately when the input values change. This cancels any redundant requests and also provides us with a null output, which we can use to update our UI into a suitable loading state as appropriate.

Now we can compare this to what we were aiming for:

Not bad! I reckon this will do

Not at all bad. Obviously the debouncing and delaying of the input stream has resulted in some slightly longer waits for data (note the delayed result from input 9), and this has also meant that some results are not returned before the input changes again (note the missing result from input 3), but in reality the effect is minimal.

In our example code which is written in JS, the formulation looks like this:

const debounceTime = 500
// merge debounce and throttle to get desired instant, debounced behaviour
const debouncedThrottledInputStream = Rx.Observable
.merge(
inputStream.debounceTime(debounceTime),
inputStream.throttleTime(debounceTime),
)
.distinctUntilChanged()
// map changing inputs to axios request after delaying by 1ms
const asyncObservablesStream = debouncedThrottledInputStream
.delay(1)
.map(inputs =>
Rx.Observable.create(observer => {
const CancelToken = axios.CancelToken;
const source = CancelToken.source();
axios.get(
`https://swapi.co/api/${inputs[0]}`,
{
params: { page: inputs[2], search: inputs[1] },
cancelToken: source.token,
}
)
.then(response => observer.next(response))
.catch(error => observer.error(error))
return () => {
// cancel request when disposed
source.cancel('Operation canceled by a new input.')
}
})
)
// map input changes to just(null)
const cancellingStream = inputStream
.map(inputs => Rx.Observable.of(null))
// merge, switch and subscribe
this.requests$ = Rx.Observable
.merge(
asyncObservablesStream,
cancellingStream
)
.switch()
.subscribe(this.updateFromResponse, console.error)

I am sure this could be improved further, so am very keen to hear your thoughts and suggestions… comment away! Please also download the example code and use as you will.

--

--

Sam Parkinson
Mettle Adventures

I am a Director, and the CTO of Mettle Studio Ltd. I love anything related to Software Engineering and Data Science