RxJava/RxAndroid : Load from cache before api call using Observable.concat()

I am so new on RxJava and RxAndroid. We have a radio application that we currently working on it. Using Event Bus is a good solution to send data to different layers. But we see that implementing RxJava to our project will save us from busses, threads, sync and more. There are bunch of RxJava tutorial on internet. So I am not going to dive deep on RxJava. I will just show something that I have just learned.

By the way, as I said it before, I am so new on RxJava. RxJava has hundreds of operator to use in specific case. If my method and implementation is wrong, please don’t hesitate to give feedback.

Show Me The Code

Scenario : We want to load Radio List from web service and save list to Local DB using ActiveAndroid. This is the first time app launch scenario. If we already have radio list saved on DB, load it from DB first, return it to UI to show immediately. Sequentially call api to load updated radio list, save it to DB and notify UI and update showing list softly.

DataSource Operations

getRadioList() method return Observable Stream. This is the key point here.

ApiSource Operations

Controller Operations

For now, We have ApiSource and DataSource Observable streams. Now it is time to control over streams.

As you see Controller class, We load from DB and keep that observable on variable radioListDB.

Observable<RadioWrapper> radioListDB =  databaseSource.getRadioList()
.filter(radioWrapper -> radioWrapper.radioList.size() > 0)
.subscribeOn(Schedulers.computation());

Filter operator checks if returned lists size is bigger than zero. We don’t want empty list.

Database operation should be done on Schedulers.computation(). That is why we subscribeOn that scheduler.
Observable<RadioWrapper> radioListApi = apiSource.getRadioList()
.subscribeOn(Schedulers.io());

This Observable is only responsible to getRadioList from Api and execute it on IO thread. Because we used Schedulers.io(). But we wanted to save updated list to DB too. This is why we added map() operator.

Observable<RadioWrapper> radioListApi = apiSource.getRadioList()
.map(radioWrapper -> {

Observable.create(subscriber -> {
databaseSource.save(radioWrapper);
subscriber.onCompleted();
}).subscribeOn(Schedulers.computation()).subscribe();

return radioWrapper;
})

.subscribeOn(Schedulers.io());

While Observable is streaming, We create new observable to perform save operation, subscribe it to itself and we do the operation by using Schedulers.computation(). And we return radioWrapper observable to keep streaming.

It will be better if I show you how concat logic works. A picture is worth a thousand words.

We combine two Observable into one.When first observable completes, then seconds observable starts streaming.
return Observable
.concat(radioListDB, radioListApi)
.observeOn(AndroidSchedulers.mainThread());

We concat radioListDB observable and radioListApi observable. And return combined observable. We are going to update UI with that combined observable. That is why I added observeOn(AndroidSchedulers.mainThread()) Otherwise error will occur.

Now It is time to Subscribe this pretty Observable. It is waiting for us to streaming. Don’t hold it more, Let it stream.

radioListView.showLoading();
subscription = getRadioListUsecase.getRadioList()
.subscribe(radioWrapper -> {
radioListView.onListLoaded(radioWrapper);
radioListView.dismissLoading();
});

When we subscribe that combined observable. If there is data on DB, radio list will appear immediatelly and load updated list after api call completed.

This is how our caching mechanism works. If you have any suggestion about editing my Observables, please let me know. I am new on RxJava. Maybe after some experience, I will update this post(If I found better way to do it)