Best RxJava operators for REST applications in Android

Damian Michalak
Appunite Labs
Published in
4 min readNov 17, 2017

There are many different operators in standard RxJava package. Some of them are really robust and complicated to use, others rather simple. But there is one thing that many RxJava operators have in common:

Most of them you will never use

As everyday Android developer who does all the stuff in RxJava, many times I strived to use zip() operator, and every time I failed to do it. I’ve always found something better than it, or a situation that this operator will not cover. I’m not saying that zip() has no usages at all, someone might like it and if that works for you — That’s great. But let’s discuss some operators that I find super useful, and they’re great & easy to use in REST based application.

And here they are:

  • share()
  • replay(1).refCount()

If you already know what they are doing, you might as well leave a clap for me and finish reading at this point.

Hot or Cold ?

One of the most important thing that often is hard to understand is whether the observable is Hot or Cold. There has been many great articles explaining it and I have no intent to do it again, instead I will show you examples of how it works in practice.

After all, does it matter if your call some observable Hot, Cold or Warm?

No.

All that matters is: if it does the job.

In general you may need two kind of observables:

  • observable that remembers last emitted value, and emits it to all new subscribers,
  • observable that does not remember its last emitted value.

Talk is cheap. Show me the code.

Let’s say that in our application we want to download some data and display it. Let’s imagine the easiest way to do it:

val usersObservable = service.getUsers()
.subscribeOn(networkScheduler)
.observeOn(UiScheduler)
.subscribe{ view.update(it) }
.subscribe{ view.update(it) }

There. Now let’s add error handling:

val usersObservable = service.getUsers()
.subscribeOn(networkScheduler)
.observeOn(UiScheduler)
usersObservable
.filter{ it.isNotError() }
.subscribe{ view.update(it) }
usersObservable
.filter{ it.isError() }
.subscribe{ view.showErrorMessage() }

Great. But also let’s add a progress event and empty list for the best UX:

val usersObservable = service.getUsers()
.subscribeOn(networkScheduler)
.observeOn(UiScheduler)
usersObservable
.filter{ it.isNotError() }
.subscribe{ view.update(it) }
usersObservable
.filter{ it.isError() }
.subscribe{ view.showErrorMessage() }
usersObservable
.map(false)
.startWith(true)
.subscribe{ progressLoading.visibility = it }
usersObservable
.map(it.isEmpty())
.startWith(false)
.subscribe{ emptyMessage.visibility = it}

Now… is there something wrong in this code? We can test it.

@Test
fun test() {
val usersOrError = Observable.just(listOf("user1", "user2"))
.mergeWith(Observable.never())
.doOnNext { println(it) }

usersOrError.subscribe()
usersOrError.subscribe()
usersOrError.subscribe()
usersOrError.subscribe()

}

In the above test, there isObservable.just() instead of a REST request. Why mergeWith(never())? Because we don’t want our observable to complete before each subscriber has a chance to subscribe to it. Similar situation (never-ending observable) can be noticed when some request is triggered by user click input. This case will be covered later in the article. Also the four observables used in previous example were simplified to subscribe()only. We can ignore schedulers part, since everything happens in one thread. The final result is:

[user1, user2]
[user1, user2]
[user1, user2]
[user1, user2]

Each subscription to usersOrError observable has triggered println() which means that in real life application we just triggered four requests instead of one. This can be a very dangerous situation. Imagine if instead of potentially harmless GET request, we would make POST or call some other method that changes the state of data or application. The same request will be executed four times and for example four identical posts or comments will be created.

Luckily, we can easily fix it by adding replay(1).refCount().

@Test
fun `test replay refCount operators`() {
val usersOrError = Observable.just(listOf("user1", "user2"))
.mergeWith(Observable.never())
.doOnNext { println(it) }
.replay(1)
.refCount()

usersOrError.subscribe()
usersOrError.subscribe()
usersOrError.subscribe()
usersOrError.subscribe()

}

Result of this test is:

[user1, user2]

Great, we successfully shared our subscription between all subscribers. Now there is no threat of making unnecessary multiple requests. Lets try the same observable with share() operator instead of replay(1).refCount().

@Test
fun `test share operator`() {
val usersOrError = Observable.just(listOf("user1", "user2"))
.mergeWith(Observable.never())
.doOnNext { println(it) }
.share()

usersOrError.subscribe()
usersOrError.subscribe()
usersOrError.subscribe()
usersOrError.subscribe()

}

Surprisingly (or not) the result is the same as previous:

[user1, user2]

To witness difference between share() and replay(1).refCount() let’s make two more tests. This time we will call our fake request after getting click event from the user. Click event will be mocked by aPublishSubject. Additional line:doOnNext{ println("1") } will show which subscriber got the event from usersOrError

First test will be using share() and second one replay(1).refCount.

@Test
fun `test share operator with click`() {
val clickEvent = PublishSubject.create<Any>()

val usersOrError = clickEvent
.flatMap { Observable.just(listOf("user1", "user2")) }
.share()

usersOrError.doOnNext { println("1") }.subscribe()
usersOrError.doOnNext { println("2") }.subscribe()

clickEvent.onNext(Any()) //perform click

usersOrError.doOnNext { println("3") }.subscribe()
usersOrError.doOnNext { println("4") }.subscribe()

}

Result:

1
2

@Test
fun `test replay refCount operators with click`() {
val clickEvent = PublishSubject.create<Any>()

val usersOrError = clickEvent
.flatMap { Observable.just(listOf("user1", "user2")) }
.replay(1)
.refCount()

usersOrError.doOnNext { println("1") }.subscribe()
usersOrError.doOnNext { println("2") }.subscribe()

clickEvent.onNext(Any()) //perform click

usersOrError.doOnNext { println("3") }.subscribe()
usersOrError.doOnNext { println("4") }.subscribe()

}

Result:

1
2
3
4

Conclusion

Both share() and replay(1).refCount() are important operators to handle REST requests, and much more. Every time you need the same observable in multiple places, it’s the best way to go. Just think if you want your observable to remember the latest event and pass it to each new subscriber or maybe you are interested in one-time only operation. Here are some real-life application examples:

  • getUsers() ,getPosts() or similar observable that is used to get the data will most probably usereplay(1).refCount() ,
  • updateUser(), addComment() on the other hand are one-time only operations and in this case share() will do better,
  • passing click event wrapped in Observable — RxView.clicks(view) — should also have share() operator, to be sure that the click event will be broadcasted to each subscriber.

TL;DR

  • share() -> shares the observable to all subscribers, does not emit latest value to new subscribers
  • replay(1).refCount() -> shares the observable to all subscribers and emits latest value to every new subscriber

If you like my work hit ❤ button and let me know what you think in comments.

--

--