Clean Architecture & RxJava — changing the repository source on the fly

Part 1: The basic idea

Recently we at grandcentrix had to implement polling logic that depends on a changing state: in the default state it was fine to read data from the local cache, or to query the backend if that cache was empty; in another state we needed to hit the backend for updated data at a regular interval.

This mutating state could be anything in your app: location, network or Bluetooth settings, time, etc.

Since the app relies heavily on Clean Architecture and RxJava, we wanted the state to be as invisible as possible: a state change should not interrupt any subscription, and ideally the stream should simply start emitting regularly to its existing subscribers. But which part of a Clean Architecture application is responsible for checking and observing that state?

Should it be the presenter that checks the state and executes the use case either once or every x seconds? We quickly said no: the presenter should not know anything about the business logic. We also needed the logic in several places, so it should be the use case that decides, right?

Well, ideally a use case only transforms the data it gets from the data layer (that is, the repository) by applying some logic such as mapping or filtering; it doesn’t decide how or when to get the data.

Furthermore, Fernando Cejas’s note about the data layer sounds like an awesome fit:

The idea behind all this is that the data origin is transparent for the client, which does not care if the data is coming from memory, disk or the cloud, the only truth is that the data will arrive and will be got.

So, the repository is already responsible for deciding what data gets returned from where; why not let it decide how often to return it, too? Combined with RxJava this means that our repository does a new network call every thirty seconds and pushes the new data downstream, and neither the use case nor the presenter has to care about the state at all: they just transform or display the data they receive. Even better, the repository can react to network or connectivity issues by falling back to the local cache, so no other component has to care about errors (although they could)!


The basics

Let’s assume the following repository interface:
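The interface itself is not preserved here, so what follows is only a minimal sketch of what it might look like. To stay dependency-free it uses two tiny stand-ins, `Source<T>` and `Disposable`, in place of RxJava's `Observable<T>` and `Disposable`; the names `Data`, `DataRepository` and `data()` are assumptions for illustration.

```java
import java.util.function.Consumer;

// Stand-in for RxJava's Disposable: cancels a running subscription.
interface Disposable { void dispose(); }

// Stand-in for RxJava's Observable<T>: delivers items until disposed.
interface Source<T> { Disposable subscribe(Consumer<T> onNext); }

// Hypothetical payload type; the article does not name one.
final class Data {
    final String value;
    Data(String value) { this.value = value; }
}

// The single entry point the use case sees. Whether the stream emits once
// or repeatedly is the repository's decision, not the caller's.
interface DataRepository {
    Source<Data> data();
}
```

The important property is that there is exactly one method: callers cannot pick between a "once" and a "polling" variant.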

Depending on one of two states we want to make the Observable emit either once or in an interval. Let’s call the two functions that serve the data for either state defaultStateObservable and specialStateObservable and put them in a new class:
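A rough sketch of such a class, under a few assumptions: `Source<T>` and `Disposable` are dependency-free stand-ins for RxJava's `Observable<T>` and `Disposable`, the `cache` and `backend` suppliers are hypothetical placeholders for the real data access, and a `ScheduledExecutorService` plays the role that something like `Observable.interval` would play in RxJava. Error handling is deliberately left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Stand-ins for RxJava's Disposable and Observable<T>.
interface Disposable { void dispose(); }
interface Source<T> { Disposable subscribe(Consumer<T> onNext); }

final class Emitter<T> {
    private final Supplier<T> cache;    // hypothetical: returns null when empty
    private final Supplier<T> backend;  // hypothetical: blocking backend call
    private final long periodMillis;

    Emitter(Supplier<T> cache, Supplier<T> backend, long periodMillis) {
        this.cache = cache;
        this.backend = backend;
        this.periodMillis = periodMillis;
    }

    // Default state: emit once, preferring the cache over the backend.
    Source<T> defaultStateObservable() {
        return onNext -> {
            T cached = cache.get();
            onNext.accept(cached != null ? cached : backend.get());
            return () -> {};  // nothing left to cancel
        };
    }

    // Special state: poll the backend at a fixed interval until disposed.
    Source<T> specialStateObservable() {
        return onNext -> {
            ScheduledExecutorService timer =
                    Executors.newSingleThreadScheduledExecutor();
            timer.scheduleAtFixedRate(
                    () -> onNext.accept(backend.get()),
                    0, periodMillis, TimeUnit.MILLISECONDS);
            return timer::shutdownNow;  // disposing stops the polling
        };
    }
}
```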

Now, let’s make Emitter observe any state changes and call a method when the state changes:
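As a sketch of this step only: the `Emitter` below receives a stream of state changes and routes each one to a single method. The `State` enum and the `Source<T>` / `Disposable` stand-ins (replacing RxJava's `Observable<T>` and `Disposable`) are assumptions; where the state stream comes from in a real app is not shown.

```java
import java.util.function.Consumer;

// Stand-ins for RxJava's Disposable and Observable<T>.
interface Disposable { void dispose(); }
interface Source<T> { Disposable subscribe(Consumer<T> onNext); }

// Hypothetical two-valued state; a real app might derive it from
// location, connectivity, time and so on.
enum State { DEFAULT, SPECIAL }

final class Emitter {
    private volatile State currentState = State.DEFAULT;
    private final Disposable stateSubscription;

    Emitter(Source<State> stateChanges) {
        // Every state emission ends up in onStateChanged.
        stateSubscription = stateChanges.subscribe(this::onStateChanged);
    }

    private void onStateChanged(State newState) {
        if (newState == currentState) return;  // ignore repeated values
        currentState = newState;
        // The data source gets switched here in the following steps.
    }

    State currentState() { return currentState; }

    // Stops observing the state, e.g. when the repository is torn down.
    void close() { stateSubscription.dispose(); }
}
```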

Now we would have to subscribe to the specific Observable depending on the state. The problem is, our repository only has one method to get the data. If a subscriber is already subscribed to defaultStateObservable() and now we need to switch to specialStateObservable(), we cannot tell the subscriber to resubscribe.


Subject to the rescue!

We let the subscriber subscribe to a Subject so we don’t have to interrupt the subscription but can still control which state observable feeds the Subject! Let’s take a look:
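A minimal sketch of the idea, not the original code. In RxJava the `Subject` would typically be something like a `PublishSubject`; here a small hand-rolled `Subject<T>` stands in for it so the example needs nothing beyond the JDK. `Source<T>`, `Disposable`, `State` and the constructor parameters are likewise assumptions. This version subscribes eagerly and never releases its last upstream subscription.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Stand-ins for RxJava's Disposable and Observable<T>.
interface Disposable { void dispose(); }
interface Source<T> { Disposable subscribe(Consumer<T> onNext); }

enum State { DEFAULT, SPECIAL }

// Tiny stand-in for an RxJava Subject: forwards each item to all observers.
final class Subject<T> implements Source<T> {
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    @Override
    public Disposable subscribe(Consumer<T> onNext) {
        observers.add(onNext);
        return () -> observers.remove(onNext);
    }

    void onNext(T item) { observers.forEach(o -> o.accept(item)); }
}

final class Emitter<T> {
    private final Subject<T> subject = new Subject<>();
    private final Source<T> defaultSource;
    private final Source<T> specialSource;
    private Disposable upstream;

    Emitter(Source<T> defaultSource, Source<T> specialSource) {
        this.defaultSource = defaultSource;
        this.specialSource = specialSource;
        // Eager subscription: happens even if nobody is listening yet.
        upstream = defaultSource.subscribe(subject::onNext);
    }

    // Subscribers only ever see the Subject, never the sources behind it.
    Source<T> stateObservable() { return subject; }

    // Swap the source feeding the Subject; subscribers stay subscribed.
    synchronized void onStateChanged(State newState) {
        upstream.dispose();
        Source<T> next = newState == State.SPECIAL ? specialSource : defaultSource;
        upstream = next.subscribe(subject::onNext);
    }
}
```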

Now we can change the data source on the fly when the state changes while our subscriber doesn’t know anything about it 😎. Each emission of one of the Observables will be delegated to the Subject, so our subscriber receives the event.

But, wait…

But why subscribe to the Observables although no one requires the data right now? 🤔 Also, we’re leaking the subscription, as we never dispose it.

Let’s fix this by subscribing to and disposing our Observables depending on whether our Subject has a subscriber or not:
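One way this could look, sketched without RxJava: `Source<T>`, `Disposable` and `Subject<T>` are hand-rolled stand-ins, and a plain subscriber counter replaces what RxJava would express with operators such as `doOnSubscribe` and `doOnDispose`. The method `onSubscribed()` plays the role the text calls `onSubscribeConsumer`; all other names are assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

interface Disposable { void dispose(); }
interface Source<T> { Disposable subscribe(Consumer<T> onNext); }
enum State { DEFAULT, SPECIAL }

// Tiny stand-in for an RxJava Subject.
final class Subject<T> implements Source<T> {
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();
    @Override public Disposable subscribe(Consumer<T> onNext) {
        observers.add(onNext);
        return () -> observers.remove(onNext);
    }
    void onNext(T item) { observers.forEach(o -> o.accept(item)); }
    boolean hasObservers() { return !observers.isEmpty(); }
}

final class Emitter<T> {
    private final Subject<T> subject = new Subject<>();
    private final Source<T> defaultSource;
    private final Source<T> specialSource;
    private State state = State.DEFAULT;
    private int subscriberCount = 0;
    private Disposable upstream;  // null while nobody is listening

    Emitter(Source<T> defaultSource, Source<T> specialSource) {
        this.defaultSource = defaultSource;
        this.specialSource = specialSource;
    }

    Source<T> stateObservable() {
        return onNext -> {
            // Register downstream first so a synchronous first item is not lost.
            Disposable downstream = subject.subscribe(onNext);
            onSubscribed();
            AtomicBoolean disposed = new AtomicBoolean();
            return () -> {
                if (disposed.compareAndSet(false, true)) {  // count each exit once
                    downstream.dispose();
                    onDisposed();
                }
            };
        };
    }

    private synchronized void onSubscribed() {
        // First subscriber: start feeding the Subject from the current source.
        if (++subscriberCount == 1) upstream = sourceFor(state).subscribe(subject::onNext);
    }

    private synchronized void onDisposed() {
        // Last subscriber gone: release the upstream subscription.
        if (--subscriberCount == 0) { upstream.dispose(); upstream = null; }
    }

    synchronized void onStateChanged(State newState) {
        state = newState;
        if (subscriberCount == 0) return;  // onSubscribed() will pick the source later
        upstream.dispose();
        upstream = sourceFor(state).subscribe(subject::onNext);
    }

    private Source<T> sourceFor(State s) {
        return s == State.SPECIAL ? specialSource : defaultSource;
    }
}
```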

Woah, a lot of changes. Let’s go through it step by step.

When someone subscribes to our Subject by calling stateObservable(), we run an action on the subscription event and on the disposal event. If this is the first subscriber to the Subject, we also subscribe to the state Observable; if the last subscriber disposes from the Subject, we also dispose the state Observable, so we’re only subscribed while someone actually requests data.

Also, when the state changes, we now first check whether we even have subscribers. If not, we do nothing, as the onSubscribeConsumer will later trigger the subscription to the internal Observable. Otherwise, for simplicity, we just dispose the current Observable and subscribe to the correct Observable for the new state.


Well, and that’s it already. Now we have a working data stream that changes the emission frequency based on the state, while any subscribers just stay subscribed.

Let’s put the parts quickly together and use the Emitter in a repository implementation:
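A sketch of how the wiring might look. Only the part of the `Emitter` that the repository needs is modeled, as an interface; `Source<T>` and `Disposable` again stand in for RxJava's `Observable<T>` and `Disposable`, and `Data`, `DataRepository` and `StateAwareDataRepository` are hypothetical names.

```java
import java.util.function.Consumer;

// Stand-ins for RxJava's Disposable and Observable<T>.
interface Disposable { void dispose(); }
interface Source<T> { Disposable subscribe(Consumer<T> onNext); }

// Assumed shape: the only Emitter method the repository relies on.
interface Emitter<T> { Source<T> stateObservable(); }

// Hypothetical payload type.
final class Data {
    final String value;
    Data(String value) { this.value = value; }
}

interface DataRepository { Source<Data> data(); }

// The repository simply hands out the Emitter's stream. Callers never learn
// which state is active or how often the stream emits.
final class StateAwareDataRepository implements DataRepository {
    private final Emitter<Data> emitter;

    StateAwareDataRepository(Emitter<Data> emitter) { this.emitter = emitter; }

    @Override
    public Source<Data> data() { return emitter.stateObservable(); }
}
```

A use case would call `data()` once, subscribe, and keep that subscription for as long as it needs data, regardless of state changes.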

In the next part we will have a look at how to implement defaultStateObservable() and specialStateObservable() so that they handle server and connection errors gracefully without interrupting the stream, retry automatically when, for example, the internet connection becomes available again, and, of course, perform the recurring API call.