RETRY improvement — RxKotlin

Nicolas Duponchel
Feb 18, 2018


In this article, I explain how the Rx retryWhen() method helped me.

My problem is the following: I use a service named readContainer that returns an Observable. This service can throw the error CONTAINER_LOCKED, which occurs when the operation I want to perform is temporarily impossible. In my case, waiting a little longer can solve the problem.

If I encounter this error, I would like to retry up to x times, waiting t seconds between attempts. The algorithm I want is the following.

- read → throw CONTAINER_LOCKED
- wait t
- retry read (1st time) → throw CONTAINER_LOCKED
- wait t
- […]
- retry read (x-th time) → throw CONTAINER_LOCKED
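Before moving to Rx, the steps above can be sketched as a plain blocking loop. This is only an illustration of the target behaviour, not the Rx solution developed below; ContainerLockedException and readWithRetries are hypothetical names introduced for this sketch.

```kotlin
// Hypothetical error type standing in for CONTAINER_LOCKED.
class ContainerLockedException : Exception("CONTAINER_LOCKED")

// Calls `read` once, then retries up to `maxRetries` times,
// sleeping `delayMillis` after each locked attempt.
fun <T> readWithRetries(maxRetries: Int, delayMillis: Long, read: () -> T): T {
    repeat(maxRetries) {
        try {
            return read() // success: leave the function immediately
        } catch (e: ContainerLockedException) {
            Thread.sleep(delayMillis) // wait t before the next attempt
        }
    }
    // Last allowed attempt: if it is still locked, the error propagates.
    return read()
}
```

With maxRetries = x, the read is attempted at most x + 1 times, which matches the list: one initial read followed by x retries.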

First, let’s simply retry when this error is thrown. We use the retryWhen method. It “returns an Observable that emits the same values as the source ObservableSource with the exception of an onError.” So we operate on an Observable<Throwable> (the it in the code). Any error other than CONTAINER_LOCKED is rethrown, which ends the retries.

    .retryWhen {
        it.map { throwable ->
            if (throwable.toError() == CONTAINER_LOCKED) throwable
            else throw throwable
        }
    }

Now let’s control when and how we will retry. We use the method interval. It “Returns an Observable that emits a sequential number every specified interval of time.” This will be our “timer”.

    Observable.interval(DELAY_BEFORE_RETRY.duration, DELAY_BEFORE_RETRY.timeUnit)

    data class Duration(val duration: Long, val timeUnit: TimeUnit){...}

To “synchronize” this Observable with the first one, we will use zip. It “Returns an Observable that emits the results of a specified combiner function applied to combinations of two items emitted, in sequence, by two other ObservableSources.”

    Observables.zip(
        source1 = firstObservable,
        source2 = secondObservable,
        source3 = thirdObservable)

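When it is given two sources and no combiner function, the RxKotlin zip method emits a Pair<src1, src2>; in our case it will be a Pair<Throwable!, Long!>. The second element is the tick number from the timer, so we can rethrow the error once the timeout is reached.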

map { if (it.second >= TIMEOUT / DELAY_BEFORE_RETRY) throw it.first }
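To make the comparison concrete, here is a small worked example of the TIMEOUT / DELAY_BEFORE_RETRY arithmetic. The body of Duration is elided in this article, so the div operator below is purely an assumption about how the division could be defined, and the two values are made up for illustration.

```kotlin
import java.util.concurrent.TimeUnit

data class Duration(val duration: Long, val timeUnit: TimeUnit) {
    // Hypothetical operator (not shown in the article):
    // how many whole `other` durations fit into this one.
    operator fun div(other: Duration): Long =
        timeUnit.toMillis(duration) / other.timeUnit.toMillis(other.duration)
}

// Illustrative values only.
val TIMEOUT = Duration(10, TimeUnit.SECONDS)
val DELAY_BEFORE_RETRY = Duration(2, TimeUnit.SECONDS)

// 10 000 ms / 2 000 ms = 5
val maxRetry: Long = TIMEOUT / DELAY_BEFORE_RETRY
```

Since interval numbers its ticks from 0, errors paired with ticks 0 to 4 trigger a retry, and the error paired with tick 5 is rethrown: five retries in total under these assumed values.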

The final code:

    readContainer().retryWhen {
        Observables
            .zip(
                source1 = it.map { throwable ->
                    if (throwable.toError() == CONTAINER_LOCKED) throwable
                    else throw throwable
                },
                source2 = Observable.interval(DELAY_BEFORE_RETRY.duration, DELAY_BEFORE_RETRY.timeUnit))
            .map { if (it.second >= TIMEOUT / DELAY_BEFORE_RETRY) throw it.first }
    }

To go further

As you can see, the code above is not as readable as one would like. I suggest using an extension function.

    fun <T> Observable<T>.retry(predicate: (Throwable) -> Boolean, maxRetry: Long, delayBeforeRetry: Duration): Observable<T> =
        retryWhen { errors ->
            Observables.zip(
                // Let matching errors through; rethrow anything else to stop retrying.
                errors.map { error -> if (predicate(error)) error else throw error },
                // One tick per delay; the tick number counts the retries.
                Observable.interval(delayBeforeRetry.duration, delayBeforeRetry.timeUnit))
                .map { if (it.second >= maxRetry) throw it.first }
        }

The call site then becomes:

    readContainer().retry(
        predicate = { it.toError() == CONTAINER_LOCKED },
        maxRetry = TIMEOUT / DELAY_BEFORE_RETRY,
        delayBeforeRetry = DELAY_BEFORE_RETRY
    )
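To see the extension function at work, here is a minimal sketch that exercises it against a fake service. It assumes RxJava 2 and RxKotlin 2 are on the classpath. ContainerLockedException, readAfterLocks, and the minimal Duration class are hypothetical stand-ins for the real readContainer, toError(), and Duration, which this article does not show in full.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical error type standing in for CONTAINER_LOCKED.
class ContainerLockedException : RuntimeException("CONTAINER_LOCKED")

// Minimal stand-in for the article's Duration class.
data class Duration(val duration: Long, val timeUnit: TimeUnit)

// The extension function from the article, with named lambda parameters.
fun <T> Observable<T>.retry(predicate: (Throwable) -> Boolean, maxRetry: Long, delayBeforeRetry: Duration): Observable<T> =
    retryWhen { errors ->
        Observables.zip(
            errors.map { error -> if (predicate(error)) error else throw error },
            Observable.interval(delayBeforeRetry.duration, delayBeforeRetry.timeUnit))
            .map { (error, tick) -> if (tick >= maxRetry) throw error else tick }
    }

// Hypothetical fake service: locked `lockedTimes` times, then emits a value.
// Returns the emitted value and the total number of read attempts.
fun readAfterLocks(lockedTimes: Int): Pair<String, Int> {
    val attempts = AtomicInteger()
    val readContainer = Observable.fromCallable {
        if (attempts.incrementAndGet() <= lockedTimes) throw ContainerLockedException()
        "container content"
    }
    val content = readContainer
        .retry(
            predicate = { it is ContainerLockedException },
            maxRetry = 5,
            delayBeforeRetry = Duration(50, TimeUnit.MILLISECONDS))
        .blockingFirst()
    return content to attempts.get()
}
```

Calling readAfterLocks(2) should return the content after three attempts: the initial read plus two retries, each released by one timer tick.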

Thanks, Kotlin!
