RxJava — Handling Errors Like a Pro

TC Wang
SodaLabs
Published in
8 min readApr 21, 2019
Image result for handle error in rxjava

In this article, we aim to answer the following questions.

  1. Why is an error event also a terminate event in RxJava?
  2. How can we bring the reactive streams back alive after they are terminated by an error?
  3. What are some strategies for handling error events in RxJava?

Note: Most of the code snippets are written in Kotlin.

Why is an Error Event also a Terminate Event in RxJava?

You might be asking, “Why does throwing an error cause a terminate event in RxJava?”. Let’s use a coffee machine as an example to explain how RxJava propagates the error and terminates the reactive stream.

Coffee Machine with a Timeout

In the above code snippet, we connect the button clicks Observable to a brewCoffee() Observable with a timeout of 3 minutes. This means that an error will be raised if the brewCoffee() takes more than 3 minutes to finish its job.

The function brewCoffee is intentionally designed to be error-prone. It will either randomly take up to 5 minutes to brew coffee or randomly throw a RuntimeException. This is obviously a bad coffee machine, but luckily is great for us to observe errors!

By connecting the clicks with a brewCoffee, the final subscription block will have three message channels. These are onNext, onError, and onComplete, and are established when subscribe is called.

If a click event is successfully converted to a coffee, we will observe a successfully brewed coffee in the final subscription block via the onNext() channel. If an Exception is thrown, it is propagated via theonError() channel. Similarly, the onComplete() channel is used when the stream is disposed of and will no longer emit values.

But wait! There is a secret channel connecting the reactive streams in the reverse direction. The onDispose() channel will connect back up the chain as shown by the blue lines in the following figure.

When the downstream (the rightmost block) subscribes to the upstream (the leftmost block), the upstream passes a Disposable down so that the downstream can stop listening for values from the upstream. This is done by calling the Disposable’s dispose() method.

When brewCoffee raises an error, the final subscription block sees the error via onError() channel, AND THEN the final subscription block may call Disposable’s dispose() method.

We use “may” here because the community decided not to call dispose() in LambdaObserver when terminating starting in RxJava 2.2.5. You can find the change here.

In RxJava, most of the Observers call Disposable’s dispose() as they get disposed. Therefore, an error becomes a chain reaction from bottom to top causing all of the blocks to terminate.

All of the streams are terminated and thus the streams do not emit any new values. That is why an error event is also a terminate event in RxJava.

How can we bring the reactive streams back alive after they are terminated by an error?

RxJava provides many different error handlers when an error is thrown inside of a stream. Two of the most basic but useful Observers are retry() and retryWhen().

Disclaimer: As we mentioned previously, not all of the Observers call Disposable’s dispose() method when they see an error via theonError channel. Some Observers prevent the terminating chain reaction from happening.

The retry() Operator

In the above figure, the is an error event. The retry() operator responds to an onError notification from the source Observable by resubscribing to the source Observable rather than passing that call through to its observers. This means that the ROOT SOURCE will be re-subscribed.

In our coffee machine example, adding a retry() right after the switchMapSingle source Observable will prevent the reactive stream from terminating by resubscribing to the clicks source Observable.

You might be asking, “What happens if we add retry() after brewCoffee()”? In that case, it will immediately attempt to brew the coffee again when it sees an error. This is because the retry resubscribes to the Single of brewing coffee. We should be careful because it is possible to be trapped in an infinite loop in these inner cases. In our example here, we could safely resubscribe since the Single.fromCallable throws errors at random.

Many times, it’s not always bad to rebrew the coffee. In other words, you may want to retry the action but after a short delay or within a set amount of retry attempts. In this case, you should use the retryWhen() operator.

The “retryWhen()” Operator

In the above figure, the is the error event. The retryWhen operator is similar to retry, but gives you an error source Observable and lets you decide whether or not to resubscribe to upstream. The text explanation of the official documentation [link] is quite long and hard to read, so let’s look at the code snippet to get a better understanding.

