Practical Usage of RxJava's Completable in Android Apps

Updated 2017/02/04: Special thanks to Dávid Karnok for pointing out mistakes and giving me some great feedback :)

Most Android developers today are familiar with using RxJava to build their apps. In most cases, Observable and Single (or Flowable and Maybe in RxJava 2.x) are enough to build any Android app. However, another, somewhat inconspicuous reactive class called Completable can help you build a better one.

Here, I would like to describe how to use Completable and some points you need to pay attention to.

What is Completable and why use it?

For those who are not familiar with Completable, here is its description from the Javadoc:

Represents a deferred computation without any value but only indication for completion or exception

In other words, this reactive class reports only that an operation succeeded or that it failed with an error. A simple example of Completable:

Completable.fromAction(() -> {
        // Do long tasks here
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(() -> System.out.println("complete task done"), Throwable::printStackTrace);
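To make that contract concrete without depending on the library, here is a small toy model in plain Java. It is only a sketch under my own assumptions, not RxJava's implementation: MiniCompletable and ThrowingAction are hypothetical names, and schedulers and disposal are left out entirely. It shows the two properties that matter: nothing runs until subscribe() is called, and the subscriber receives either a completion signal or an error, never a value.

```java
import java.util.function.Consumer;

/**
 * Toy stand-in for the Completable contract (NOT RxJava's implementation):
 * a deferred action that reports either completion or an error, never a value.
 */
final class MiniCompletable {

    /** Hypothetical helper: an action that is allowed to throw. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    private final ThrowingAction action;

    private MiniCompletable(ThrowingAction action) {
        this.action = action;
    }

    /** Only stores the action; nothing runs yet. */
    static MiniCompletable fromAction(ThrowingAction action) {
        return new MiniCompletable(action);
    }

    /** Runs the action now and sends exactly one terminal signal. */
    void subscribe(Runnable onComplete, Consumer<Throwable> onError) {
        try {
            action.run();
        } catch (Exception e) {
            onError.accept(e);
            return;
        }
        onComplete.run();
    }
}
```

Creating a MiniCompletable has no side effects; the stored action runs only when subscribe() is called, which mirrors the "deferred computation" wording in the Javadoc quote above.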

You can convert other reactive classes to Completable using flatMapCompletable:

Completable completableTask(long miliSec) {
    return Completable.fromAction(() -> {
        Thread.sleep(miliSec);
        System.out.println("complete task done with " + miliSec);
    });
}

Flowable.just(1000L)
    .flatMapCompletable(s -> completableTask(s))
    .subscribe(() -> System.out.println("done")); //OK

Or you can use .ignoreElements() for Flowable and .ignoreElement() for Maybe:

LongTask() // assumed here to return a Flowable
    .ignoreElements()
    .subscribe(() -> System.out.println("complete"));

In my opinion, Completable is useful on Android for two reasons.

1. Simplifies interface

Often a subscriber only wants to know whether the task it subscribed to has completed, for instance when persisting data to local storage or posting data to a server.

RxView.clicks(button)
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(Schedulers.io())
    .map(aVoid -> updateData(button.getItem()))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> {
        /* do tasks which never consume the subscribed data */
    }, e -> {});

It is a waste of resources to pass data that never gets used, and an unused return value can confuse other developers. Using Completable simplifies how the method is used.

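RxView.clicks(button)
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(Schedulers.io())
    // updateData() returns a Completable here. The click stream itself never
    // completes, so success is handled per task rather than in subscribe().
    .flatMapCompletable(aVoid -> updateData(button.getItem())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnComplete(() -> {
            /* successfully completed */
        }))
    .subscribe(() -> {}, e -> {});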

2. Improves testability

Since Completable is only responsible for signalling a task's completion or error, testing becomes much easier.

Let's say the updateData() method converts data, uploads it, saves the result locally (here, to a database), and returns Flowable<someData>.

Flowable<someData> updateData(Item item) {
    return Flowable.just(item)
        // a method reference avoids a lambda parameter that shadows "item"
        .map(convertUseCase::someConvertconvertData)
        .flatMap(convertData -> someDataApi.updateData(convertData))
        .doOnNext(someData -> db.setSomeData(someData));
}

convertUseCase.someConvertconvertData returns different data depending on the item argument. A test for this method would look like this:

@Test
public void test_updateData_success_some_data_pattern1() {
    ...
    when(convertUseCase.someConvertconvertData(item))
        .thenReturn(mockConvertData_pattern1);
    when(someDataApi.updateData(convertData))
        .thenReturn(mockSomeData);

    updateData(item)
        .test()
        .assertValue(expected);
}

@Test
public void test_updateData_success_some_data_pattern2() {
    ...
    when(convertUseCase.someConvertconvertData(item))
        .thenReturn(mockConvertData_pattern2);
    when(someDataApi.updateData(convertData))
        .thenReturn(mockSomeData);

    updateData(item)
        .test()
        .assertValue(expected);
}

@Test
public void test_updateData_error_some_data_pattern1() {
    ...
    when(convertUseCase.someConvertconvertData(item))
        .thenReturn(mockConvertData_pattern1);
    when(someDataApi.updateData(convertData))
        .thenReturn(mockSomeData);

    updateData(item).test().assertError(...);
}

As you can see, you have to check two things: whether the method completes successfully, and whether its logic produces the correct data.

If you use Completable, the test becomes much simpler. First, the method would look like this:

Completable updateData(Item item) {
    return Flowable.just(item)
        .map(convertUseCase::convert)
        .flatMap(convertData -> someDataApi.updateData(convertData))
        .doOnNext(someData -> db.setSomeData(someData))
        .ignoreElements(); // Flowable uses ignoreElements(), not ignoreElement()
}

Then the test:

@Test
public void test_updateData_success() {
    ...
    when(convertUseCase.convert(item))
        .thenReturn(mockConvertData);
    when(someDataApi.updateData(convertData))
        .thenReturn(mockSomeData);

    updateData(item)
        .test()
        .assertComplete();

    // verify each collaborator with the same argument it was stubbed with
    verify(convertUseCase, times(1)).convert(item);
    verify(someDataApi, times(1)).updateData(convertData);
    verify(db, times(1)).setSomeData(someData);
}

@Test
public void test_updateData_error() {
    ...
    when(convertUseCase.convert(item))
        .thenReturn(mockConvertErrorData);
    when(someDataApi.updateData(convertData))
        .thenReturn(mockSomeData);

    updateData(item).test().assertError(...);
}

That is all it takes to test the updateData() method. All you have to verify is which pieces of logic are executed inside the method. The logic itself (here convertUseCase.convert() and someDataApi.updateData(convertData)) can be tested separately.

Things to keep in mind

Now that you understand the benefits of Completable, there are a couple of things to watch out for when you use it.

Completable never completes when the upstream is a Processor

A Processor (known as Subject in RxJava 1.x) is a very useful tool for bridging non-RxJava APIs. It can also be used as a Publisher that emits data to each Subscriber whenever a data source is updated. Let's consider a simple use case: updating a profile. The ProfileService class below has a method to update a name, and any Subscriber observing the profile can detect the change.

class ProfileService {
    BehaviorProcessor<Profile> profileProcessor = BehaviorProcessor.create();
    Profile profile = new Profile();

    Flowable<Profile> observeProfile() { return profileProcessor; }

    Completable updateName(String name) {
        // defer the update until someone subscribes, then signal completion
        return Completable.fromAction(() -> {
            profile.updateName(name);
            profileProcessor.onNext(profile);
        });
    }
}

Usage would look like this:

profileService.observeProfile()
    .subscribe(this::updateView); // watch profile updates
profileService.updateName(name) // subscribers of observeProfile() receive the updated profile
    .subscribe(() -> log("name updated"));

This works perfectly fine. Now what if you want to run some Completable task between observeProfile() and subscribe()? You might think the code below should work.

profileService.observeProfile()
.flatMapCompletable(profile -> someCompletableTask(profile))
.subscribe(() -> log("completable task done"));

Unfortunately, this doesn't behave as you might expect. someCompletableTask will be executed, but log() won't. This is because the Completable returned by flatMapCompletable completes only after its upstream completes, and a Processor never completes on its own. You could call onComplete() on the Processor to terminate it, but that is usually not what you want for a long-lived Processor. Instead, you can attach doOnComplete to the inner Completable task:

profileService.observeProfile()
    .flatMapCompletable(profile ->
        someCompletableTask(profile)
            .doOnComplete(() -> log("completable task done")))
    .subscribe();

Make sure to call .subscribe() to actually execute someCompletableTask.
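The behaviour described above can be sketched with a toy hot source in plain Java. This is a simplified model under my own assumptions, not RxJava's BehaviorProcessor or its real flatMapCompletable: MiniProcessor is a hypothetical name, the per-item tasks are synchronous, and errors are ignored. It only illustrates that the overall completion callback is tied to the upstream's own completion signal, while the per-item task runs on every emission.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Toy hot source (NOT RxJava's BehaviorProcessor): it pushes items to listeners
 * and only signals completion when onComplete() is called explicitly.
 */
final class MiniProcessor<T> {
    private final List<Consumer<T>> itemListeners = new ArrayList<>();
    private final List<Runnable> completionListeners = new ArrayList<>();

    /** Pushes one item to every registered per-item task. */
    void onNext(T item) {
        for (Consumer<T> listener : itemListeners) {
            listener.accept(item);
        }
    }

    /** Terminates the source; only now do the "all done" callbacks fire. */
    void onComplete() {
        for (Runnable listener : completionListeners) {
            listener.run();
        }
    }

    /**
     * Simplified flatMapCompletable with synchronous tasks: runs a task per item,
     * and calls onAllDone only once the upstream itself has completed.
     */
    void flatMapCompletable(Consumer<T> taskPerItem, Runnable onAllDone) {
        itemListeners.add(taskPerItem);
        completionListeners.add(onAllDone);
    }
}
```

In this model, calling onNext() any number of times runs the per-item task each time, but onAllDone stays silent until onComplete() is called on the source. That is why the logging belongs on the inner task when the source is meant to stay open.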

Explicitly call test() when testing the flatMapCompletable operator

Let's say you want to test convertData below:

Completable convertData(SomeData data) { 
return usecaseA.convertSomeData(data).ignoreElements();
}
//UseCaseA.java 
Flowable<ConvertData> convertSomeData(SomeData data) {
//... some operation
return Flowable.just(convertedData);
}

The test code would be:

@Test 
public void convertData_success() {
...
convertData(mockData);
verify(usecaseA, times(1)).convertSomeData(data);
}

This test will pass. However, if flatMapCompletable is used as in the code below, a test written the same way that also verifies usecaseB.someCompletableTask will fail.

Completable convertData(SomeData data) { 
return usecaseA.convertSomeData(data)
.flatMapCompletable(usecaseB::someCompletableTask);
}

You will see the error message:

Wanted but not invoked: usecaseB.someCompletableTask(data)

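This is because the function passed to flatMapCompletable runs only once the returned Completable is subscribed to, and calling .test() subscribes to it. Calling convertData() alone just builds the chain. So the test should be: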

@Test 
public void convertData_success() {
...
convertData(mockData).test();
verify(usecaseA, times(1)).convertSomeData(data);
verify(usecaseB, times(1)).someCompletableTask(data);
}
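The difference between building a chain and subscribing to it can be shown with a tiny plain-Java sketch. Everything here is a hypothetical stand-in of my own (LazyChainDemo and its methods are not RxJava or Mockito APIs), and the returned Runnable merely plays the role of subscribing, which is what test() does in the real test.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy illustration (plain Java, not RxJava) of assembly time versus subscription time.
 * All names here are hypothetical stand-ins for the article's usecaseA and usecaseB.
 */
final class LazyChainDemo {
    /** Records which stand-in was invoked, in order. */
    final List<String> calls = new ArrayList<>();

    /** Stands in for usecaseA.convertSomeData(data): invoked while the chain is built. */
    private String convertSomeData(String data) {
        calls.add("usecaseA");
        return data.toUpperCase();
    }

    /** Stands in for usecaseB.someCompletableTask(item): invoked only when an item flows. */
    private void someCompletableTask(String item) {
        calls.add("usecaseB:" + item);
    }

    /** Builds the chain; running the returned Runnable plays the role of subscribing. */
    Runnable convertData(String data) {
        String item = convertSomeData(data);     // eager: runs at assembly time
        return () -> someCompletableTask(item);  // deferred: runs at subscription time
    }
}
```

Calling convertData() alone records only the usecaseA call, which matches the first test passing without a subscription. The usecaseB call appears only after the returned Runnable is run, just as the Mockito verification on usecaseB succeeds only after test() subscribes.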

Wrap up

Although Completable gets less attention than other reactive classes, it helps you write simpler code and better tests. There are some points that you should be aware of, but once you get used to it, you will find a lot of use cases.


Originally published at gist.github.com.