Testing asynchronous RxJava code using Mockito

Fabio Collini
8 min readNov 22, 2016

RxJava is used in many Android applications to manage background tasks, in this post will see how to test this code using Mockito and some RxJava utility classes. The example is written using RxJava 2, the same concepts are available also in RxJava 1 (in the demo project you can find both versions of the same example).

Let’s see a simple example, we need a class (created using Retrofit) to execute server calls to StackOverflow API:

public interface StackOverflowService {

@GET("/users")
Single<UserResponse> getTopUsers();

@GET("/users/{userId}/badges")
Single<BadgeResponse> getBadges(@Path("userId") int userId);
}

Using getTopUsers method we can retrieve a list of the top users, getBadges returns a list of badges of a specific user.

We can combine these two methods in a new class using RxJava, loadUsers method retrieves a list of the top 5 StackOverflow users (UserStats class contains user information and the badges list):

public class UserService {
private StackOverflowService service;

public UserService(StackOverflowService service) {
this.service = service;
}

public Single<List<UserStats>> loadUsers() {
return service.getTopUsers()
.flattenAsObservable(UserResponse::items)
.take(5)
.flatMapSingle(this::loadUserStats)
.toList();
}

private Single<UserStats> loadUserStats(User user) {
return service.getBadges(user.id())
.subscribeOn(Schedulers.io())
.map(BadgeResponse::items)
.map(badges -> UserStats.create(user, badges));
}
}

The io scheduler is used in the loadUserStats method to execute the calls in parallel. Spoiler alert: there is a little bug in this class! We’ll see how to fix it in a while.

First Mockito test

The UserService class uses dependency injection, the StackOverflowService object is a constructor parameter. So we can easily test it using Mockito:

public class UserServiceTest {

@Rule public MockitoRule rule = MockitoJUnit.rule();

@Mock StackOverflowService stackOverflowService;

@InjectMocks UserService userService;

@Test public void emptyTest() {
userService.loadUsers();
}
}

This test is very easy, it creates the RxJava single but it doesn’t invoke the subscribe method. It’s easy but it doesn’t work! Executing the test we get a NullPointerException:

java.lang.NullPointerException
at UserService.loadUsers(UserService.java:21)
at UserServiceTest.emptyTest(UserServiceTest_2.java:19)

We haven’t defined any behaviour using Mockito, so all the methods return just null. Sometimes null can be a good default value but in this example we have a strange NullPointerExeption executing the test.

Changing default Mockito return values

Using Mockito we can change default value returned in a method invocation, we need to define a class MockitoConfiguration in package org.mockito.configuration:

public class MockitoConfiguration extends   
DefaultMockitoConfiguration {
public Answer<Object> getDefaultAnswer() {
return new ReturnsEmptyValues() {
@Override
public Object answer(InvocationOnMock inv) {
Class<?> type = inv.getMethod().getReturnType();
if (type.isAssignableFrom(Observable.class)) {
return Observable.error(createException(inv));
} else if (type.isAssignableFrom(Single.class)) {
return Single.error(createException(inv));
} else {
return super.answer(inv);
}
}
};
}

@NonNull
private RuntimeException createException(
InvocationOnMock invocation) {
String s = invocation.toString();
return new RuntimeException(
"No mock defined for invocation " + s);
}
}

In this class we define that, when a method returns an Observable (or a Single), the value returned is an Observable (or a Single) that emits an exception. In this way executing the previous test we get an exception that says that we need to define the behaviour of a mock (and not a NullPointerException!).

Testing synchronous behaviour

So let’s define the behaviour of the StackOverflowService object, we return a list with two users and a list with a singe badge:

when(stackOverflowService.getTopUsers()).thenReturn(
just(UserResponse.create(
User.create(1, 200, "user 1"),
User.create(2, 100, "user 2")
))
);
when(stackOverflowService.getBadges(anyInt())).thenReturn(
just(BadgeResponse.create(Badge.create("badge")))
);