In the above code snippet, we apply a back-off to the brewCoffee() with 3 retry attempts where each attempt is delayed by 5 seconds to the power of the attempt number (5¹ = 5s, 5² = 25s, 5³ = 125s).

Since range(1, 3) runs out of numbers on the fourth error, it calls onCompleted(), which causes the entire zip to complete. This prevents further retries.

The Differences of Where to use “retry()” and “retryWhen()”

We have seen two places in which retry() and retryWhen() are used. These locations serve as different strategies for managing the reactive stream. In general, we’d suggest placing retry() after a hot stream and retryWhen() after a cold stream.

Case 1, the (network) back-off:

Place retryWhen() right after the Observable/Single/Maybe, i.e. a cold stream, whenever you want to perform a back-off. This retryWhen provides a back-off with a limited number of retry attempts but does not protect the entire reactive stream from terminating.

Case 2, the safeguard:

Place retry() right before the final subscription block when you want more of a stream safeguard. This retry() prevents the reactive stream from terminating. However, for hot streams, it does not necessarily provide the same back-off strategies. This is because the first source Observable in a hot stream usually does not emit values upon resubscribing (replaying or repeating).

It’s ok to add retry operator here since our upstream doesn’t emit values when resubscribing. However, for some upstream that emits values every time when an observer subscribes to it, we’d suggest you use the retryWhen operator.

Case 3, back-off and safeguard:

You could use both the ways above to reinforce the error tolerance of your reactive stream.

Last, But Not Least

When using flatMap or switchMap you will have a nested stream inside of these operators. If you want to useretry on the nested source observable, that retry will be applied to the source inside the operator. For example:

Retry is layered by flatMap/switchMap
  • The retry at position #1:

The root source being resubscribed by retry is the Single returned by brewCoffee().

  • The retry at position #2:

The root source being resubscribed by retry here is the hot stream, debounceClicks.

Inside the flatMap and switchMap, you are creating a new substream. Therefore, the root source given to the retry is the source of that new substream.

Nested stream

Strategies for Handling Errors

Strategy 1, reactive state:

As Jake Wharton recommends in the following talk [link], you can convert the result from the source Observable into UI states. Then you can handle the resulting state in the final subscription block. For example:

Implementing this strategy leads us into the world of modern programming and gives us advantages such as:

  • The input event from the UI is handled and transformed to the UI states with a lifecycle, e.g. the beginning, in-progress, and finishing states.
  • The UI states are immutable.
  • The UI states are handled sequentially and can be merged (or reduced).
  • It’s clear that the final subscription block is the place in which the UI states are presented to the users.

It also comes at one cost:

  • The error handling code is spread in the subscription blocks. In other words, it’s not that scalable as more kinds of errors are taken care of.

Strategy 2, redirect:

Redirect the error to other places for central management. For example:

This way gives us an advantage:

  • Flexibility to modularize the error handling component in the architecture.
  • Able to handle the errors with a sub-reactive stream. e.g. Reset the domain states.

The downside is:

  • Empty subscribe().

Summary

Error handling can be quite complicated in an enterprise application. Most likely, you will want to reset your domain states based on the errors. There are many practices of handling errors, therefore, we suggest finding what works for you. However, here are some tips for dealing with errors:

  1. Define the known/expected errors.
  2. Bucket the known/expected errors.
  3. Let the onError() channel to propagate errors.
  4. Protect the reactive stream from terminating regardless of what error happens.
    e.g. Use both retry() and retryWhen().
  5. Determine what to do with the known/expected errors.
    e.g. Inline in the subscription block or redirect?

Some reference related to error handling in RxJava

I hope this article inspires you to handle errors in RxJava better 😀. Please support this guide with your 👏👏👏 using the clap button and help it spread to a wider audience 🙏.

--

--

TC Wang
SodaLabs

I’m an engineer who loves solving problems and science. You could find me on my LinkedIn page, https://www.linkedin.com/in