Error handling with reactive streams

Kalpa Senanayake
4 min read · Jul 14, 2019

Introduction

Reactive streams are a powerful technique for the use cases we encounter in event-driven systems. They give the developer far more flexibility than the alternatives and, more importantly, let the developer focus on business logic rather than on the underlying execution mechanism.

In this post we discuss the error handling mechanisms offered through the reactive streams fluent APIs. The examples use Project Reactor.

Exceptions and Errors

Before we dive in, we need to be clear about a few concepts. Exceptions and Errors are familiar to any Java developer, but the two have very different meanings.

Exception: An exception indicates a condition that an application might want to catch and might be able to recover from gracefully.

Error: An Error indicates a serious problem that an application should not try to catch or recover from, for example OutOfMemoryError.

The same rules apply in the reactive programming world. However, since everything is a signal (whether it is a successful event or an error), people say “errors” when they refer to error signals in reactive streams. So do not get confused when we refer to error signals as errors; these are not instances of java.lang.Error.

Error signals are first-class citizens in the reactive world.

Any error signal in a reactive sequence is a terminal event.

This means it stops the sequence. Even if an error-handling operator is used, the original sequence is not allowed to continue. This is an important fact to understand when using reactive streams.

A simple error scenario

The scenario here is a Flux that emits event “A” followed by a RuntimeException (an error signal). We use StepVerifier to subscribe to the stream and evaluate the signals.
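A minimal sketch of this scenario, assuming reactor-core and reactor-test are on the classpath. The class and method names are illustrative only:

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class SimpleErrorScenario {

    // Emits "A", then terminates the sequence with an error signal.
    static Flux<String> source() {
        return Flux.just("A")
                .concatWith(Flux.error(new RuntimeException("ERROR")))
                .log(); // logs each signal (onSubscribe, request, onNext, onError)
    }

    public static void main(String[] args) {
        StepVerifier.create(source())
                .expectNext("A")
                .expectError(RuntimeException.class)
                .verify(); // verifyComplete() would fail here: no onComplete() arrives
    }
}
```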

There is nothing fancy about this code. But note that when we verify the sequence we use verify() rather than verifyComplete().

This is because, as discussed earlier, the error signal terminates the sequence. The following trace shows how the event sequence occurred.

onSubscribe()
request(unbounded)
onNext(A)
onError(java.lang.RuntimeException: ERROR)

There is no onComplete() event since the sequence never completed.

Now let’s look at various error handling scenarios.

Catch the error and return a static value

This is a classic technique: we detect the error and return a static value as our handling mechanism.
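In Reactor, one way to express this is the onErrorReturn operator. A minimal sketch, assuming reactor-core on the classpath and using the fallback value “C”; the class and method names are illustrative:

```java
import reactor.core.publisher.Flux;

class StaticFallbackValue {

    // On any error, emit the static value "C" and then complete normally.
    static Flux<String> withFallbackValue() {
        return Flux.just("A")
                .concatWith(Flux.error(new RuntimeException("ERROR")))
                .onErrorReturn("C");
    }

    public static void main(String[] args) {
        withFallbackValue().subscribe(
                value -> System.out.println("received " + value),
                error -> System.err.println("not reached in this sketch: " + error),
                () -> System.out.println("completed"));
    }
}
```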

The event sequence is as follows: “A” and “C” are emitted from the Flux.

onSubscribe()
request(unbounded)
onNext(A)
onNext(C)
onComplete()

Catch the error and execute an alternative path

In this case, we want the logic to detect the error signal and execute alternative logic to produce the result. For example, in the controller layer of a RESTful API, we can create a response entity with a generic error message while logging the error details internally.
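The onErrorResume operator fits this pattern: it receives the error and returns a fallback publisher to subscribe to instead. A minimal sketch, assuming reactor-core on the classpath; the names are illustrative, and the fallback value “RUNTIME_EX” stands in for a generic error response:

```java
import reactor.core.publisher.Flux;

class AlternativePath {

    // On error, log the details and switch to a fallback sequence.
    static Flux<String> withFallbackPublisher() {
        return Flux.just("A")
                .concatWith(Flux.error(new RuntimeException("ERROR")))
                .onErrorResume(error -> {
                    // Internal logging of the real cause (stderr stands in for a logger).
                    System.err.println("internal log: " + error);
                    return Flux.just("RUNTIME_EX");
                });
    }

    public static void main(String[] args) {
        withFallbackPublisher().subscribe(value -> System.out.println("received " + value));
    }
}
```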

Event sequence

onSubscribe(FluxOnErrorResume.ResumeSubscriber)
request(unbounded)
onNext(A)
onNext(RUNTIME_EX)
onComplete()

Catch and rethrow (exception translation)

This is one of the most common patterns of error handling, where the handling logic translates the error into a meaningful business exception.

The event sequence is as follows:
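A minimal sketch of exception translation using onErrorMap, assuming reactor-core on the classpath. The nested CustomException is a stand-in defined only for this example; returning Flux.error(...) from onErrorResume is another way to get the same effect:

```java
import reactor.core.publisher.Flux;

class ExceptionTranslation {

    // Illustrative business exception that keeps the original cause.
    static class CustomException extends RuntimeException {
        CustomException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Replaces the low-level error with a business exception; the sequence still terminates.
    static Flux<String> translated() {
        return Flux.just("A")
                .concatWith(Flux.error(new RuntimeException("ERROR")))
                .onErrorMap(error -> new CustomException("Error detected", error));
    }

    public static void main(String[] args) {
        translated().subscribe(
                value -> System.out.println("received " + value),
                error -> System.err.println("failed with " + error));
    }
}
```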

onSubscribe()
request(unbounded)
onNext(A)
reactor.Flux.OnErrorResume.1 - onError(reactive.errorHandling.CustomException: Error detected)

Catch and react on the side

This pattern is equivalent to logging the error details and letting the error propagate to the upper layers. Logging is a side effect triggered by the presence of the error signal.

The event sequence shows the log message, and we can see that the error signal is still in play.
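The doOnError operator is a natural fit for this: it runs a side effect and leaves the error signal untouched. A minimal sketch, assuming reactor-core on the classpath; the names are illustrative, and the side effect is passed in as a parameter so it can be swapped for a real logger:

```java
import java.util.function.Consumer;
import reactor.core.publisher.Flux;

class SideEffectOnError {

    // Runs the side effect when an error passes through, then lets the same error continue downstream.
    static Flux<String> observed(Consumer<Throwable> onErrorSideEffect) {
        return Flux.just("A")
                .concatWith(Flux.error(new RuntimeException("ERROR")))
                .doOnError(onErrorSideEffect);
    }

    public static void main(String[] args) {
        observed(error -> System.err.println("Something went wrong"))
                .subscribe(
                        value -> System.out.println("received " + value),
                        error -> System.err.println("still propagated: " + error));
    }
}
```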

onSubscribe()
request(unbounded)
onNext(A)
reactive.errorHandling.ErrorHandlerTest - Something went wrong
onError(java.lang.RuntimeException: ERROR)

Using a finally handler

In the imperative style of programming, we use the finally {} block to execute must-do work no matter how the core logic ends. Those use cases are still valid in the reactive world.

Reactive publishers provide the doFinally(Consumer<SignalType> onFinally) method, which can be used to achieve the same.

It adds a side effect that is triggered after a stream terminates for any reason. The stream-terminating events are:

SignalType.ON_COMPLETE 
SignalType.ON_ERROR
SignalType.CANCEL

Here we use the take(long n) method to simulate the cancel event. It lets the request from downstream propagate as is and cancels once N elements have been emitted. In this case N = 3.

The event sequence is an interesting one: it shows the emission of 4 events and onComplete(), but since the take operator makes sure only 3 get through and then emits a cancel signal, the final number of events is 3.
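A minimal sketch of doFinally combined with take(3), assuming reactor-core on the classpath. The class name, the counter, and the holder for the last signal are illustrative, and the trace this sketch produces may differ from the one shown in this post:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

class FinallyHandler {

    // Records how the upstream ended and how many events reached the subscriber.
    static Flux<String> firstThree(AtomicInteger emitted, AtomicReference<SignalType> lastSignal) {
        return Flux.just("A", "B", "C", "D")
                .doFinally(lastSignal::set) // runs on complete, error, or cancel
                .take(3)                    // cancels upstream after the third element
                .doOnNext(value -> emitted.incrementAndGet());
    }

    public static void main(String[] args) {
        AtomicInteger emitted = new AtomicInteger();
        AtomicReference<SignalType> lastSignal = new AtomicReference<>();

        firstThree(emitted, lastSignal)
                .subscribe(value -> System.out.println("event " + value + " emitted"));

        System.out.println("Signal type: " + lastSignal.get());
        System.out.println("Final number of events emitted: " + emitted.get());
    }
}
```

Placing doFinally above take is deliberate in this sketch: that is the position from which the cancel issued by take is visible.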

onSubscribe()
request(32)
onNext(A)
onNext(B)
onNext(C)
onNext(D)
onComplete()
reactive.errorHandling.ErrorHandlerTest - event number 1 emitted
reactive.errorHandling.ErrorHandlerTest - event number 2 emitted
reactive.errorHandling.ErrorHandlerTest - event number 3 emitted
cancel()
reactive.errorHandling.ErrorHandlerTest - Signal Type :CANCEL
reactive.errorHandling.ErrorHandlerTest - Final number of events emitted 3

Handling checked exceptions

So far these techniques only demonstrate handling RuntimeExceptions, but when we work with other Java libraries it is common to have to deal with checked exceptions. One mechanism for handling them is as follows.

It uses the Exceptions.propagate(Throwable t) method to convert the checked exception to an unchecked exception, which then propagates through the onError signal.

More importantly, the subscriber’s error handling logic unwraps and extracts the original exception. This is handy when we want to run the handling logic only for a particular type of exception. Note that this only works when the wrapping was done via the bubble() or propagate() methods.

The event sequence shows what happened.
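A minimal sketch of the wrap-and-unwrap approach, assuming reactor-core on the classpath. The process method is a hypothetical stand-in for a library call that throws a checked IOException, and the class name is illustrative:

```java
import java.io.IOException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

class CheckedExceptionHandling {

    // Hypothetical I/O-style call that fails with a checked exception for "C".
    static String process(String value) throws IOException {
        if ("C".equals(value)) {
            throw new IOException("I/O failure on " + value);
        }
        return value;
    }

    static Flux<String> processed() {
        return Flux.just("A", "B", "C")
                .map(value -> {
                    try {
                        return process(value);
                    } catch (IOException e) {
                        // Wrap the checked exception so it can travel as an onError signal.
                        throw Exceptions.propagate(e);
                    }
                });
    }

    public static void main(String[] args) {
        processed().subscribe(
                value -> System.out.println("event received " + value),
                error -> {
                    // Recover the original exception before deciding how to react.
                    Throwable original = Exceptions.unwrap(error);
                    if (original instanceof IOException) {
                        System.err.println("Something went wrong during I/O operation");
                    } else {
                        System.err.println("Unexpected error: " + original);
                    }
                });
    }
}
```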

onSubscribe()
request(unbounded)
onNext(A)
reactive.errorHandling.ErrorHandlerTest - event received A
onNext(B)
reactive.errorHandling.ErrorHandlerTest - event received B
onNext(C)
cancel()
reactive.errorHandling.ErrorHandlerTest - Something went wrong during I/O operation

Conclusion

The given examples show how the classic imperative error handling techniques can still be used to build resilient applications in a reactive world. The beauty of this is that developers do not have to rewire their brains completely to think in a new way; they can reuse existing knowledge in a new paradigm, and do so more effectively.

Happy coding everyone!