And now we need to subscribe the Single, the simplest way to do it in a test is using a blocking Single:

List<UserStats> l = userService.loadUsers().blockingGet();
assertThat(l).hasSize(2);

In this way the test waits for the end of the server calls and we can verify the value emitted by the Single. In this example I am using AssertJ, a great library that allows to write test assertions in a simple and readable way.

The blockingGet method can’t be always used, for example we can’t test an Observable that doesn’t terminate using this method. Using RxJava 2 we can use a TestObserver, it can be created invoking the test method:

TestObserver<List<UserStats>> testObserver =    
userService.loadUsers().test();
testObserver.awaitTerminalEvent();testObserver
.assertNoErrors()
.assertValue(l -> l.size() == 2);

This is a more flexible way to write our test, it works in every situation (even when the observable we want to test doesn’t terminate). The TestObserver class contains a lot of methods that can be used to verify the behaviour of an RxJava object (more examples on how to use this class are available in this post).

In this example we must invoke awaitTerminalEvent because the method that retrieves the badges list is executed on the io scheduler. In the next paragraph we’ll see other solutions to this problem.

Now let’s modify our test to check the ids of the users. We can use assertValue method and check the ids (we are using RxJava to extract the id on every object of the list):

userService.loadUsers()
.test()
.assertNoErrors()
.assertValue(l ->
Observable.fromIterable(l)
.map(UserStats::id)
.toList()
.blockingGet()
.equals(Arrays.asList(1, 2))
);

The parameter of the assertValue method is a Predicate, it’s executed on the emitted value and the test fails when the return value is false. Unfortunately the error message is something similar to this one:

java.lang.AssertionError: Value not present (latch = 0, values = 1, errors = 0, completions = 1)

Reading this message is not easy to understand the cause of the error.

TestObserver and AssertJ

We can improve the previous test using a small hack and AssertJ:

testObserver
.assertNoErrors()
.assertValue(check(
l -> assertThat(l)
.extracting(UserStats::id)
.containsExactly(1, 2)
));

The hack is in the check method, it transform a block of code that can throw an exception to a Predicate that returns a boolean. If the code throws an exception the test fails:

public static <T> Predicate<T> check(Consumer<T> consumer) {
return t -> {
consumer.accept(t);
return true;
};
}

This is an example of an AssertJ error message, it’s very accurate and the cause of the failure can be easily understood reading it:

Actual and expected have the same elements but not in the same order, at index 0 actual element was:
<2>
whereas expected element was:
<1>

Testing asynchronous code

The previous test works… sometimes! We are using a flatMap with a Single executed on the io scheduler so the invocations of the getBadges method are executed in parallel. For this reason this test (and the production code!) is not deterministic, it works when the first invocation of getBadges is faster than the second one.

If we want to fix this issue we can use a Rule that replaces the default scheduler with an synchronous one (more info about this solution is available here):

public class TrampolineSchedulerRule implements TestRule {

@Override
public Statement apply(final Statement base, Description d) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
RxJavaPlugins.setIoSchedulerHandler(
scheduler -> Schedulers.trampoline());
RxJavaPlugins.setComputationSchedulerHandler(
scheduler -> Schedulers.trampoline());
RxJavaPlugins.setNewThreadSchedulerHandler(
scheduler -> Schedulers.trampoline());
RxAndroidPlugins.setInitMainThreadSchedulerHandler(
scheduler -> Schedulers.trampoline());

try {
base.evaluate();
} finally {
RxJavaPlugins.reset();
RxAndroidPlugins.reset();
}
}
};
}
}

Using this rule in the test we fix the previous problem but we are removing the concurrency in our code; the two invocations of getBadges are no more executed in parallel.

Let’s try to remove the nondeterministic behaviour, we can add delays to simulate that the first call is slower than the second one:

when(stackOverflowService.getTopUsers()).thenReturn(
just(UserResponse.create(
User.create(1, 200, "user 1"),
User.create(2, 100, "user 2")
)
));
when(stackOverflowService.getBadges(eq(1))).thenReturn(
just(BadgeResponse.create(Badge.create("badge1")))
.delay(2, TimeUnit.SECONDS));
when(stackOverflowService.getBadges(eq(2))).thenReturn(
just(BadgeResponse.create(Badge.create("badge2")))
.delay(1, TimeUnit.SECONDS));

TestObserver<List<UserStats>> testObserver =
userService.loadUsers().test();

testObserver.awaitTerminalEvent();

testObserver
.assertNoErrors()
.assertValue(check(
l -> assertThat(l)
.extracting(UserStats::id)
.containsExactly(1, 2)
));

This test is deterministic, it always fails because the elements in the list are not returned in the right order. But unfortunately it last two seconds because the delays are real and we need to wait them. To fix it we can use a TestScheduler, we can create another rule similar to the previous one to replace the real schedulers with a TestScheduler:

public class TestSchedulerRule implements TestRule {  private final TestScheduler testScheduler = new TestScheduler();

public TestScheduler getTestScheduler() {
return testScheduler;
}

@Override
public Statement apply(final Statement base, Description d) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
RxJavaPlugins.setIoSchedulerHandler(
scheduler -> testScheduler);
RxJavaPlugins.setComputationSchedulerHandler(
scheduler -> testScheduler);
RxJavaPlugins.setNewThreadSchedulerHandler(
scheduler -> testScheduler);
RxAndroidPlugins.setMainThreadSchedulerHandler(
scheduler -> Schedulers.trampoline());

try {
base.evaluate();
} finally {
RxJavaPlugins.reset();
RxAndroidPlugins.reset();
}
}
};
}
}

Using a TestScheduler we can easily manage time, for example we can advance time by two seconds:

TestObserver<List<UserStats>> testObserver = 
userService.loadUsers().test();

testSchedulerRule.getTestScheduler().advanceTimeBy(2, SECONDS);

testObserver
.assertNoErrors()
.assertValue(check(
l -> assertThat(l)
.extracting(UserStats::id)
.containsExactly(1, 2)
));

And now the test is deterministic and fast! But it fails, let’s see how to fix it.

We can replace the flatMap invocation with a concatMap, using this method the server calls to retrieve badges are executed in sequence (more info on flatMap Vs concatMap are available in this post of Fernando Cejas):

public Single<List<UserStats>> loadUsers() {
return service.getTopUsers()
.flattenAsObservable(UserResponse::items)
.take(5)
.concatMap(this::loadUserStats)
.toList();
}

Using concatMap we need to change the test code, the delays are not executed in parallel so the total time is three seconds. To fix the test we must change the parameter of the advanceTimeBy method:

testSchedulerRule.getTestScheduler().advanceTimeBy(3, SECONDS);

If we want to execute the server calls in parallel we can use flatMap and manually sort the data using toSortedList method using a Comparator that order the user based on the reputation:

public Single<List<UserStats>> loadUsers() {
return service.getTopUsers()
.flattenAsObservable(UserResponse::items)
.take(5)
.flatMapSingle(this::loadUserStats)
.toSortedList((u1, u2) ->
u2.reputation() - u1.reputation())
;
}

EDIT: in the reddit thread of this post Vinaybn has suggested another solution to this problem. The method concatMapEager can be used to maintain the order and execute the observables in parallel:

public Single<List<UserStats>> loadUsers() {
return service.getTopUsers()
.flattenAsObservable(UserResponse::items)
.take(5)
.concatMapEager(user -> loadUserStats(user).toObservable())
.toList();
}

Testing timeouts

Using RxJava is very easy to add a timeout to a method invocation, in our example we can just add an invocation to timeout method:

public Single<List<UserStats>> loadUsers() {
return service.getTopUsers()
.flattenAsObservable(UserResponse::items)
.take(5)
.flatMapSingle(this::loadUserStats)
.toSortedList((u1, u2) ->
u2.reputation() - u1.reputation())
.timeout(20, TimeUnit.SECONDS);
}

