Magic with Kotlin, RxJava and the repository pattern
After a few months of hard development at Stuart, we recently released a new version of our app for customers.
In this new version we wanted to try out some new architectural ideas, but they were too difficult to implement on top of the existing code. For this reason we decided to rewrite the app completely from scratch. This also gave us a good opportunity to start using Kotlin and take advantage of everything it provides.
The new architecture had to be maintainable and easy to test, but we didn’t want to sacrifice development speed or add undesired delays to the project. In the end we opted for a simple solution: for the majority of the screens of the app we combine the MVP and repository patterns.
This is it? I want my money back!
Well, we would agree that so far we have not seen anything revolutionary. But things get better when we look at how we use the repository pattern. All the repositories implemented in the app follow these principles:
- The calls to the repository are always asynchronous.
- Callers of a repository method receive, via a callback, either a single result or a stream of results.
- Each repository is defined by an interface and consumers of the repository do not have to worry about how it is implemented.
- A repository can have several implementations where each one has a single responsibility and can extend other implementations using the decorator pattern.
- The final configuration is determined by the dependency container (for example, Dagger), which combines the different implementations.
Show me the magic
What we want to achieve is this: when we ask the repository for a Job and subscribe to the result, the callback that we pass to the method subscribe receives the Job, obtained either from an in-memory cache or through a request to the API. After this, the callback receives updates of the Job every time it changes in the backend and the change is sent to the app via WebSocket or push message. All of this without having to worry about anything else. It’s just magic :)
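A call of this kind could look like the sketch below. It assumes RxJava 2 (the io.reactivex package); the fields of Job, the helper showJob and its render parameter are hypothetical names used only for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

// Hypothetical app model; the real Job class is not shown in the article.
data class Job(val id: String, val status: String)

interface JobRepository {
    fun getJob(jobId: String): Observable<Job>
}

// Subscribes to a Job: the first callback receives the current Job and then
// every later update; the second one receives a failure, if any.
fun showJob(jobRepository: JobRepository, jobId: String, render: (Job) -> Unit): Disposable =
    jobRepository.getJob(jobId).subscribe(
        { job -> render(job) },
        { error -> System.err.println("Could not load job $jobId: $error") }
    )
```

The returned Disposable is what the caller keeps in order to end the subscription later.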
So, when the subscription is made, the following tasks are performed automatically:
- Check if the Job is in the cache, and if so return it.
- If it’s not in the cache, make a request to the REST API and fill the cache with the returned Job.
- Connect to the WebSocket server to receive Job updates while the subscription lasts.
- Listen for push notifications from Firebase Cloud Messaging to receive Job updates.
What’s a JobRepository?
Let’s look, for example, at a possible interface for getting the Jobs associated with a logged-in customer.
The function getJob satisfies the first two principles above and returns an RxJava Observable<Job>, to which a client can subscribe to receive a specific Job followed by all of its updates.
The function getJobs also satisfies the first two principles, but it returns a Single<List<Job>> instead of an Observable. This is because the Jobs list is paginated, so it is not possible to receive updates for it.
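The interface described above could be sketched as follows. The return types come from the text; the fields of Job and the page parameter of getJobs are assumptions, and RxJava 2 is assumed for the imports.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

// Hypothetical app model; the real Job class is not shown in the article.
data class Job(val id: String, val status: String)

interface JobRepository {
    // Emits the requested Job and then every update of that Job.
    fun getJob(jobId: String): Observable<Job>

    // Emits one page of Jobs and completes; a paginated list gets no updates.
    // The "page" parameter is an assumption about how pagination is expressed.
    fun getJobs(page: Int): Single<List<Job>>
}
```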
Show me an implementation of JobRepository!
Our first implementation of the JobRepository interface obtains its results by calling the Stuart REST API.
First we define the interface StuartApi and let Retrofit do its magic. Next, we define ApiJobRepository, which uses StuartApi to make the real calls to the REST API.
Note the call to ModelMapper::mapApiJob in this implementation. This method transforms the objects parsed from the API response into Job objects. We do this because we prefer to use different classes for parsing the responses from the API and for the internal logic of the app. So if the API changes, we only need to modify ModelMapper. It also allows us to have a data model that does not correspond 1:1 with the API and is therefore better adapted to the functionality of the app.
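A minimal sketch of these three pieces is shown below. To stay self-contained it declares StuartApi as a plain interface: in the article Retrofit generates the implementation, and the Retrofit annotations are omitted because the real endpoints are not shown. ApiJob, the fields of both models and the page parameter are hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

data class Job(val id: String, val status: String)      // hypothetical app model
data class ApiJob(val id: String, val status: String)   // hypothetical API response model

interface JobRepository {
    fun getJob(jobId: String): Observable<Job>
    fun getJobs(page: Int): Single<List<Job>>
}

// Stand-in for the Retrofit interface; annotations and paths are left out.
interface StuartApi {
    fun getJob(jobId: String): Single<ApiJob>
    fun getJobs(page: Int): Single<List<ApiJob>>
}

// The only place that knows both the API model and the app model.
object ModelMapper {
    fun mapApiJob(apiJob: ApiJob): Job = Job(apiJob.id, apiJob.status)
}

class ApiJobRepository(private val api: StuartApi) : JobRepository {
    // One request, one Job: the Single is mapped and exposed as an Observable.
    override fun getJob(jobId: String): Observable<Job> =
        api.getJob(jobId).map(ModelMapper::mapApiJob).toObservable()

    override fun getJobs(page: Int): Single<List<Job>> =
        api.getJobs(page).map { apiJobs -> apiJobs.map(ModelMapper::mapApiJob) }
}
```

Because ApiJobRepository depends only on the StuartApi interface, a test can pass in a fake implementation and never touch the network.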
Now let’s add some cache memory
First we need a cache for our Jobs. That’s why we have defined JobCache, but we have omitted it here to keep the article short.
In the class CachedJobRepository, notice how the constructor accepts the parameter delegate, of type JobRepository, in order to implement the decorator pattern and extend the functionality of another implementation. Kotlin’s delegation feature is very useful here (note the by delegate syntax) and saves us many lines of code.
getJob first tries to get the Job from JobCache. When it is not there, the work is delegated to the class specified in the constructor using the RxJava operator switchIfEmpty. When this happens, the result is captured with the operator doOnSuccess and used to fill the cache.
getJobs, on the other hand, does not return any cached results, but it does observe the Jobs that come back and fills the cache with them as well.
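The behaviour just described might be sketched like this. JobCache is omitted from the article, so the in-memory version here, including its get and put methods, is purely an assumption. Taking only the first element of the delegate's Observable (firstElement) is also a choice made for this sketch, so that doOnSuccess can be applied.

```kotlin
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
import java.util.concurrent.ConcurrentHashMap

data class Job(val id: String, val status: String)   // hypothetical app model

interface JobRepository {
    fun getJob(jobId: String): Observable<Job>
    fun getJobs(page: Int): Single<List<Job>>
}

// Hypothetical in-memory cache; the real JobCache is not shown in the article.
class JobCache {
    private val jobs = ConcurrentHashMap<String, Job>()

    // Completes empty when the Job is not cached.
    fun get(jobId: String): Maybe<Job> =
        Maybe.defer { jobs[jobId]?.let { Maybe.just(it) } ?: Maybe.empty<Job>() }

    fun put(job: Job) {
        jobs[job.id] = job
    }
}

// "by delegate" forwards any member that is not overridden; both are overridden
// here, so it mainly pays off once the interface grows.
class CachedJobRepository(
    private val delegate: JobRepository,
    private val cache: JobCache
) : JobRepository by delegate {

    // Cache first; on a miss, ask the delegate and remember its answer.
    override fun getJob(jobId: String): Observable<Job> =
        cache.get(jobId)
            .switchIfEmpty(delegate.getJob(jobId).firstElement().doOnSuccess { cache.put(it) })
            .toObservable()

    // Never served from the cache, but every returned Job is stored in it.
    override fun getJobs(page: Int): Single<List<Job>> =
        delegate.getJobs(page).doOnSuccess { jobs -> jobs.forEach { cache.put(it) } }
}
```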
Time for push notifications!
We have defined a PushUpdates interface that can be implemented for each of the available push notification sources, for example WebSockets or Firebase Cloud Messaging.
Note also how, in the implementation of getJob in the class SubscribedJobRepository, we first get the Job from the delegate and then concatenate the updates that come from the push source. Because PushUpdates publishes updates for all Jobs, these must be filtered by jobId.
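A possible shape for PushUpdates and SubscribedJobRepository is sketched below. The method name jobUpdates is hypothetical, as are the fields of Job, and the use of RxJava's concatWith and filter operators is this sketch's reading of "concatenate" and "filtered" in the description above.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

data class Job(val id: String, val status: String)   // hypothetical app model

interface JobRepository {
    fun getJob(jobId: String): Observable<Job>
    fun getJobs(page: Int): Single<List<Job>>
}

// One implementation per push source; "jobUpdates" is a hypothetical name.
interface PushUpdates {
    fun jobUpdates(): Observable<Job>   // updates for ALL Jobs
}

class SubscribedJobRepository(
    private val delegate: JobRepository,
    private val pushUpdates: PushUpdates
) : JobRepository by delegate {          // getJobs is forwarded unchanged

    // Current Job from the delegate, followed by the pushed updates of that Job only.
    override fun getJob(jobId: String): Observable<Job> =
        delegate.getJob(jobId)
            .concatWith(pushUpdates.jobUpdates().filter { it.id == jobId })
}
```

One consequence of concatWith is that the push source is subscribed only after the delegate's Observable completes, so an update published before that moment by a source that does not replay would be missed.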
PushUpdates for WebSockets
The first class here is WebSocketPushUpdates, which is the one that implements PushUpdates. To do this, it exposes jobSubject as an Observable. Additionally, it listens for subscriptions through the doOnSubscribe operator and starts or stops the service WebSocketService depending on whether anyone is subscribed. Once the service is connected, onServiceConnected is responsible for redirecting the Job updates to the exposed Observable.
WebSocketService is an Android Service that creates a connection to the WebSocket when it is started and publishes the Job updates received from the server.
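The start/stop logic can be sketched without any Android classes, as below. JobUpdateSource is a hypothetical stand-in for the Android service and its onServiceConnected wiring; the subscriber counter and the use of doOnDispose for stopping are assumptions of this sketch.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicInteger

data class Job(val id: String, val status: String)   // hypothetical app model

interface PushUpdates {
    fun jobUpdates(): Observable<Job>
}

// Hypothetical stand-in for WebSocketService: delivers Jobs while it is running.
interface JobUpdateSource {
    fun start(onJob: (Job) -> Unit)
    fun stop()
}

class WebSocketPushUpdates(private val source: JobUpdateSource) : PushUpdates {
    // toSerialized() makes onNext safe to call from the source's own thread.
    private val jobSubject = PublishSubject.create<Job>().toSerialized()
    private val subscribers = AtomicInteger(0)

    override fun jobUpdates(): Observable<Job> = jobSubject
        // First subscriber: start the source and route its Jobs into the subject.
        .doOnSubscribe {
            if (subscribers.getAndIncrement() == 0) {
                source.start { job -> jobSubject.onNext(job) }
            }
        }
        // Last subscriber gone: stop the source.
        .doOnDispose {
            if (subscribers.decrementAndGet() == 0) {
                source.stop()
            }
        }
}
```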
Let’s add FCM as well to have various sources
First we have the object FirebasePushUpdates, which implements PushUpdates in a very simple way.
Then we have FirebaseMessagingService, which receives the push notifications from Firebase. When a notification arrives, the method onMessageReceived is called; there the Job is extracted from the message and forwarded to the subscribers.
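The two pieces might fit together roughly as follows. The Android service itself is left out: handlePushPayload is a hypothetical helper standing in for the body of onMessageReceived, and the payload keys, the publish method and the fields of Job are all assumptions.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

data class Job(val id: String, val status: String)   // hypothetical app model

interface PushUpdates {
    fun jobUpdates(): Observable<Job>
}

object FirebasePushUpdates : PushUpdates {
    private val jobSubject = PublishSubject.create<Job>().toSerialized()

    // hide() exposes the stream without exposing the subject itself.
    override fun jobUpdates(): Observable<Job> = jobSubject.hide()

    // Hypothetical entry point for the messaging service.
    fun publish(job: Job) = jobSubject.onNext(job)
}

// Stand-in for the work done in onMessageReceived: extract the Job from the
// message data and publish it. The keys "job_id" and "status" are invented.
fun handlePushPayload(data: Map<String, String>) {
    val id = data["job_id"] ?: return
    val status = data["status"] ?: return
    FirebasePushUpdates.publish(Job(id, status))
}
```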
Finally, to be able to combine several implementations of PushUpdates, we have created the class MergePushUpdates, which receives a list of PushUpdates in its constructor and combines them into a single stream.
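One way to combine the sources is RxJava's Observable.merge, as in this sketch. Choosing merge is an assumption here, and jobUpdates and the fields of Job are hypothetical names.

```kotlin
import io.reactivex.Observable

data class Job(val id: String, val status: String)   // hypothetical app model

interface PushUpdates {
    fun jobUpdates(): Observable<Job>
}

// Interleaves the updates of every source into one Observable.
class MergePushUpdates(private val sources: List<PushUpdates>) : PushUpdates {
    override fun jobUpdates(): Observable<Job> =
        Observable.merge(sources.map { it.jobUpdates() })
}
```

If the same update can arrive through two sources, subscribers see it twice; whether that matters depends on how the app applies updates.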
How is it all connected?
First we configure our JobRepository in a kind of homemade dependency injection container that we have created as an example. Notice how we connect the different JobRepository implementations to one another as if they were a pipeline. In this way, each implementation extends the functionality of the one it decorates.
In a real project it would be advisable to use something like Dagger instead of our homemade DependencyContainer. A custom solution may be fine in a small app, but in a project that can grow considerably, configuring all the dependencies by hand becomes tedious without the right tool.
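The wiring could be sketched as below. To fit in one block, the three layers are simplified stand-ins that keep the article's class names: the API layer returns a fixed Job, the cache is a map inside the decorator, and the push source is a bare subject. Only the nesting order in DependencyContainer is the point.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.ConcurrentHashMap

data class Job(val id: String, val status: String)   // hypothetical app model

interface JobRepository {
    fun getJob(jobId: String): Observable<Job>
    fun getJobs(page: Int): Single<List<Job>>
}

interface PushUpdates { fun jobUpdates(): Observable<Job> }

// Innermost layer: a stand-in for the REST-backed implementation.
class ApiJobRepository : JobRepository {
    override fun getJob(jobId: String): Observable<Job> = Observable.just(Job(jobId, "from-api"))
    override fun getJobs(page: Int): Single<List<Job>> = Single.just(emptyList<Job>())
}

// Middle layer: answers from memory when it can (simplified cache).
class CachedJobRepository(private val delegate: JobRepository) : JobRepository by delegate {
    private val cache = ConcurrentHashMap<String, Job>()
    override fun getJob(jobId: String): Observable<Job> = Observable.defer {
        cache[jobId]?.let { Observable.just(it) }
            ?: delegate.getJob(jobId).doOnNext { cache[it.id] = it }
    }
}

// Outer layer: appends pushed updates for the requested Job.
class SubscribedJobRepository(
    private val delegate: JobRepository,
    private val pushUpdates: PushUpdates
) : JobRepository by delegate {
    override fun getJob(jobId: String): Observable<Job> =
        delegate.getJob(jobId).concatWith(pushUpdates.jobUpdates().filter { it.id == jobId })
}

object DependencyContainer {
    // Hypothetical push source; the article merges WebSocket and Firebase here.
    val pushSubject: PublishSubject<Job> = PublishSubject.create()
    private val pushUpdates = object : PushUpdates {
        override fun jobUpdates(): Observable<Job> = pushSubject
    }

    // The pipeline: API -> cache -> push subscription.
    val jobRepository: JobRepository by lazy {
        SubscribedJobRepository(CachedJobRepository(ApiJobRepository()), pushUpdates)
    }
}
```

Consumers only ever see the JobRepository type, so a layer can be added or removed by editing this one expression.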
Finally, our JobRepository is used in an example Activity. When onResume is called, the subscription is made and all the magic happens.
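The lifecycle side can be illustrated with a plain class, as in the sketch below. JobScreen is a hypothetical stand-in for the Activity so that the example runs without Android, and disposing in onPause is an assumption: the text above only mentions onResume.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable

data class Job(val id: String, val status: String)   // hypothetical app model

interface JobRepository {
    fun getJob(jobId: String): Observable<Job>
}

// Stand-in for an Activity: same lifecycle method names, no Android classes.
class JobScreen(private val jobRepository: JobRepository, private val jobId: String) {
    private val disposables = CompositeDisposable()
    val rendered = mutableListOf<Job>()   // stands in for updating the views

    fun onResume() {
        // Subscribing here triggers cache lookup, API call and push updates.
        disposables.add(
            jobRepository.getJob(jobId).subscribe(
                { job -> rendered.add(job) },
                { error -> System.err.println("Job stream failed: $error") }
            )
        )
    }

    fun onPause() {
        // Ends the subscription so no updates arrive while the screen is hidden.
        disposables.clear()
    }
}
```

In a real Activity the results would normally also be moved to the main thread before touching views, for example with RxAndroid's AndroidSchedulers.mainThread().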
In this article we saw how the decorator pattern allows you to have testable repository classes that follow the SOLID principles. Since they are like pieces of a pipeline, it’s really easy to add or remove functionality when needed.
Although all the examples use RxJava, we could use any other implementation of Reactive Streams, or any mechanism similar to the observer pattern. Here RxJava is just a tool, and to be honest it’s a very good one, but what matters most is how we use it.