Repository pattern using RxJava and Room
Mobile apps often have multiple screens that use data from the same API endpoint. This presents two potential difficulties:
- When should we load the data from the cache and when from the network?
- If another screen modifies the data, how do the other screens update themselves?
Using RxJava and Room we can create a repository pattern that solves both of the above issues. RxJava allows us to combine data from multiple sources and Room allows us to monitor the database and emit changes to a particular query.
To show this pattern in action, imagine a simple weather app with 3 tabs: one shows today’s temperature, one shows the wind speed and one lets you change your location.

All screens use data from the same API call. We want to call this endpoint only once, cache the result and make it available to all 3 tabs. If the user changes their location, we should retrieve new data and send it to every tab that is subscribed to WeatherData.
//http://api.openweathermap.org/data/2.5/
The Repository
The WeatherRepository is the access point for the data. It is used by the presenters to display the data on the screen. We create 3 interactors, one each for the network, the database and memory. Each has a getWeatherData() method that returns a reactive source (a Single, an Observable or a Maybe, depending on the interactor) of the WeatherData it currently holds. The repository loads the data from memory or the database if it is available there; if not, it requests it from the network.
public Observable<WeatherData> getWeatherData() {
    // Convert every source to an Observable so they can be concatenated.
    Observable<WeatherData> memoryObservable = memoryInteractor.getWeatherData().toObservable();
    // The database interactor already returns an Observable.
    Observable<WeatherData> databaseObservable = databaseInteractor.getWeatherData(currentLocationName);
    Observable<WeatherData> networkObservable = networkInteractor.getWeatherData(currentLocationName).toObservable();

    // Try memory, then the database, then the network; stop at the first valid item.
    Observable.concat(memoryObservable, databaseObservable, networkObservable)
            .filter(data -> data.name.equals(currentLocationName))
            .filter(WeatherData::isDataInDate)
            .firstElement()
            .subscribe(weatherData -> {}, this::handleNonHttpException);

    // Callers observe the in-memory cache, which emits the current value and later updates.
    return memoryInteractor.getWeatherDataObservable();
}
Observable.concat() subscribes to the observables one after another and emits their items in order. Dan Lew’s article explains how firstElement() only subscribes to the next observable if it still needs an item to emit. Therefore, if memoryObservable or databaseObservable emits an item that is in date, the network is never called. We then return the memory observable so that callers can listen for further changes to the data.
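The short-circuiting behaviour described above can be sketched without RxJava. The following is a plain-Java analogue, not the repository's code: FirstFreshSource, firstMatching and the Supplier-based sources are hypothetical names chosen for illustration, and Optional stands in for a source that emits at most one item.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Plain-Java analogue of concat(...).filter(...).firstElement(); this is not RxJava.
final class FirstFreshSource {

    // Asks each source in order and stops at the first value that passes the check.
    // Sources listed after the winning one are never invoked.
    static <T> Optional<T> firstMatching(List<Supplier<Optional<T>>> sources, Predicate<T> isUsable) {
        for (Supplier<Optional<T>> source : sources) {
            Optional<T> candidate = source.get().filter(isUsable);
            if (candidate.isPresent()) {
                return candidate;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Supplier<Optional<String>>> sources = Arrays.asList(
                () -> Optional.empty(),        // memory: nothing cached yet
                () -> Optional.of("London"),   // database: holds a cached value
                () -> { throw new IllegalStateException("network should not be called"); });
        System.out.println(firstMatching(sources, "London"::equals)); // prints Optional[London]
    }
}
```

Because the database source already supplies a usable value, the third supplier is never run, which mirrors how the network observable is never subscribed to when a cache holds valid data.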
As all 3 tabs are created at the same time, they call this method almost simultaneously. To stop the network being called 3 times, the method needs to check whether a load is already in progress.
// If null, we haven't started a check for data yet.
// If disposed, the previous check has finished.
if (dataProviderDisposable == null || dataProviderDisposable.isDisposed()) {
    dataProviderDisposable = Observable.concat(...) // same chain as above, ending in subscribe(...)
}
return memoryInteractor.getWeatherDataObservable();
We store the subscription that performed the last check for data in a Disposable, dataProviderDisposable. If it has been disposed, the stream has finished: either data was returned from the cache, or the network call succeeded or failed. In that case we should check again.
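The same "only start a load when none is running" check can be illustrated in plain Java. This is a sketch under assumptions rather than the app's implementation: InFlightGuard and loadIfIdle are hypothetical names, CompletableFuture stands in for the Disposable, and the synchronized keyword is an addition to keep concurrent callers from both starting a load.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Plain-Java analogue of the dataProviderDisposable check; this is not RxJava.
final class InFlightGuard {
    private CompletableFuture<?> inFlight; // null until the first load is started

    // Starts a new load only when no earlier load is still running.
    // Returns true if a load was started, false if one was already in progress.
    synchronized boolean loadIfIdle(Supplier<CompletableFuture<?>> startLoad) {
        if (inFlight == null || inFlight.isDone()) {
            inFlight = startLoad.get();
            return true;
        }
        return false;
    }
}
```

With three tabs calling loadIfIdle while the first load is still pending, only the first call runs startLoad; once that future completes, the next call is allowed to start a fresh load.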
The Interactors
Network Interactor
If there is no cache, we subscribe to the network interactor. When called, it converts the response from the API into our WeatherData object, then saves it to the database and into memory so that it is cached.
public Single<WeatherData> getWeatherData(String city) {
    return apiService.getWeather(city)
            .map(WeatherData::copyFromResponse)
            .doOnSuccess(data -> sessionService.saveLocation(data.name))
            .doOnSuccess(databaseInteractor::saveData)
            .doOnSuccess(memoryInteractor::saveData);
}
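The article does not show WeatherData.copyFromResponse or isDataInDate. As a rough sketch only, they might look like the following. The fields on CityForecastResponse, the fetchedAtMillis timestamp, the 10-minute MAX_AGE_MILLIS window and the overloads that take an explicit clock value are all assumptions made for illustration.

```java
// Hypothetical response shape; the real CityForecastResponse is not shown in the article.
final class CityForecastResponse {
    String name;
    double temperature;
    double windSpeed;
}

final class WeatherData {
    // Assumed freshness window; choose a value that suits how often the API updates.
    static final long MAX_AGE_MILLIS = 10 * 60 * 1000L;

    String name;
    double temperature;
    double windSpeed;
    long fetchedAtMillis; // when this copy was created from a response

    // Copies the fields the app needs and stamps them with the current time.
    static WeatherData copyFromResponse(CityForecastResponse response) {
        return copyFromResponse(response, System.currentTimeMillis());
    }

    // Overload with an explicit clock value, which makes the mapping easy to test.
    static WeatherData copyFromResponse(CityForecastResponse response, long nowMillis) {
        WeatherData data = new WeatherData();
        data.name = response.name;
        data.temperature = response.temperature;
        data.windSpeed = response.windSpeed;
        data.fetchedAtMillis = nowMillis;
        return data;
    }

    // True while the data is no older than MAX_AGE_MILLIS.
    boolean isDataInDate() {
        return isDataInDate(System.currentTimeMillis());
    }

    boolean isDataInDate(long nowMillis) {
        return nowMillis - fetchedAtMillis <= MAX_AGE_MILLIS;
    }
}
```

With this shape, the method references WeatherData::copyFromResponse and WeatherData::isDataInDate used in the chains resolve to the one-argument and no-argument versions respectively.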
Retrofit provides a handy adapter to convert our API calls into observables, which makes creating our network observable much easier.
public interface ApiService {
    @GET("weather")
    Single<CityForecastResponse> getWeather(@Query("q") String city);
}
Database Interactor
If there is no data in memory but there is data in the database, we subscribe to the database interactor. It saves the result into memory, so the next time we access the data it is served from memory.
public Observable<WeatherData> getWeatherData(String name) {
    return weatherDatabase.weatherDao().getWeather(name).toObservable()
            .subscribeOn(Schedulers.io())
            .doOnNext(memoryInteractor::saveData);
}
Room allows us to define our queries in the form of observables. We use a Maybe that will emit the value if it exists, or complete without an error if it doesn’t.
@Query("SELECT * FROM WeatherData WHERE name = :name")
Maybe<WeatherData> getWeather(String name);
Alternatively, Room allows us to return an observable that emits every time the data in the database changes. This can be very useful if you only use a database cache.
Memory Interactor
We have to implement the memory observable ourselves, using an RxJava Subject. A Subject gives us an observable on which we can call onNext() after creation. Specifically, we use a BehaviorSubject: when subscribed to, it emits the latest item (if any) and then every subsequent emission.
The saveData() method stores the WeatherData in memory and calls onNext() on the subject, which stays alive for as long as the application is running.
BehaviorSubject<WeatherData> observable;
WeatherData weatherData;

public WeatherMemoryInteractorImpl() {
    observable = BehaviorSubject.create();
}

public void saveData(WeatherData weatherData) {
    this.weatherData = weatherData;
    observable.onNext(weatherData);
}
Our getWeatherData() method returns a Maybe that emits data if it exists and completes without an error if it doesn’t.
public Maybe<WeatherData> getWeatherData() {
    return weatherData == null ? Maybe.empty() : Maybe.just(weatherData);
}
We create another method, getWeatherDataObservable(), that emits the latest value and any subsequent changes to the data. As we are using a BehaviorSubject, a new subscriber immediately receives the most recently emitted item.
public Observable<WeatherData> getWeatherDataObservable() {
    return observable;
}
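To make the "latest value, then every later value" behaviour concrete, here is a hand-rolled stand-in written in plain Java. It is only an illustration of the semantics, not RxJava's BehaviorSubject: LatestValueSubject is a hypothetical class, it is not thread-safe, and it has no completion, error or unsubscribe handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for a BehaviorSubject, for illustration only.
final class LatestValueSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T latest; // null until the first value arrives

    // Registers an observer and immediately replays the latest value, if there is one.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
        if (latest != null) {
            observer.accept(latest);
        }
    }

    // Stores the value and pushes it to every registered observer.
    void onNext(T value) {
        latest = value;
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }
}
```

An observer that subscribes after "London" has been emitted receives "London" straight away, and both early and late observers then receive the next value, which is how a tab opened later still sees the cached weather and any subsequent update.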
Presenter
Now that we have created our repository and interactors, how do we retrieve WeatherData?

When accessing data, use the WeatherRepository. It triggers a load from the fastest data source available and returns the memory observable, which emits the current data and any later changes.
weatherRepository.getWeatherData()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(view::showWeather);
When we want to retrieve new data, we use the WeatherNetworkInteractor, which always calls the network. This is used when a user changes their location and we want to load new data. It saves the result to the cache and emits the new values to all observers listening to WeatherData from the repository.
networkInteractor.getWeatherData(name)
        .subscribe(data -> {
            // Nothing to do here: the repository's observers receive the update.
        }, error -> view.showError());
Conclusion
This is one example of how to create a reactive data layer using RxJava and Room. The approach is meant to be customisable, so you can modify it to fit your particular use case. You may not need both the memory and the database interactor: some data models never need to be queried and can simply be held in memory. Please feel free to check out the code for the weather app here: