Magic with Kotlin, RxJava and the repository pattern

Pau Picas
Stuart Tech
Published in
6 min readJun 27, 2018

After a few months of hard development at Stuart we have recently released a new version of the app for customers.

In this new version we wanted to try out some new architectural ideas, but they were too difficult to implement based on existing code. For this reason we decided to rewrite the app completely from scratch. At the same time this gave us a good opportunity to start using Kotlin and make use of all the advantages that it provides.

This new architecture had to be maintainable and easy to test, but at the same time we didn’t want to sacrifice our development speed and add undesired delays to the project. Finally we opted for a simple solution in which for the majority of screens of the app we combine the use of MVP and repository patterns as follows:

This is it? I want my money back!

Well, we would agree that until now we have not seen anything revolutionary. But the thing gets better when we see how we use the repository pattern. Because all the different repositories implemented in the app follow the next principles:

  1. The calls to the repository are always asynchronous.
  2. Callers to a method of the repository will receive via callback a single or stream of results.
  3. Each repository is defined by an interface and consumers of the repository do not have to worry about how it is implemented.
  4. A repository can have several implementations where each one has a single responsibility and can extend other implementations using the decorator pattern.
  5. The final configuration is determined by the dependency container (For example Dagger) that combines the different implementations.

Show me the magic

What we want to achieve is that when we do a call like this:

The callback that we pass to the method subscribe receives the Job that it will be obtained from a cache memory or by making a request to the API. After this, the callback will receive updates of the Job every time there is a change in the backend and it’s sent to the app via WebSocket or push message. All of this without having to worry about anything else. It’s just magic :)

Therefore when the subscription is made the following tasks are performed automatically:

  • Check if the Job is in the cache, and if so return it.
  • If it’s not in the cache, make a request to the REST API and fill the cache with the Job that will be returned.
  • Connect to the WebSocket server to receive Job updates while the subscription lasts.
  • Listen for push notifications from Firebase Cloud Messaging to receive Job updates.

What’s a JobRepository?

Let’s see for example the definition of a possible interface to get the Jobs associated to a logged in customer.

Here we see how the function getJob satisfies points 1 and 2, and returns an RxJava Observable<Job> where a client can subscribe and receive an specific Job followed by all updates of the same.

On the other hand the function getJobs, which also satisfies points 1 and 2, but returns a Single<List<Job>> instead of an Observable. This is because the Jobs list is paginated and is not possible to receive updates.

Show me an implementation of JobRepository!

Bellow we will see a first implementation of the JobRepository interface that will obtain the results using calls to the Stuart REST API.

First we define the interface StuartApi and let Retrofit do its magic. Next, we define ApiJobRepository, which uses StuartApi to make the real calls to the REST API.

You will have noticed the call to ModelMapper::mapApiJob. This method transforms from ApiJob to Job objects. We do this because we prefer to use different classes for parsing the responses from the API and for the internal logic of the app. So if the API changes, we only need to modify ModelMapper. Apart from this, it also allows us to have a data model that does not have a 1:1 correspondence with the API, and thus can be more adapted to the functionalities of the app.

Now let’s add some cache memory

First we need a cache for our Jobs. That’s why we have defined JobCache, but we have omitted it here to keep the article short.

In the class CachedJobRepository, notice how the constructor accepts the parameter delegate of the type JobRepository to implement the decorator pattern and extend the functionality of another implementation. Here Kotlin’s delegation functionality is very useful (Note the use of syntax by delegate), saving us many lines of code.

The method getJob is implemented in a way that first tries to get a Job from JobCache, and when it doesn’t exist, the functionality is delegated to the class specified in the constructor using the RxJava operator switchIfEmpty. When this happens it captures the result using the operator doOnSuccess and the cache is filled by calling jobCache.putJob().

The method getJobs on the other hand does not return any cached results, but it listens for updated Jobs and fills the cache as well.

Time for push notifications!

We have defined a PushUpdates interface that can be implemented for each of the different push notification sources available. This source can be for example WebSockets or Firebase Cloud Messaging.

Note also how in the implementation of getJob from class SubscribedJobRepository first we get the Job from the delegate and then we concatenate the updates that come from PushUpdates. Since PushUpdates publishes updates for all Jobs, this should be filtered by jobId with the operator filter.

PushUpdates for WebSockets

The first class we see here is WebSocketPushUpdates, and it’s the one that implements PushUpdates. To do this, it exposes jobSubject as an Observable. Additionally it listens for subscriptions on jobs with doOnSubscribe operator and starts or stops the service WebSocketService depending on whether someone is subscribed or not. When WebSocketService starts, ServiceConnection receives onServiceConnected that will be responsible for redirect the Job updates to jobSubject.

The class WebSocketService is an Android Service that, creates a connection to the WebSocket when it’s started and publishes the received Job updates from the server.

Let’s add FCM as well to have various sources

First we see the object FirebasePushUpdates that implements PushUpdates in a very simple way.

Below we have FirebaseMessagingService, that receives the push notifications from Firebase. When a notification arrives, the method onMessageReceived is called where the Job is extracted from the message and sent to FirebasePushUpdates.

And finally in order to be able to combine several implementations of PushUpdates we have created the class MergePushUpdates that receives in the constructor a list of PushUpdates and combines them using the operator merge.

How is it all connected?

First we configure our JobRepository in a kind of homemade dependency injection container that we have created as an example. Notice how we are connecting the different JobRepository implementations among them like it was a pipeline. In this way we are extending the functionality of the previous ones that we decorate.

In a real case it would be more recommendable to use something like Dagger instead of our homemade DependencyContainer. Maybe in a small sized app is fine to use a custom solution. But on a project that can grow considerably, configure all the dependencies can be annoying if you don’t have the right tool.

Finally we see how we use our JobRepository in the example Activity MainActivity.When onResume is called the subscription is made and all the magic happens.

Conclusions

In this article we saw how the decorator pattern allows you to have testable repository classes that follow the SOLID principles. Since they are like pieces of a pipeline it’s really easy to add or remove functionality when is needed.

Although in all the examples we use RxJava, we could use any other implementation of Reactive Streams or mechanism similar to the observer pattern. Here RxJava is just a tool, and to be honest it’s a very good one, but what matters most is how we use it.

Like what you see? Join us, we’re hiring. 🚀

--

--