Chaining multiple sources with RxJava

Miguel Juárez López
Published in Yammer Engineering
Feb 23, 2016

The pattern for loading and displaying data for our Yammer Android app is roughly as follows:

  1. To show content, query two sources: the disk (cache) and the network (API).
  2. Show the cached data while waiting for the network.
  3. When the network request comes back, cache it to disk, then show it.

This is a very straightforward pattern, but things can get tricky real fast when we consider certain factors:

  • Should these two background calls be made in serial or in parallel?
  • What happens if the cache is empty?
  • What about errors?
  • What happens if the network call finishes before the disk call?

I’ve been fiddling with RxJava lately since it’s all the rage in the Android community nowadays and I didn’t want to miss out on the revolution. After reading Dan Lew’s excellent article about a similar topic I decided to give it a go and implement this logic while addressing the points mentioned above by using RxJava.

The example app is available on my GitHub: https://github.com/murki/chaining-rxjava

Serial or parallel?

There is no point in waiting for one call to return before firing the other if we can fire both at the same time independently, and if there is one thing RxJava excels at, it is chaining multithreaded operations.

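For this case we can use the merge operator along with the IO scheduler, merging diskRepository.getData() and networkRepository.getData() after applying subscribeOn(Schedulers.io()) to each.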

The merge operator will make sure both calls are fired independently and then emit the data once for each source as soon as each completes. The subscribeOn(Schedulers.io()) will make sure each repository call happens on a different background thread (from the same unbounded thread pool).
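As a rough, library-free illustration of what merge plus subscribeOn(Schedulers.io()) buys us, here is a minimal sketch that uses java.util.concurrent instead of RxJava. It is not the code from the example app: ParallelSourcesSketch, loadFromBoth and slowSource are hypothetical names, and a cached thread pool is only assumed to be a fair stand-in for the IO scheduler.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class ParallelSourcesSketch {

    // Starts both sources at the same time on an unbounded cached thread pool
    // and records each result as soon as its source completes.
    // The returned list is in completion order.
    static List<String> loadFromBoth(Supplier<String> disk, Supplier<String> network) {
        ExecutorService io = Executors.newCachedThreadPool();
        List<String> emissions = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> fromDisk =
                    CompletableFuture.supplyAsync(disk, io).thenAccept(emissions::add);
            CompletableFuture<Void> fromNetwork =
                    CompletableFuture.supplyAsync(network, io).thenAccept(emissions::add);
            // Wait for both; the total wait is roughly max(disk, network), not the sum.
            CompletableFuture.allOf(fromDisk, fromNetwork).join();
        } finally {
            io.shutdown();
        }
        return emissions;
    }

    // Hypothetical stand-in for a repository call that takes a while.
    // The result is tagged with the name of the thread that produced it.
    static Supplier<String> slowSource(String value, long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value + "@" + Thread.currentThread().getName();
        };
    }

    public static void main(String[] args) {
        System.out.println(loadFromBoth(slowSource("disk", 50), slowSource("network", 200)));
    }
}
```

Each result carries the name of the worker thread that produced it, which is the same kind of evidence the Frodo output gives for the real Observables.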

To double-check that this logic works as described, I used the excellent Frodo library to add easy logging around my Observables via annotations. I recommend always checking that your Observables run the way you expect them to while coding.

Assuming the getData() methods of DiskRepository and NetworkRepository, and getMergedData() in DomainService, carry Frodo’s logging annotation, the logcat should look something like this (some output omitted for brevity):

...
DiskRepository﹕ Frodo => [@Observable#getData -> @Emitted -> 1 element :: @Time -> 560 ms]
...
DiskRepository﹕ Frodo => [@Observable#getData -> @SubscribeOn -> RxCachedThreadScheduler-1 :: @ObserveOn -> RxCachedThreadScheduler-1]
...
NetworkRepository﹕ Frodo => [@Observable#getData -> @Emitted -> 1 element :: @Time -> 2446 ms]
...
NetworkRepository﹕ Frodo => [@Observable#getData -> @SubscribeOn -> RxCachedThreadScheduler-2 :: @ObserveOn -> RxCachedThreadScheduler-2]
...
DomainService﹕ Frodo => [@Observable#getMergedData -> @Emitted -> 2 elements :: @Time -> 2446 ms]
...

The main takeaways from this output are:

  • We assume parallelism is taking place since we can see that each getData() method is running on its own separate thread (RxCachedThreadScheduler-1 vs RxCachedThreadScheduler-2).
  • We confirm both getData() methods are running in parallel since the total time taken by getMergedData() to emit the 2 elements is max(560, 2446) = 2446 ms instead of 560 + 2446 = 3006 ms.

In this diagram we can assume the consumer subscribes to Observable<Data> getMergedData() and updates the UI every time an emission occurs.

Success! It looks like we have achieved parallelism in a very succinct way thanks to RxJava.

Cache network data

At this point our cache will always be empty, since we’re never saving the data to disk. To fix this we’ll take advantage of the “side-effect” operators that RxJava offers, in particular doOnNext(), which we attach to the network call so that every emission is handed to diskRepository.saveData(data).

A few things to be aware of here:

  • The saveData() method doesn’t need to be implemented as an Observable; it automatically runs on the same thread as the caller’s subscription (RxCachedThreadScheduler-2 in this case).
  • The original emission will not happen until the doOnNext() action has completed (i.e. networkRepository will not emit any data until the caching has been done).
  • If diskRepository.saveData(data) throws an exception, it will be automatically wrapped and reported to the stream subscriber’s onError() callback, keeping the interface fluent.

These diagrams assume diskData will return faster, but that might not always be the case.
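The three points above can be modelled without RxJava. The following is a minimal sketch in plain Java, assuming a synchronous fetch; CacheSideEffectSketch, fetchThenCache and demo are hypothetical names, and the four functional parameters stand in for the network call, diskRepository.saveData(), and the subscriber’s onNext() and onError().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class CacheSideEffectSketch {

    // Fetch, run the caching side effect, and only then hand the value on.
    // A failure while fetching or saving is routed to onError and nothing is emitted.
    static <T> void fetchThenCache(Supplier<T> network,
                                   Consumer<T> saveToDisk,
                                   Consumer<T> onNext,
                                   Consumer<Throwable> onError) {
        T data;
        try {
            data = network.get();
            saveToDisk.accept(data); // runs on the calling thread, before the emission
        } catch (RuntimeException e) {
            onError.accept(e);
            return;
        }
        onNext.accept(data);
    }

    // Records the order of events for a successful or a failing disk write.
    static List<String> demo(boolean diskFails) {
        List<String> events = new ArrayList<>();
        CacheSideEffectSketch.<String>fetchThenCache(
                () -> "fresh",
                data -> {
                    if (diskFails) {
                        throw new IllegalStateException("disk full");
                    }
                    events.add("saved:" + data);
                },
                data -> events.add("shown:" + data),
                error -> events.add("error:" + error.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

In the successful run the save is recorded before the value is shown. In the failing run only the error is recorded, which mirrors the behaviour described for doOnNext().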

Once again, this was very simple to accomplish thanks to RxJava.

Ignoring the empty cache

If the cache is empty we probably want to ignore it and not emit any data at all. For this we can use the filter operator.

We simply filter out results that are null from both sources. In the case of DiskRepository this happens when the cache is empty.

[If you’re not a fan of using nulls to denote the absence of a value, I recommend looking into replacing them with Optional<T>]
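To make the two options concrete, here is a small sketch in plain Java (java.util.stream stands in for the Observable stream). EmptyCacheSketch and its methods are hypothetical names, and the String payload is just a placeholder for the real Data type.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class EmptyCacheSketch {

    // Hypothetical disk read: returns null when nothing has been cached yet.
    static String readRawCache(boolean hasCachedData) {
        return hasCachedData ? "cached data" : null;
    }

    // The predicate one would hand to a filter step: keep only real data.
    static boolean hasData(String data) {
        return data != null;
    }

    // Null-based variant: results from both sources pass through the same filter.
    static List<String> emissions(String diskResult, String networkResult) {
        return Stream.of(diskResult, networkResult)
                .filter(EmptyCacheSketch::hasData)
                .collect(Collectors.toList());
    }

    // Optional-based variant: an empty cache becomes Optional.empty() instead of null.
    static Optional<String> readCache(boolean hasCachedData) {
        return Optional.ofNullable(readRawCache(hasCachedData));
    }

    public static void main(String[] args) {
        System.out.println(emissions(readRawCache(false), "network data"));
        System.out.println(readCache(false).isPresent());
    }
}
```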

Error Handling

It’s worth noting that any exception happening in the chain would interrupt the entire flow of data (e.g. an error when retrieving the network data would also stop showing the cache). Depending on your requirements this might not be desirable.

For these cases RxJava offers a set of error-recovery operators that allow you to return default values when certain exceptions occur, the simplest of these operators being onErrorReturn.

UPDATE: At the time of writing the original article RxJava (v.1.0.16) didn’t really have a good way to delay asynchronous errors on a stream. The functionality was fixed in v.1.1.1, so I have updated this article to make use of this technique.

But instead of just blindly swallowing the errors, we are going to make use of the mergeDelayError operator. The documentation states that it “combines multiple Observables into one, allowing error-free Observables to continue before propagating errors”, which sounds exactly like what we need in place of merge.

Adding this operator ensures that if an Exception is thrown by diskRepository.getData(), it won’t interrupt the stream. Unfortunately, RxJava had a bug: if an Exception was thrown later in the stream (by networkRepository.getData() in this case), it would incorrectly cut ahead of the successful emissions and break the flow. To fix this, version 1.1.1 added the overload observeOn(Scheduler scheduler, boolean delayError), which signals that the delaying of errors should be respected.
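The contract being relied on here (successful emissions first, errors last) can be modelled in a few lines of plain Java. This is only a sketch of the idea, not RxJava’s implementation: the sources run one after another for simplicity, DelayErrorSketch and mergeDelayingErrors are hypothetical names, and only the first held error is reported.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class DelayErrorSketch {

    // Delivers every successful result, holds back any failure,
    // and reports the first held failure only after all sources have run.
    static <T> void mergeDelayingErrors(List<Supplier<T>> sources,
                                        Consumer<T> onNext,
                                        Consumer<Throwable> onError) {
        List<Throwable> held = new ArrayList<>();
        for (Supplier<T> source : sources) {
            try {
                onNext.accept(source.get());
            } catch (RuntimeException e) {
                held.add(e); // do not interrupt the remaining sources
            }
        }
        if (!held.isEmpty()) {
            onError.accept(held.get(0));
        }
    }

    // A failing disk read followed by a successful network call.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        List<Supplier<String>> sources = new ArrayList<>();
        sources.add(() -> {
            throw new IllegalStateException("cache unreadable");
        });
        sources.add(() -> "network data");
        mergeDelayingErrors(
                sources,
                data -> events.add("next:" + data),
                error -> events.add("error:" + error.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Even though the disk source fails first, the network result is still delivered, and the error arrives only afterwards.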

Only the freshest

Just showing the data in the order it is emitted by the Observable can get us into trouble: if for some reason the network call finished before the disk call, the UI would show the data from the API first, but then replace it with the older disk data.

This can be avoided by time-stamping the data after the network call returns. The timestamp() operator wraps each item in a Timestamped<T> that records the moment it was emitted (as milliseconds since the epoch in RxJava 1.x), and if we store these timestamps along with the data, we can then use them to filter out data that is no longer relevant.

As mentioned above, the consumer (in this case the view) has to make sure to use the overload of observeOn while passing delayError=true.

This is where things get a little hairy. Almost all the calls now deal with Timestamped<Data>; the only method that still returns plain Data is the one in NetworkRepository, since this is the one to which we attach the timestamp() operator. This effectively transforms the Data obtained from the network into Data that has an emission time associated with it.

The second part of the equation is giving our filter a way to query the timestamp of the data that is already being displayed. That way, if the network data was shown first, the disk data will later be ignored (filtered out) when it arrives. In the example above, IDisplayedData can be anything that has access to the timestamp of the data being shown in the view (Fragment, Activity, Adapter, etc.).

So now our filter is also answering the question: is the data arriving now newer than what we’re already showing?

With this we also get the extra benefit of having implemented the UI refresh correctly; if our view is already showing data that has been cached to disk and the user triggers a refresh, the filter will make us ignore the cached data altogether and only show the fresh network data.
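The filter’s decision can be written as a plain predicate. The sketch below is an assumption about its shape, not the code from the example app: Stamped is a hypothetical stand-in for a time-stamped Data wrapper, DisplayedData stands in for IDisplayedData, and a displayed timestamp of 0 is assumed to mean that nothing is on screen yet.

```java
final class FreshnessFilterSketch {

    // Hypothetical stand-in for a value paired with its emission time.
    static final class Stamped<T> {
        final long timestampMillis;
        final T value;

        Stamped(long timestampMillis, T value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    // Stand-in for IDisplayedData: anything that knows the timestamp of what is on screen.
    interface DisplayedData {
        long displayedTimestampMillis();
    }

    // Keep an emission only if it carries data and is strictly newer than what is shown.
    static <T> boolean isFresher(Stamped<T> incoming, DisplayedData displayed) {
        return incoming != null
                && incoming.value != null
                && incoming.timestampMillis > displayed.displayedTimestampMillis();
    }

    public static void main(String[] args) {
        Stamped<String> network = new Stamped<>(2000L, "network data");
        Stamped<String> disk = new Stamped<>(1000L, "disk data");
        System.out.println(isFresher(network, () -> 0L));    // nothing displayed yet
        System.out.println(isFresher(disk, () -> 2000L));    // network data already displayed
    }
}
```

The strict comparison also covers the refresh case: cached data that carries the same timestamp as the data already on screen is filtered out, so only the fresh network result gets through.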

Show me the code

If all this is too confusing to follow, a full-fledged real-world example is available on my GitHub repo: https://github.com/murki/chaining-rxjava

I used Retrofit for implementing the network repository, and a basic SharedPreferences-based implementation for the disk repository. Additionally I make use of RxJava’s map operator to transform the data model to a view model.

I’m an Android dev at Yammer Engineering.
