Rx java timeout improvement with Kotlin

Nicolas Duponchel
2 min readMar 4, 2018

--

In reactive programming, Rx offers the possibility to add timeout on what you subscribe by. The basic use case consists on making a stream throwing a TimeoutException when it doesn’t emit a result in the due time.

Let’s take the case of a Completable and have a look at Rx timeout method. “It returns a Completable that runs this Completable and emits a TimeoutException in case this Completable doesn’t complete within the given time.” It takes two params : the timeout and the unit.

val completable = Completable
.create { /* your action */ }
.timeout(10, TimeUnit.SECONDS)
.subscribeBy(
onComplete = { },
onError = { /* called if your action doesn't finish before 10s */ }
)

In fact, if my emiter doesn’t complete within the due time, onError would be called with a Throwable which would be a TimeoutException.

onError = {
if (it is TimeoutException) Log.d(TAG, "action doesn't complete within the given time")
else Log.d(TAG, "other error")
}

Control the timeout

For other reasons, we might prefer that the stream deliver other results. For example, you can choose to emit your custom exception instead of the Rx default one. Rx timeout method can also take a third parameter which is an other CompletableSource. This time it “returns a Completable that runs this Completable and switches to the other Completable in case this Completable doesn’t complete within the given time.

.timeout(10, TimeUnit.SECONDS, Completable.create { /* other action*/ })

In my case I prefer keeping the default one, but the TimeoutException throwed by timeout function doesn’t have any message. And I need one to improve my Logs (see perfecting logs with rxKotlin), So I use the previous function, and I switch to a new Completable that emits a TimeoutException with my custom message.

.timeout(10, TimeUnit.SECONDS, Completable.error(TimeoutException("My custom message")))

Finally here is my custom kotlin extension function.

data class Duration(val duration: Long, val timeUnit: TimeUnit)

fun Completable.timeout(duration: Duration, message: String): Completable =
timeout(duration.duration, duration.timeUnit, Completable.error(TimeoutException(message)))

--

--