RxJava — Practical takeUntil Example

Last week I had the following problem:

- I had to send different model objects of the same type to the backend
- There was no backend API to send all models at once
- Once the backend returned a successful response I should stop sending further model objects

Since the source of model objects was already reactive and I already had a way of sending one model to the backend in a reactive manner too I
decided to stick with the reactive approach.

I was thinking of which operator would make the most sense here and I remembered takeUntil.

From the documentation:

Returns an Observable that emits items emitted by the source Observable, checks the specified predicate
for each item, and then completes if the condition is satisfied.

Sounds pretty good right, so how do we actually wire it up?

First let’s get our models that we need to send.

modelProvider.getItems()

This is our source Observable. The next thing to do is for each object a backend request. FlatMap comes in handy there.

modelProvider.getItems()
.flatMap(retroApiInterface::doBackendRequest)

I’m using Retrofit here to send a backend request which will return an Observable that emits the matching response.

Let’s assume the model that our Retrofit Observable emits has a function whether it was successful or not. All we need to do is check it. If it was successful we should stop sending any more requests to the backend. If it wasn’t we should keep going. So let’s apply the takeUntil operator.

modelProvider.getItems()
.flatMap(retroApiInterface::doBackendRequest)
.takeUntil(response -> response.isSuccessful())

Now it’d automatically stop the source Observable once the response is successful and hence stop sending backend requests. In case it wasn’t, further backend requests will be made.

Now one could think that we’re done but there are actually 2 cases left that are not handled:

- What if no items are emitted from the source Observable?
- What if the backend never gives us a successful response?

In both of those scenarios we’d end up in onComplete of our Subscriber and that might not be what we want. We can fix that with the lastOrDefault Operator.

modelProvider.getItems()
.flatMap(retroApiInterface::doBackendRequest)
.takeUntil(response -> response.isSuccessful())
.lastOrDefault(ServerResponse.createUnsuccessful())

We want the last one that was emitted from the Source Observable and then flatMapped into the Retrofit Observable. If the source Observable did not emit we’ll create an invalid response our self.

Now that we only emit one value we can convert it to Single.

modelProvider.getItems()
.flatMap(retroApiInterface::doBackendRequest)
.takeUntil(response -> response.isSuccessful())
.lastOrDefault(ServerResponse.createUnsuccessful())
.toSingle()

That’s it now. We can subscribe now and handle the emitted response as well as the error case.

modelProvider.getItems()
.flatMap(retroApiInterface::doBackendRequest)
.takeUntil(response -> response.isSuccessful())
.lastOrDefault(ServerResponse.createUnsuccessful())
.toSingle()
.subscribe(response -> {
if (response.isSuccessful()) {
// We made it.
} else {
// Not successful.
}
}, throwable -> {
// Some error happened along the way.
})

This is it. The problem is solved in a reactive manner with little code. I’m more than happy to receive any feedback.

Note: I left out the entire Scheduling on purpose to keep it simple. Usually you’d apply Scheduler(s) to shift the backend request to the background and once you subscribe to it shift it back to the UI to be able to make UI changes.

Edit: As pointed out by Ivan Škorić we can make this even shorter, not use takeUntil and instead use firstOrDefault:

modelProvider.getItems()
.flatMap(retroApiInterface::doBackendRequest)
.firstOrDefault(ServerResponse.createUnsuccessful(), response -> response.isSuccessful())
.toSingle()