We are using the timeout method without the Scheduler parameter. This method internally use the computation scheduler, this is the source code of the method:

public final Single<T> timeout(long timeout, TimeUnit unit) {
return timeout0(timeout, unit, Schedulers.computation(), null);
}

Using the rule we defined earlier we can replace the computation scheduler with a TestScheduler. Writing a test that simulates a timeout is easy:

when(stackOverflowService.getTopUsers()).thenReturn(
just(UserResponse.create(
User.create(1, 200, "user 1"),
User.create(2, 100, "user 2")
)).delay(10, TimeUnit.SECONDS)
);
when(stackOverflowService.getBadges(eq(1))).thenReturn(
just(BadgeResponse.create(Badge.create("badge1")))
.delay(2, TimeUnit.SECONDS));
when(stackOverflowService.getBadges(eq(2))).thenReturn(
just(BadgeResponse.create(Badge.create("badge2")))
.delay(11, TimeUnit.SECONDS));

TestObserver<List<UserStats>> testObserver =
userService.loadUsers().test();

testSchedulerRule.getTestScheduler().advanceTimeBy(20, SECONDS);

testObserver.assertError(TimeoutException.class);

Testing exceptions and retry logic

A good test should verify the behaviour of the production code when an exception occurs. So let’s add other test methods to our example.

First of all let’s add an invocation to retry method:

public Single<List<UserStats>> loadUsers() {
return service.getTopUsers()
.flattenAsObservable(UserResponse::items)
.take(5)
.flatMapSingle(this::loadUserStats)
.toSortedList((u1, u2) ->
u2.reputation() - u1.reputation())
.retry(1)
.timeout(20, TimeUnit.SECONDS);
}

In the test method we can simulate that first invocation of getBadges throws an exception (the second invocation returns the correct value):

when(stackOverflowService.getTopUsers()).thenReturn(
just(UserResponse.create(
User.create(1, 200, "user 1"),
User.create(2, 100, "user 2")
))
);
when(stackOverflowService.getBadges(eq(1)))
.thenReturn(error(new RuntimeException(":(")))
.thenReturn(
just(BadgeResponse.create(Badge.create("badge1")))
);
when(stackOverflowService.getBadges(eq(2))).thenReturn(
just(BadgeResponse.create(Badge.create("badge2")))
);
TestObserver<List<UserStats>> testObserver =
userService.loadUsers().test();

testSchedulerRule.getTestScheduler().triggerActions();

testObserver
.assertNoErrors()
.assertValue(check(
l -> assertThat(l)
.extracting(UserStats::id)
.containsExactly(1, 2)
));

This test works correctly, the getBadges method is invoked two times with the parameter 1 thanks to the retry method.

Unfortunately we can’t write a similar test method to verify what happens when an error of getTopUsers method occurs. This method is invoked just once even when there is an error, the retry method executes another subscription to the same Single returned by the getTopUsers method.

We need to define a Single that throws an exception on the first subscription and return the right value on the second subscription. Using a boolean field we can decide the behaviour of the Single:

when(stackOverflowService.getTopUsers())
.thenReturn(Single.fromCallable(new Callable<UserResponse>() {
private boolean firstEmitted;

@Override public UserResponse call() throws Exception {
if (!firstEmitted) {
firstEmitted = true;
throw new RuntimeException(":(");
} else {
return UserResponse.create(
User.create(1, 200, "user 1"),
User.create(2, 100, "user 2")
);
}
}
}));

Wrapping up

Testing asynchronous code is never easy, it’s almost impossible to manually test all the cases and it’s hard to simulate them using a JUnit test. In this post we have seen that, using RxJava and the right libraries, it’s not so difficult and that all the corner cases can be tested using a JVM test.

Thanks to Federico Paolinelli and Francesco Strazzullo for proofreading this post.

--

--

Fabio Collini

Android GDE || Android engineer @nytimes || author @androidavanzato || blogger @codingjam