Testing RxJava code made easy

RxJava 2 has a nice little hidden treasure that was also backported to RxJava 1: the test() function. It subscribes a test observer to the stream and hands it back to you, along with a bunch of useful assertions that we’ll dive into.

Let’s take an easy example. We want to assert that an Observable is being subscribed to, emits one value, completes and has no errors.

Observable.just(1)
.test()
.assertSubscribed()
.assertValues(1)
.assertComplete()
.assertNoErrors()

That’s quite a lot of statements for such a simple check. Luckily, there’s assertResult():

Observable.just(1)
.test()
.assertResult(1)

It performs all of the checks above in a single call. It also works for Completable, Maybe and Single:

Completable.complete()
.test()
.assertResult()

Maybe.empty<Int>()
.test()
.assertResult()

Maybe.just(1)
.test()
.assertResult(1)

Single.just(1)
.test()
.assertResult(1)

As a matter of fact, all of the functions outlined in this article work on any RxJava 2 reactive type, since they share the same test infrastructure.
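Flowable is the one reactive type not shown above, so here is a minimal sketch of the same assertion on it. It assumes RxJava 2 is on the classpath, and the function name is made up for illustration only.

```kotlin
import io.reactivex.Flowable
import io.reactivex.subscribers.TestSubscriber

// Hypothetical helper, named only for this illustration.
fun flowableEmitsOneAndCompletes(): TestSubscriber<Int> =
    Flowable.just(1)
        .test()          // Flowable hands back a TestSubscriber rather than a TestObserver
        .assertResult(1) // the assertion itself is the same as for Observable
```

The returned type differs, but the assertion methods used in this article are available on both.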

Let’s say we want the same assertions, except that the stream should not have completed yet, so that it could still emit more data. That’s what assertValuesOnly is for:

Observable.create<Int> { it.onNext(1) }
.test()
.assertValuesOnly(1)
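To make the "could emit more data" part visible, here is a small sketch that keeps pushing values into an open stream. Using a PublishSubject as the source is my choice for the illustration, and the function name is hypothetical.

```kotlin
import io.reactivex.observers.TestObserver
import io.reactivex.subjects.PublishSubject

// Hypothetical helper, named only for this illustration.
fun openStreamKeepsEmitting(): TestObserver<Int> {
    // A subject lets the test decide when values are emitted.
    val subject = PublishSubject.create<Int>()
    val observer = subject.test()

    subject.onNext(1)
    observer.assertValuesOnly(1) // one value so far, no completion, no error

    subject.onNext(2)
    observer.assertValuesOnly(1, 2) // still open, now with two values

    return observer
}
```

The same TestObserver can be asserted on repeatedly as the stream moves along.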

The opposite case is a stream that is still open but has not emitted anything yet. That’s what assertEmpty checks:

Observable.never<Int>()
.test()
.assertEmpty()

We can also assert on a value by its index using assertValueAt:

Observable.just(1, 2)
.test()
.assertValueAt(0, 1)

Even better, if we have a complex data structure, we can use the assertValueAt overload that takes a predicate:

Observable.just(1, 2)
.test()
.assertValueAt(0) { it == 1 }
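With plain integers the predicate does not buy us much, so here is a sketch with a slightly richer type. The User class and the function name are made up for this illustration. The idea is to check only the fields we care about.

```kotlin
import io.reactivex.Observable
import io.reactivex.observers.TestObserver

// Hypothetical model class, used only for this illustration.
data class User(val id: Int, val name: String, val lastSeenMillis: Long)

// Hypothetical helper, named only for this illustration.
fun usersArriveInOrder(): TestObserver<User> =
    Observable.just(
        User(1, "Alice", System.currentTimeMillis()),
        User(2, "Bob", System.currentTimeMillis())
    )
        .test()
        // lastSeenMillis changes on every run, so the predicates ignore it.
        .assertValueAt(0) { it.id == 1 && it.name == "Alice" }
        .assertValueAt(1) { it.id == 2 && it.name == "Bob" }
```

An equality-based assertion would force us to reproduce the timestamp exactly. The predicate sidesteps that.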

The opposite of asserting on a value is assertNever, which checks that a given value was never emitted:

Observable.just(1, 2)
.test()
.assertNever(3)
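assertNever also has an overload that takes a predicate, which is handy when a whole range of values should never show up. A minimal sketch, with a made-up function name:

```kotlin
import io.reactivex.Observable
import io.reactivex.observers.TestObserver

// Hypothetical helper, named only for this illustration.
fun neverEmitsAboveTwo(): TestObserver<Int> =
    Observable.just(1, 2)
        .test()
        // Fails if any emitted value matches the predicate.
        .assertNever { it > 2 }
```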

There are also a few helpful assertions for checking whether an error was emitted or not.

Observable.error<Int>(RuntimeException())
.test()
.assertFailure(RuntimeException::class.java)

This is one of my favorites, since it checks that a subscriber subscribed to the Observable, no values were emitted, an error was received and the stream did not complete. Just what we wanted, in a single call.
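Spelled out by hand, those checks would look roughly like the sketch below. This is my reading of what assertFailure covers, written with the individual assertions, and the function name is made up for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.observers.TestObserver

// Hypothetical helper, named only for this illustration.
fun failureSpelledOut(): TestObserver<Int> =
    Observable.error<Int>(RuntimeException())
        .test()
        .assertSubscribed()                        // a subscriber subscribed
        .assertNoValues()                          // nothing was emitted
        .assertError(RuntimeException::class.java) // an error of this type arrived
        .assertNotComplete()                       // the stream did not complete
```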

Let’s see how we could check that values were sent and an error was raised.

Observable.error<Int>(RuntimeException())
.startWith(Observable.just(1, 2))
.test()
.assertFailure(RuntimeException::class.java, 1, 2)

It’s that easy. There’s also assertFailureAndMessage that allows us to check the exception message as well:

Observable.error<Int>(RuntimeException("Whops"))
.test()
.assertFailureAndMessage(RuntimeException::class.java, "Whops")

Conclusion

I hope I convinced you that the above functions are handy, more precise and easier to read than something like this:

val o = TestObserver<Int>()
Observable.just(1)
.subscribe(o)
o.assertValues(1)
.assertSubscribed()
.assertComplete()
.assertNoErrors()

I explicitly didn’t talk about the blocking assertions and asynchronous helper methods, since I like my tests to run synchronously and to leverage another treasure of RxJava that I’ll write about next time.