Error handling for Flux
Earlier, we saw how to verify errors in the stream with StepVerifier.
Flux<String> strFlux = Flux.just("These", "Strings", "will", "create", "flux")
    .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
    // Never emitted: the error above terminates the sequence.
    .concatWith(Flux.just("After Error"));

StepVerifier.create(strFlux)
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectError(RuntimeException.class)
    .verify();
Now let’s see how to handle the error.
Sending a default value in case of error
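- onErrorResume() switches to a fallback Flux when an error occurs. The subscriber never sees the error: it receives the fallback elements, followed by a normal completion. The "After Error" element is still never emitted, because the error terminates the original sequence.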
Flux<String> strFlux = Flux.just("These", "Strings", "will", "create", "flux")
    .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
    .concatWith(Flux.just("After Error"))
    .onErrorResume((e) -> {
        // Log the error, then continue with a fallback sequence.
        System.out.println("Exception: " + e);
        return Flux.just("default value", "another default value");
    });

StepVerifier.create(strFlux)
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectNext("default value", "another default value")
    .verifyComplete();
- onErrorReturn() emits a single fallback value instead of switching to a whole fallback Flux. The sequence then completes normally.
Flux<String> strFlux = Flux.just("These", "Strings", "will", "create", "flux")
    .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
    .concatWith(Flux.just("After Error"))
    .onErrorReturn("default");

StepVerifier.create(strFlux)
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectNext("default")
    .verifyComplete();
Transforming an error
- onErrorMap() converts the exception into a different one before it reaches the subscriber.
Flux<String> strFlux = Flux.just("These", "Strings", "will", "create", "flux")
    .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
    .concatWith(Flux.just("After Error"))
    .onErrorMap((e) -> new CustomException(e));

StepVerifier.create(strFlux)
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectError(CustomException.class)
    .verify();
- Note that the error still reaches the subscriber. Only its type changes, from RuntimeException to CustomException.
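The snippets use CustomException without defining it. A minimal sketch, assuming it is an unchecked exception that keeps the original error as its cause (this definition is hypothetical; the real class may carry more fields or constructors):

```java
// Hypothetical definition for illustration: wraps the original error as the cause.
class CustomException extends RuntimeException {

    CustomException(Throwable cause) {
        // Passing the cause up keeps the original error reachable via getCause().
        super(cause);
    }
}
```

With a definition like this, the constructor call in onErrorMap((e) -> new CustomException(e)) compiles, and the original RuntimeException remains available through getCause().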
Retry
- retry() resubscribes to the source when an error occurs, so the sequence is attempted again from the start.
Flux<String> strFlux = Flux.just("These", "Strings", "will", "create", "flux")
    .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
    .concatWith(Flux.just("After Error"))
    .onErrorMap((e) -> new CustomException(e))
    .retry(2);

StepVerifier.create(strFlux)
    // Original attempt plus two retries: the elements are emitted three times.
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectError(CustomException.class)
    .verify();
- Note that the subscriber only sees an error if the final attempt fails. With retry(2), the Flux is subscribed up to three times: the original attempt plus two retries. Every attempt emits the elements again from the start, which is why the test expects the five strings three times. If the third attempt also fails, the error is passed to the subscriber.
- This is useful for API calls. If a request fails because of a transient network problem, retry() can try again two or three times before the error is propagated.
- In that case, we would ideally wait for a while before trying again.
- retryBackoff() can be used for that. It is available up to Reactor 3.3; from Reactor 3.4 onward it was removed in favor of retryWhen(Retry.backoff(...)).
Flux<String> strFlux = Flux.just("These", "Strings", "will", "create", "flux")
    .concatWith(Flux.error(new RuntimeException("Exception Occurred")))
    .concatWith(Flux.just("After Error"))
    .onErrorMap((e) -> new CustomException(e))
    // Up to 2 retries, with the first backoff delay starting at 5 seconds.
    .retryBackoff(2, Duration.ofSeconds(5));

StepVerifier.create(strFlux)
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectNext("These", "Strings", "will", "create", "flux")
    .expectError(IllegalStateException.class)
    .verify();
- This method, however, does not propagate the original error once all retries are exhausted. It signals an IllegalStateException instead.
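The retry count and the final IllegalStateException can be pictured with a plain-Java analogy. This is not Reactor's implementation: BackoffSketch and callWithBackoff are hypothetical names, the sketch is blocking rather than reactive, it ignores the random jitter Reactor applies to each delay, and it assumes the final IllegalStateException carries the last error as its cause.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; not part of Reactor.
final class BackoffSketch {

    // Runs the task once, then retries up to maxRetries times with doubling delays.
    static <T> T callWithBackoff(Callable<T> task, int maxRetries,
                                 Duration firstBackoff, Consumer<Duration> sleeper) {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    // Delay before the next try: first, 2 * first, 4 * first, ...
                    sleeper.accept(firstBackoff.multipliedBy(1L << attempt));
                }
            }
        }
        // All tries failed: wrap the last error so it stays reachable via getCause().
        throw new IllegalStateException(
                "Retries exhausted: " + maxRetries + "/" + maxRetries, last);
    }

    public static void main(String[] args) {
        List<Duration> waits = new ArrayList<>();
        try {
            // Delays are recorded instead of slept so the demo finishes immediately.
            callWithBackoff(() -> { throw new RuntimeException("Exception Occurred"); },
                    2, Duration.ofSeconds(5), waits::add);
        } catch (IllegalStateException e) {
            System.out.println(waits);                      // prints [PT5S, PT10S]
            System.out.println(e.getCause().getMessage());  // prints Exception Occurred
        }
    }
}
```

With a retry count of 2 the task runs three times and waits twice, which matches the three expectNext(...) groups in the test above. If the subscriber needs the original failure, getCause() on the IllegalStateException is the place to look, assuming the cause is populated as in this sketch.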