Unit Testing asynchronous RxJava code (in Kotlin)

Paulina Sadowska
May 22, 2018

RxJava is a great tool. But if you have used it for some time, you probably know how many traps there are and how easy it is to use it the wrong way. That’s why it’s so important to write unit tests for every piece of your code, especially if it has some complicated logic for asynchronous events.

Of course, testing asynchronous code is not easy. So in this article I want to show you some simple tools and techniques that will help you write unit tests for asynchronous events with ease.

Schedulers

Let’s say we want to test a simple function from our presenter which gets data from an API and passes it to the view. This method can look like this:

fun getSomeData() {
    service.getSomeRemoteData()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeBy(
            onSuccess = { view.showData(it) },
            onError = { view.showError() }
        )
}

It works as expected in the app, but problems appear as soon as we try to test this method. Even if we mock service.getSomeRemoteData() so that it returns data immediately, the test won’t pass because of this exception:

java.lang.RuntimeException: Method getMainLooper in android.os.Looper not mocked. 

And even if we don’t use the mainThread scheduler, the problem remains. Using the io or computation scheduler causes the service method to be called on a different thread, which may make the test fail unexpectedly. This happens because in that case the code defined in service.getSomeRemoteData() may (but doesn’t always have to!) run after the assertions in the test.
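To make the race visible, here is a minimal sketch, assuming RxJava 2.x is on the classpath. The function name callbackThreadName is made up for this illustration. It subscribes on the io scheduler and reports which thread delivered the value; the CountDownLatch is the only thing that forces the caller to wait for it.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the name of the thread that delivered the value.
fun callbackThreadName(): String {
    val done = CountDownLatch(1)
    var threadName = ""
    Single.fromCallable { 5 }
        .subscribeOn(Schedulers.io())
        .subscribe { _ ->
            threadName = Thread.currentThread().name
            done.countDown()
        }
    // Without this wait the function could return before the callback has run,
    // which is exactly what happens to assertions in a test.
    done.await(5, TimeUnit.SECONDS)
    return threadName
}
```

The returned name differs from the caller’s thread name, so any assertion placed right after subscribe (without the latch) would be racing against the io thread.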

Using the trampoline

When writing unit tests we should definitely use different schedulers than in regular code. The simplest scheduler that is very useful in tests is the trampoline scheduler, available by calling Schedulers.trampoline(). It’s extremely simple: it executes all tasks in a FIFO manner on one of the participating threads. This means that if I replace all schedulers in the getSomeData() method with Schedulers.trampoline(), all the operations will be executed on the same thread, one after another.
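A small sketch of that behavior, assuming RxJava 2.x (the function name runOnTrampoline is only for illustration): with the trampoline scheduler in both subscribeOn and observeOn, the value arrives on the calling thread before subscribe returns.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: records the order in which things happen.
fun runOnTrampoline(): List<String> {
    val events = mutableListOf<String>()
    val callerThread = Thread.currentThread().name
    Single.fromCallable { 5 }
        .subscribeOn(Schedulers.trampoline())
        .observeOn(Schedulers.trampoline())
        .subscribe { value ->
            val sameThread = Thread.currentThread().name == callerThread
            events += "received $value, same thread: $sameThread"
        }
    // By the time we get here the value has already been delivered.
    events += "after subscribe"
    return events
}
```

Because everything runs synchronously, assertions placed right after the call under test see the final state.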

Passing schedulers to presenter

But how can we pass that scheduler to the presenter? There is a way to globally alter the behavior of the scheduler in tests via RxJavaPlugins but it comes with its own shortcomings. The most scalable and convenient approach is to explicitly pass schedulers whenever possible.
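For comparison, the global approach might look roughly like the sketch below. It assumes RxJava 2.x; withTrampolineSchedulers and ioThreadIsCallerThread are hypothetical names introduced here. One of the shortcomings is visible right away: the handlers are global state, so they have to be reset after every test.

```kotlin
import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: swaps the io and computation schedulers for the
// trampoline while the block runs, then restores the defaults.
fun <T> withTrampolineSchedulers(block: () -> T): T {
    RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
    RxJavaPlugins.setComputationSchedulerHandler { Schedulers.trampoline() }
    try {
        return block()
    } finally {
        // Forgetting this would leak the override into other tests.
        RxJavaPlugins.reset()
    }
}

// While the override is active, Schedulers.io() work runs on the caller's thread.
fun ioThreadIsCallerThread(): Boolean = withTrampolineSchedulers {
    val caller = Thread.currentThread()
    Single.fromCallable { Thread.currentThread() }
        .subscribeOn(Schedulers.io())
        .blockingGet() === caller
}
```

This sketch does not cover AndroidSchedulers.mainThread(), which needs its own override from RxAndroid.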

To make my code simpler, in every project I create a BaseSchedulerProvider interface and classes which wrap the real schedulers. Then I pass one of them to presenters using dependency injection.

interface BaseSchedulerProvider {
    fun io(): Scheduler
    fun computation(): Scheduler
    fun ui(): Scheduler
}

// Used in regular code: wraps the real schedulers.
class SchedulerProvider : BaseSchedulerProvider {
    override fun computation() = Schedulers.computation()
    override fun ui() = AndroidSchedulers.mainThread()
    override fun io() = Schedulers.io()
}

// Used in tests: everything runs synchronously on the calling thread.
class TrampolineSchedulerProvider : BaseSchedulerProvider {
    override fun computation() = Schedulers.trampoline()
    override fun ui() = Schedulers.trampoline()
    override fun io() = Schedulers.trampoline()
}

// Used in tests: time is controlled manually through the given TestScheduler.
class TestSchedulerProvider(private val scheduler: TestScheduler) : BaseSchedulerProvider {
    override fun computation() = scheduler
    override fun ui() = scheduler
    override fun io() = scheduler
}

The SchedulerProvider class is used in regular code. It wraps the Schedulers.io(), Schedulers.computation() and AndroidSchedulers.mainThread() calls. TrampolineSchedulerProvider and TestSchedulerProvider are used only in tests.

After explicitly passing the scheduler provider to my presenter, the getSomeData() method looks like this:

class DemoPresenter(
    private val schedulerProvider: BaseSchedulerProvider,
    private val view: DemoView,
    private val service: DemoService
) {

    fun getSomeData() {
        service.getSomeRemoteData()
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribeBy(
                onSuccess = { view.showData(it) },
                onError = { view.showError() }
            )
    }
}

Because the schedulers are conveniently wrapped and passed to the presenter, I can write the following test, which passes:

private var view = mock(DemoView::class.java)
private var service = mock(DemoService::class.java)
private var schedulerProvider = TrampolineSchedulerProvider()

@Test
fun simpleTestExample() {
    //given
    val presenter = DemoPresenter(schedulerProvider, view, service)
    given(service.getSomeRemoteData()).willReturn(Single.just(5))

    //when
    presenter.getSomeData()

    //then
    then(view).should().showData(5)
}

Beware of hidden schedulers!

There is one very important thing that you have to remember when you use this approach. Some time ago I noticed that some of my tests failed from time to time and I couldn’t figure out why. I was testing a method which looked just like the previous getSomeData(), but it contained a parameterized delay like this:

fun getSomeDataWithDelay(delayInMillis: Long) {
    service.getSomeRemoteData()
        .delay(delayInMillis, TimeUnit.MILLISECONDS)
        .subscribeOn(schedulerProvider.io())
        .observeOn(schedulerProvider.ui())
        .subscribeBy(
            onSuccess = { view.showData(it) },
            onError = { view.showError() }
        )
}

In tests I passed delayInMillis = 0 and TrampolineSchedulerProvider to the presenter so everything seemed fine. But somehow the tests sometimes failed.

To spot the cause of the problem, let’s look at the delay method from the Single class.

@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> delay(long time, TimeUnit unit, boolean delayError) {
    return delay(time, unit, Schedulers.computation(), delayError);
}

As you can see, delay and many other RxJava operators, like timeout or debounce, use Schedulers.computation() by default! So even when we explicitly pass a scheduler to the subscribeOn() method, the delayed value is still emitted on a thread from the computation scheduler.

So if you want your code to be testable, you have to remember to always explicitly pass a scheduler to such methods. This is extremely important: if you don’t, the tests may start to fail from time to time, seemingly at random. And it’s really hard to debug problems which don’t always occur.

fun getSomeDataWithDelay(delayInMillis: Long) {
    service.getSomeRemoteData()
        // The scheduler is passed explicitly, so delay no longer falls back to computation.
        .delay(delayInMillis, TimeUnit.MILLISECONDS, schedulerProvider.io())
        .subscribeOn(schedulerProvider.io())
        .observeOn(schedulerProvider.ui())
        .subscribeBy(
            onSuccess = { view.showData(it) },
            onError = { view.showError() }
        )
}
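The difference between the two delay overloads can be observed directly. The sketch below assumes RxJava 2.x, and both function names are invented for the illustration. Each function returns the name of the thread on which the value travels downstream after a zero delay.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// delay() without a scheduler argument: the value is re-emitted
// on a computation thread, even though the delay is zero.
fun threadWithDefaultDelay(): String =
    Single.just(5)
        .delay(0, TimeUnit.MILLISECONDS)
        .map { Thread.currentThread().name }
        .blockingGet()

// delay() with an explicit trampoline scheduler: the value stays
// on the thread that subscribed.
fun threadWithExplicitDelay(): String =
    Single.just(5)
        .delay(0, TimeUnit.MILLISECONDS, Schedulers.trampoline())
        .map { Thread.currentThread().name }
        .blockingGet()
```

Only the second variant keeps the whole chain on the test thread, which is why the flaky test above becomes deterministic once the scheduler is passed in.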

Testing delays with TestScheduler

But what if we want to test whether the delays or timeouts in our code work as expected? Of course we can’t use Thread.sleep(time) in tests because it would make them very slow and nondeterministic. That’s when the TestScheduler becomes very useful.

TestScheduler gives us the ability to advance time manually. If we don’t do it, time remains frozen. This means that tasks scheduled on this Scheduler are never executed until we manually advance time.

Let’s say we want to test the getSomeDataWithDelay method from the previous example. The test of that method may look like this.
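As a minimal sketch of frozen time, assuming RxJava 2.x (ticksAfter is a name made up for this example): an interval driven by a TestScheduler emits nothing until the virtual clock is moved forward, and then emits exactly as many ticks as fit into the advanced time.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the ticks emitted after advancing
// the virtual clock by the given number of seconds.
fun ticksAfter(seconds: Long): List<Long> {
    val scheduler = TestScheduler()
    val observer = Observable.interval(1, TimeUnit.SECONDS, scheduler).test()
    // No real waiting happens here; only the virtual clock moves.
    scheduler.advanceTimeBy(seconds, TimeUnit.SECONDS)
    return observer.values().toList()
}
```

Calling ticksAfter(0) yields an empty list, while ticksAfter(3) yields the ticks 0, 1 and 2, and both calls return immediately.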

private val testScheduler = TestScheduler()
private var testSchedulerProvider = TestSchedulerProvider(testScheduler)

@Test
fun delayTestExample() {
    //given
    val presenter = DemoPresenter(testSchedulerProvider, view, service)
    given(service.getSomeRemoteData()).willReturn(Single.just(5))
    val delayInMillis = 1000L

    //when
    presenter.getSomeDataWithDelay(delayInMillis)

    //then
    then(view).should(never()).showData(anyInt())
    testScheduler.advanceTimeBy(delayInMillis, TimeUnit.MILLISECONDS)
    then(view).should().showData(5)
}

As you can see, showData was called only after I manually advanced time by 1000 milliseconds using testScheduler.advanceTimeBy. If I hadn’t advanced time manually, no event would have been delivered, even if the delay had been set to zero.
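The zero-delay case can be sketched as follows, assuming RxJava 2.x (zeroDelayStates is a hypothetical name). Even with a delay of 0 ms the value is held back until the TestScheduler is told to run its pending tasks, here with triggerActions().

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the number of received values
// before and after triggering the scheduler.
fun zeroDelayStates(): List<Int> {
    val scheduler = TestScheduler()
    val observer = Single.just(5)
        .delay(0, TimeUnit.MILLISECONDS, scheduler)
        .test()
    val before = observer.valueCount()
    // Runs every task that is due at the current virtual time.
    scheduler.triggerActions()
    val after = observer.valueCount()
    return listOf(before, after)
}
```

The function returns 0 and then 1: nothing is delivered until the scheduler is triggered.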

More complex tests with TestObserver

There is another really great tool which helps to test asynchronous code. Let’s say we have the following observable and we want to check whether the event with the expected value is emitted after exactly one second.

val observable = Observable.just(6).delay(1, TimeUnit.SECONDS, testScheduler)

We may try to use observable.blockingFirst() to get the result from the observable, but if we are using testScheduler that method will just wait forever, because no event is emitted unless we manually advance time in testScheduler. This is where the TestObserver comes in handy.

TestObserver is an observer that records events and allows making assertions about them. We can pass it to the subscribe method of the observable and check whether the assertions about events hold. An example usage of TestObserver is shown below.

@Test
fun testSubscriberTestExample() {
    //given
    val observable = Observable.just(6).delay(1, TimeUnit.SECONDS, testScheduler)
    val testObserver = TestObserver<Int>()

    //when
    observable.subscribe(testObserver)

    //then
    testScheduler.advanceTimeBy(950, TimeUnit.MILLISECONDS)
    testObserver.assertNotTerminated()
    testScheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS)
    testObserver.assertValue(6)
    testObserver.assertComplete()
}

As you can see, thanks to TestScheduler and TestObserver we can very simply check that after 950 ms no event was recorded and the observer was not terminated. After another 60 ms (1010 ms in total) we check that the value 6 was emitted and the observer completed.
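The same pair of tools also works for timeouts. The following is a sketch under the assumption of RxJava 2.x; timeoutObserver is a name introduced only for this example. It uses the test() shortcut, which creates a TestObserver and subscribes it in one step.

```kotlin
import io.reactivex.Observable
import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: subscribes to a source that never emits, guarded by a
// one-second timeout, and advances virtual time by the given amount.
fun timeoutObserver(advanceMillis: Long): TestObserver<Int> {
    val scheduler = TestScheduler()
    val observer = Observable.never<Int>()
        .timeout(1, TimeUnit.SECONDS, scheduler)
        .test()
    scheduler.advanceTimeBy(advanceMillis, TimeUnit.MILLISECONDS)
    return observer
}

fun main() {
    // Just before the deadline nothing has happened yet.
    timeoutObserver(999).assertNotTerminated()
    // Once the deadline is reached the observer receives a TimeoutException.
    timeoutObserver(1000).assertError(TimeoutException::class.java)
}
```

Note that the scheduler is passed to timeout explicitly; without it the operator would use the computation scheduler and the virtual clock would have no effect.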

TL;DR

  • To test asynchronous RxJava code it’s best to explicitly pass the proper scheduler to the class under test.
  • The trampoline scheduler is great for testing. It executes all tasks in a FIFO manner on one of the participating threads.
  • TestScheduler gives strict control over time. It’s great for testing delays or timeouts.
  • It’s really important to explicitly pass a scheduler to delay, timeout, debounce and other operators which use the computation scheduler by default. Passing a scheduler to the subscribeOn method won’t change the scheduler which those operators use internally.
  • Use TestObserver if you want to check which events an observable emitted. It records events and allows making assertions about them.

I hope this article will help you write better unit tests and that you won’t fall into as many traps as I did 😅. As always, you can find the code with the presented examples in the repository.

Sources:

Reactive Programming with RxJava. Creating Asynchronous, Event-Based Applications — Tomasz Nurkiewicz and Ben Christensen