Combining Realm, RxJava, Retrofit and Kotlin to create a data-safe server request chain in Android— Part 1

Recently I’ve completed an overhaul of the way we download data from our server to our application. It included the combination of Retrofit requests, using RxJava to chain the operations along, saving the data to RealmDB and cleaning things up with Kotlin extension functions.

I wanted to achieve several things during this refactor:

  1. Find a way to smoothly combine RealmDB’s transactions and Retrofit’s requests with RxJava’s operations.
  2. Create an infrastructure for additional “download handlers” which will be added in the future, and make the current ones more robust and open for enhancements.
  3. Ensure that all the requests are executed first, and only once all the downloaded information has been verified and processed, will the current DB be cleared and updated.
  4. Ensure that should the “request chain” fail, the existing data will not be compromised.
  5. Make error handling simpler, and ensure the logs are more concise.

I will cover these goals over multiple articles, but we’ll go over the first one here: How can we get RealmDB’s transactions and Retrofit’s requests to work with RxJava’s operations?

Everything is Single

Let’s show a basic frame of what we hope to achieve here:

class UsersRequestHandler {
fun getUsers() : Single<RealmUsers> {
return queryUsersAsync()
.flatMap { saveUsersToRealmAsync() }
}
}

In this sample we have 2 asynchronous operations chained together:

  1. First we execute an async server request for the results.
  2. Following that, we use the flatMap, which accepts a lambda expression which results in another async operation.

The flatMap method is the key to our goal. It allows us to stack together, or “chain” together multiple asynchronous operations, or Singles, and have them execute sequentially. Should one of the Singles in the chain fail, the chain will stop, and the onError callback will be invoked in the subscription.

We want to convert the operation in each library to a Single and chain them all together using flatMap.

Those of you who are familiar with RxJava might ask “Why should we use Single and not Observable”?

The answer is: Observable is designed to emit multiple events, while Single is designed to emit only one. Since we’re starting our chain from a request which either succeeds or fails, the Single class is perfect for our needs since it’s meant to do exactly that — execute a single operation which either returns a result or fails.

With that out of the way, let’s get to our libraries.

RxJava already has a nice way to interact with Retrofit, so we’ll review it first:

A retrofit request would usually look like this:

interface UsersRequest {
@GET("users")
fun getUsers(): Call<List<User>>;
}

Here we have a request, which when executed will give us a result through Retrofit’s Callback interface which we can enqueue to the request. Now, let’s create a factory class to instantiate this request:

object RetroFactory {
private val okHttpClient = OkHttpClient.Builder()
.build()

@JvmStatic
fun <T> client(clientClass: Class<T>): T {
val client = okHttpClient.newBuilder()
.build()
val builder = Retrofit.Builder()
.baseUrl(SERVER_URL)
.client(client)
.addConverterFactory(GsonConverterFactory.create(GsonBuilder()
.create()))
.build()

return builder.create(client)
}

@JvmStatic
inline fun <reified T> client(): T {
return client(T::class.java)
}
}

Notice that the second parameterless method is declared separately because inline methods can’t access private members like the okHttmpClient so it’s added for cleaner use.

Now, we can invoke the request like so:

val request = RetroFactory.client<UsersRequest>()
val response = requests.getUsers()
respone.enqueue(
object : Callback<String> {
override fun onFailure(call: Call<String>, t: Throwable) {
// handle the error
}

override fun onResponse(call: Call<String>, response: retrofit2.Response<String>) {
// handle the response
}
})

While this sample works on its own, we want to perform these operations using RxJava. Here’s how we’ll do it:

First, we’ll change the request so instead of a Call result we’ll get Single:

@GET("users")
fun getUsers(): Single<List<User>>;

Secondly, we’ll add an RxJava call factory to our retrofit factory (otherwise, our application would crash!)

val builder = Retrofit.Builder()
.baseUrl(SERVER_URL)
.client(client)
.addConverterFactory(GsonConverterFactory.create(GsonBuilder()
.create()))
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build()

Now we can handle the request like this:

val request = RetroFactory.client<UsersRequest>()
requests.getUsers()
.subscribe({
// handle success
},{
// handle error
})

Much cleaner!

Note - that if you stop reading here and want to invoke the request, know that it must not be from the main thread, as you would get an exception and crash your application, so you should switch the request to a new thread, like this:

Single.just(false)
.observeOn(Schedulers.newThread())
.flatMap {
val request = RetroUtils.client<UsersRequest>()
request.getUsers()
}.subscribe({
//handle success
},{
//handle error
})

Now we can start working on RealmDB — which sadly doesn’t interface with RxJava when it comes to transactions, but it does have an async transaction api which can be invoked using executeTransacitonAsync. It looks like this:

realmInstance.executeTransactionAsync({
// transaction goes here like
// it.insert(someRealmModel)
}, {
// handle success
}, {
// handle failure
})

Looks a lot like a Rx subscription method already, doesn’t it? We have a transaction, and a callback for success and failure. So let’s wrap a Rx Single around it!

Single.create<Boolean> { emitter ->
realmInstance.executeTransactionAsync({

}
, {
emitter.onSuccess(true)
}, {
emitter.onError(it)
})
}

To those unfamiliar, Single's create method accepts an expression with an emitter as a parameter. The resulting Single will have its onNext or onError invoked whenever the emitter calls for its success method or its error method — perfect for wrapping transactions.

Now let’s just clean it up and write it as an extension function for easier use:

fun Realm.executeAsync(transaction: (Realm) -> Unit): Single<Boolean> {
return Single.create<Boolean> { emitter ->
executeTransactionAsync(transaction, {
emitter.onSuccess(true)
}, {
emitter.onError(it)
})
}
}

With this simple-to-use method we can execute async transactions and have them handled using Single. But there is one problem, RealmDB transactions must be executed on a thread with a looper. To solve this, we will invoke the created Single on a thread with the main looper:

fun Realm.executeAsyncOnMainThread(transaction: (Realm) -> Unit): Single<Boolean> {
return Single.just(true)
.observeOn(AndroidSchedulers.from(Looper.getMainLooper()))
.flatMap { _ ->
Single.create<Boolean> { emitter ->
executeTransactionAsync(transaction, {
emitter.onSuccess(true)
}, {
emitter.onError(it)
})
}
}
}

While this solution works fine, we will provide a better answer for our specific architecture later on.

Realm, Rx and Retrofit… ASSEMBLE!

Now let’s see how all 3 libraries work together:

class UsersRequestHandler {
fun getUsers() : Single<RealmUsers> {
val client = RetroFactory.client<UsersRequest>()

return Single.just(false)
.observeOn(Schedulers.newThread())
.flatMap { client.getUsers() }
.flatMap { results ->
Realm.getDefaultInstance().executeAsync { r ->
r.insert(it)
}
}
}

interface UsersRequest {
@GET("users")
fun getUsers(): Single<List<RealmUsers>>
}
}

And to using the handler:

fun loginUser() {
UsersRequestHandler().getUsers()
.observeOn(AndroidSchedulers.mainThread())
.subscribe ({
// move user along
},{
// handle error
})
}

This concludes the first part of this series, part 2 will be coming soon!