Server polling and retrying failed operations. With Retrofit and RxJava.

A very common case in developing REST applications is server polling and retry: the server is doing some job and we need to ask, with some delay, whether it has finished; and when we get an error, we sometimes want to retry. I decided to write about this after I got stuck implementing server polling correctly with RxJava. Eventually I got a good answer about it in this Stack Overflow question.

In this article I will explain how easily we can do it with RxJava and Retrofit 1. I assume that you already know how to work with RxJava, Retrofit and how to set up application architecture with these great tools.

A few definitions that I will use in this article:

  • “Predicate”: a function object that returns a boolean and is passed to some Observable methods, for example Observable.filter(/*here we pass the predicate*/) or Observable.takeUntil(/*here we pass the predicate*/).
  • “Child of Observable”: an Observable or Subscriber that is chained after its parent Observable. For example:
Observable
.filter(/*predicate here*/)
.takeUntil(/*predicate here*/)
.subscribe(/*subscriber here*/)

The Observable returned from takeUntil() is a child of the Observable returned by filter(), and the Subscriber that is passed as a parameter to subscribe() is a child of the Observable returned from takeUntil().

Server Polling.

This is the case when you are waiting for some job to be done by the server and you have to periodically make an API call to know if it’s already done.

Here is sample code:

It looks big, but it’s very easy to understand and it’s written with an elegant chain of operators.

Assume that the server returned “isJobDone=true” on the third attempt. Here is the log:

repeatWhen, call
// at this stage the API call is performed
filter, call response isJobDone=false
takeUntil, call response isJobDone=false
// at this stage the API call is performed again
filter, call response isJobDone=false
takeUntil, call response isJobDone=false
// at this stage the API call is performed for the third time
filter, call response isJobDone=true
onNext response isJobDone=true
takeUntil, call response isJobDone=true
onCompleted

In the next section I’ll explain why methods are called in that way.

RxJava internals explained for server polling implementation.

After the HTTP request completes, the first thing called is the call() method of the filter() predicate.

If we return “false” from filter(), the result is not the one we are waiting for and should not be delivered to the Subscriber<ServerPollingResponse>. We get the following chain of events:

  1. We returned “false”, so the result was not delivered to the child of filter(), which is the Subscriber<ServerPollingResponse>.
  2. Control returns to the takeUntil() operator, which handed the item to filter() first, and the call() method of its predicate is called.
  3. We see that the job is not done yet and we return “false” from the call() method of takeUntil().
  4. The stream is therefore not terminated. When the source Observable completes, repeatWhen() turns that onCompleted() into an item emitted to the Observable passed into its function.
  5. That item is delayed by 5 seconds, and once it is emitted by the Observable returned from the function, repeatWhen() resubscribes to the original Observable. This causes the HTTP request to be made again, which was our goal.

If we return “true” from filter(), the result is the one we are waiting for and we get the following chain of events:

  1. The result is delivered to onNext(ServerPollingResponse response) of filter’s child, the Subscriber<ServerPollingResponse>.
  2. Then the result is passed to the call() method of the takeUntil() predicate.
  3. We also return “true” from takeUntil() because the job is done.
  4. Because we returned “true”, the takeUntil() operator calls onCompleted() on its child. The child of takeUntil() is filter().
  5. filter() calls onCompleted() on its child, which is the Subscriber<ServerPollingResponse>.
  6. takeUntil() unsubscribes immediately; that’s why the Observable of repeatWhen() will not get a call to onNext() or onCompleted() and the HTTP request won’t be made again.

The chain is terminated by internal unsubscribe() that is called from takeUntil() operator.
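The two chains of events above can be modelled in plain Java, without RxJava, to make the call order easier to follow. This is only a sketch under assumptions: it hard-codes the behaviour described above (the filter() predicate is called before the takeUntil() predicate for each item), the class and method names are hypothetical, the server is replaced by a canned list of responses, and the 5-second delay is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the polling chain; this is not RxJava code.
class PollingModel {

    // Hypothetical stand-in for the article's response class.
    static final class ServerPollingResponse {
        final boolean isJobDone;
        ServerPollingResponse(boolean isJobDone) { this.isJobDone = isJobDone; }
    }

    // Feeds canned responses through the modelled chain and returns the log lines.
    static List<String> poll(List<ServerPollingResponse> cannedResponses) {
        List<String> log = new ArrayList<>();
        log.add("repeatWhen, call");
        for (ServerPollingResponse response : cannedResponses) { // one iteration = one API call
            String suffix = " response isJobDone=" + response.isJobDone;
            log.add("filter, call" + suffix);       // filter() predicate runs first
            if (response.isJobDone) {
                log.add("onNext" + suffix);         // only satisfying results reach the Subscriber
            }
            log.add("takeUntil, call" + suffix);    // then the takeUntil() predicate runs
            if (response.isJobDone) {
                log.add("onCompleted");             // takeUntil() completes its child and unsubscribes
                break;                              // so no further request is made
            }
            // otherwise the source completes and the loop models the resubscribe
        }
        return log;
    }

    public static void main(String[] args) {
        List<ServerPollingResponse> responses = Arrays.asList(
                new ServerPollingResponse(false),
                new ServerPollingResponse(false),
                new ServerPollingResponse(true));
        for (String line : poll(responses)) {
            System.out.println(line);
        }
    }
}
```

With the three canned responses (false, false, true) the model produces the same sequence of operator calls as the log shown earlier, minus the comment lines.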

Server polling with an increasing delay on each repeat.

The basic principle is the same. We just need to chain a few more operators inside the repeatWhen() function:

Here, before each repeated API call, the original delay time is multiplied by the attempt number. Very simple and effective.
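As a worked example of that multiplication, here is a small plain-Java sketch. The 10-second base delay is an assumption inferred from the 10- and 20-second waits in the log; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the delay arithmetic only; no RxJava involved.
class LinearBackoff {

    // Delay before the next poll: base delay multiplied by the 1-based attempt number.
    static long delaySeconds(long baseDelaySeconds, int attempt) {
        return baseDelaySeconds * attempt;
    }

    // Delays for attempts 1..attempts, in order.
    static List<Long> schedule(long baseDelaySeconds, int attempts) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= attempts; attempt++) {
            delays.add(delaySeconds(baseDelaySeconds, attempt));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(10, 3)); // prints [10, 20, 30]
    }
}
```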

Here is the log:

repeatWhen, call
// here is our first API call
filter, call response isJobDone=false
takeUntil, response isJobDone=false
zipWith, call, attempt 1
flatMap, call, repeatAttempt 1
// wait 10 seconds and run API call for the second time
filter, call response isJobDone=false
takeUntil, response isJobDone=false
zipWith, call, attempt 2
flatMap, call, repeatAttempt 2
// wait 20 seconds and run API call for the third time
filter, call response isJobDone=true
onNext response isJobDone=true
takeUntil, response isJobDone=true
onCompleted

Please find the explanation below.

RxJava zipWith() and flatMap() explained.

I will omit those parts that were explained previously and jump right to the most relevant — zipWith() and flatMap().

  1. After takeUntil() has done its job, the Observable returned from the repeatWhen() function starts its work. This Observable is the combined result of two operators: zipWith() and flatMap().
  2. zipWith(parameter1, parameter2) takes each value emitted by the Observable that came into repeatWhen() (Void aVoid) together with the next value emitted by parameter1 (Observable.range(COUNTER_START, ATTEMPTS)) and passes both into the call(Void, Integer) method of parameter2. In call() we can do something with these two parameters and return a value. In our case it’s an Integer, but it can be anything; to return something else we just need to change the third generic type in new Func2<Void, Integer, /*change this*/Integer>. We simply return the value we got from the range Observable, which is the repeat attempt.
  3. The value returned from call() is emitted by the Observable that zipWith() returns, and we handle it in flatMap().
  4. flatMap() takes the value and uses it to produce a timer Observable. The timer Observable waits for the specified amount of time and then emits, which triggers the resubscription to the original Observable that does the API call.

We could omit using zipWith() and just use flatMap() inside the repeatWhen().

In that case we would need to keep a counter, track the attempts ourselves and terminate the sequence when we have to stop. But we use zipWith() and RxJava does it for us!
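To see why zipWith() limits the number of attempts, recall that zipping pairs items one-to-one and stops as soon as either side runs out. A minimal plain-Java sketch of that pairing rule follows. It uses iterators rather than Observables, and the class name and the values 1 and 3 (standing in for COUNTER_START and ATTEMPTS) are assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java model of the zip pairing rule; this is not RxJava code.
class ZipModel {

    // Pairs items one-to-one and stops when either side is exhausted.
    static <A, B, R> List<R> zip(Iterator<A> left, Iterator<B> right,
                                 BiFunction<A, B, R> combine) {
        List<R> out = new ArrayList<>();
        while (left.hasNext() && right.hasNext()) {
            out.add(combine.apply(left.next(), right.next()));
        }
        return out;
    }

    // Like Observable.range(start, count): count consecutive integers beginning at start.
    static List<Integer> range(int start, int count) {
        return IntStream.range(start, start + count).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Five "source completed" signals, but the range allows only three attempts.
        List<Void> completions = Collections.nCopies(5, (Void) null);
        List<Integer> attempts = zip(completions.iterator(), range(1, 3).iterator(),
                (aVoid, attempt) -> attempt);
        System.out.println(attempts); // prints [1, 2, 3]
    }
}
```

The counter and the stop condition both come from the range: once its values are used up, no more pairs are produced, so no further repeat is scheduled.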

Retry when we got an error

As you know, in Retrofit 1 every network error is delivered via the onError() method.

To implement retry when we lose the internet connection or get any HTTP status outside the 2xx range, we have to use retryWhen() instead of repeatWhen(). The parameters of zipWith() will also be a bit different.

The difference between repeatWhen() and retryWhen() is that repeatWhen() resubscribes after the source calls onCompleted(), while retryWhen() does so after the source calls onError().

Let’s combine repeatWhen() and retryWhen() to poll the server and retry if we failed.

Here is the relevant code:

You may notice that I’ve wrapped zipWith() and flatMap() into a separate method and used compose() to reuse it in both repeatWhen() and retryWhen(). Now if the call fails we retry, and if it succeeds but the job is not ready yet we repeat.
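The resulting control flow can be sketched in plain Java as a loop with two paths. This is a simplified model, not the RxJava implementation: the class name, the log strings and the single maxAttempts limit are assumptions, delays are omitted, and the repeat and retry counters are kept independent for clarity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;

// Plain-Java model of "retry on error, repeat while not ready".
class PollWithRetryModel {

    // Calls the request until it reports "job done"; returns a log of what happened.
    static List<String> run(Callable<Boolean> request, int maxAttempts) {
        List<String> log = new ArrayList<>();
        int repeats = 0;
        int retries = 0;
        while (true) {
            try {
                if (request.call()) {              // success and job done: stop polling
                    log.add("done");
                    return log;
                }
                if (++repeats > maxAttempts) {     // repeat budget exhausted
                    log.add("stopped repeating");
                    return log;
                }
                log.add("repeat " + repeats);      // success but not ready: the repeatWhen() path
            } catch (Exception e) {
                if (++retries > maxAttempts) {     // retry budget exhausted
                    log.add("stopped retrying");
                    return log;
                }
                log.add("retry " + retries);       // failure: the retryWhen() path
            }
        }
    }

    public static void main(String[] args) {
        // Scripted outcomes: one network failure, one "not ready" response, then success.
        Iterator<String> script = Arrays.asList("error", "pending", "done").iterator();
        List<String> log = run(() -> {
            String outcome = script.next();
            if (outcome.equals("error")) throw new java.io.IOException("no connection");
            return outcome.equals("done");
        }, 3);
        System.out.println(log); // prints [retry 1, repeat 1, done]
    }
}
```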

RxJava rocks!

Cheers :)
