Back-off my Flowables

Recently I needed to add a network backoff to an RxJava2 Flowable in our Android project and really struggled to find any good examples on the Internet. Most of the back-off examples were written for RxJava Observables not RxJava2 Flowables and, of course, nearly everything was written in Java and not Kotlin. So there was nothing else for it: I had to write the code from scratch.

My basic requirement was that I wanted it to back-off exponentially for the first x network requests, then revert to a number of fixed duration attempts before finally raising a network error. I gave it some default values so 3 exponential backoffs (1, 4 and 9 seconds), followed by up to 180 ten-second delays (half an hour). The flowable is restarted when the screen is reopened so this would require the user to actually stay on the same screen for more than half an hour inanely watching my progress wheel spinning without their phone locking and without the user doing anything to resolve their network issues.

My RxJava has always been a bit shaky but after some help from my colleague Rojan Thomas, we finally came up with a good solution that we were quickly able to reuse throughout the project.

Also because it was a simple, discrete class it was a great candidate for good unit-testing. Anyway, without further ado, here is the code:

private const val MAX_BACKOFF_RETRIES = 3 // Up to 9 seconds backoff
private const val MAX_FIXED_DELAY_RETRIES = 180 // Half an hour
private const val FIXED_DELAY_DURATION_SECONDS = 10L

class BackoffRetryWhen(private val maxBackoffRetries: Int = MAX_BACKOFF_RETRIES,
private val maxFixedDelayRetries: Int = MAX_FIXED_DELAY_RETRIES,
private val fixedDelayDuration: Long = FIXED_DELAY_DURATION_SECONDS,
private val scheduler: Scheduler = Schedulers.computation()) : Function<Flowable<Throwable>, Publisher<Long>> {

private var retryAttempts: Long = 0L

override fun apply(flowableThrowable: Flowable<Throwable>): Publisher<Long> {
return flowableThrowable
.flatMap { throwable ->
when {
++retryAttempts <= maxBackoffRetries -> {
val retryDuration = retryAttempts * retryAttempts
return@flatMap Flowable.just<Long>(retryDuration).delay(retryDuration, TimeUnit.SECONDS, scheduler)
}
retryAttempts <= maxFixedDelayRetries + maxBackoffRetries -> {
return@flatMap Flowable.just<Long>(fixedDelayDuration).delay(fixedDelayDuration, TimeUnit.SECONDS, scheduler)
}
else -> {
return@flatMap Flowable.error<Long>(throwable)
}
}
}
}
}

And here is the companion test class (thanks to another ex-colleague Lukasz for a few nice tweaks here too) which is hopefully self-explanatory:

class BackoffRetryWhenTest {

private val t = Throwable()
private val testScheduler: TestScheduler = TestScheduler()
private lateinit var testSubscriber: TestSubscriber<Long>
private lateinit var flowableThrowable: PublishProcessor<Throwable>
private val list: MutableList<Long> = mutableListOf()

@Before
fun setUp() {
testSubscriber = TestSubscriber()
flowableThrowable = PublishProcessor.create()
}

@Test
fun testDefaultBackoff() {
val backoffRetryWhen = BackoffRetryWhen(scheduler = testScheduler)
backoffRetryWhen.apply(flowableThrowable).subscribe(testSubscriber)

LongRange(1, 3).forEach { advanceAndAssert(it * it) }
repeat(180) { advanceAndAssert(10L) }

flowableThrowable.onNext(t)
testSubscriber.assertError(t)
}

@Test
fun testCustomValuesBackoff() {
val backoffRetryWhen = BackoffRetryWhen(maxBackoffRetries = 5, maxFixedDelayRetries = 5, fixedDelayDuration = 5L, scheduler = testScheduler)
backoffRetryWhen.apply(flowableThrowable).subscribe(testSubscriber)

LongRange(1, 5).forEach { advanceAndAssert(it * it) }
repeat(5) { advanceAndAssert(5L) }

flowableThrowable.onNext(t)
testSubscriber.assertError(t)
}

private fun advanceAndAssert(i: Long) {
flowableThrowable.onNext(t)
testScheduler.advanceTimeBy(i, TimeUnit.SECONDS)
mList.add(i)
testSubscriber.assertValueCount(mList.size)
testSubscriber.assertValues(*mList.toTypedArray())
testSubscriber.assertNoErrors()
}
}

Actually using the BackoffRetry is also really easy:

val backoffRetryWhen = BackoffRetryWhen()

myRxFlowable.doOnError { //Handle error }
.subscribeOn(subscribeOnScheduler)
.retryWhen { flowableThrowable -> backoffRetryWhen.apply(flowableThrowable) }
} ...
//Remaining Rx flows

I assign the back off retry to a variable before using it so it doesn’t keep creating a new instance each time we retry.

Hopefully, the code speaks for itself but if there is anything unclear please raise some comments. If you found this useful some claps and shares are always greatly appreciated.

Like what you read? Give Barry Irvine a round of applause.

From a quick cheer to a standing ovation, clap to show how much you enjoyed this story.