Resiliency in Spring Reactive Applications

Manvendra Kumar
Walmart Global Tech Blog
10 min read · Mar 15, 2024

In the ever-evolving landscape of software development, the spotlight often shines on the functional features and capabilities of a new application or system. However, beneath the surface, there exists a critical dimension that plays an equally pivotal role in determining a software’s success: Non-Functional Requirements, or NFRs for short.

Here, we will discuss resiliency, one of the most critical non-functional requirements. Resiliency is the unsung hero of software engineering: it determines whether an application keeps working when its dependencies slow down or fail. Requirements of this kind often remain invisible to end users, yet they have a profound impact on user experience, the system’s performance, and the organization’s reputation. To address the need for resilience, developers have created various patterns and techniques for building software that can withstand failures.

In this article, we will implement a sample Reactive application using WebClient with Retry, Bulkhead, and circuit breaker features using Spring Cloud Starter Circuitbreaker Reactor Resilience4j.

Spring Cloud Starter Circuitbreaker Reactor Resilience4j: This starter brings the Resilience4j library into Spring reactive applications through Spring Cloud Circuit Breaker, and gives developers several resilience patterns for building robust, fault-tolerant applications. Retry, Circuit Breaker, Rate Limiter, and Bulkhead are among the most commonly used.

This article explores Circuit Breaker, Bulkhead, and Retry, and shows how to implement them with Spring Webflux and Spring-Cloud-Starter-Circuitbreaker-Reactor-Resilience4j.

Circuit Breaker behaviour between consumer and server using webclient

Circuit Breaker

In distributed software, it is very common to make remote calls to services running on different machines or processes across the network. These remote calls might fail due to network connectivity issues or long-running processes on the remote machine. It is not a good idea to keep the caller waiting on an unresponsive server, as the caller can run out of critical resources. This can result in cascading failures across multiple systems.

One of the key differences between remote calls and in-memory calls is that remote calls can hang without a response until a timeout limit is reached or can fail due to network issues. To prevent such catastrophic cascades, we can utilize the circuit breaker design pattern.

The fundamental idea behind the circuit breaker is very simple. You wrap a protected function call in a circuit breaker object, which monitors for the failures defined in the circuit breaker configuration. Once the failures reach a certain threshold, the circuit breaker trips, and all further calls to the circuit breaker return with an error, without the protected call being made at all. It is advisable to implement some kind of alerting when the circuit breaker trips. Over its lifetime the breaker cycles through the states CLOSED -> OPEN -> HALF-OPEN -> CLOSED: it opens when the threshold is reached, moves to half-open after a wait period to let a few trial calls through, and closes again if those calls succeed.
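The state cycle can be sketched in plain Java. This is a minimal, single-threaded illustration and not how Resilience4j is implemented: the class name `SimpleCircuitBreaker`, the consecutive-failure threshold, and the injectable clock are all assumptions made for the example.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical, single-threaded circuit breaker sketch (illustration only). */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // consecutive failures that trip the breaker
    private final long openMillis;      // how long to stay OPEN before allowing a trial call
    private final LongSupplier clock;   // injectable time source, in milliseconds

    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAt = 0L;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    State state() {
        // OPEN -> HALF_OPEN once the wait period has elapsed
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openMillis) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    <T> T call(Supplier<T> protectedCall) {
        if (state() == State.OPEN) {
            // Short-circuit: the protected call is not invoked at all
            throw new IllegalStateException("circuit breaker is OPEN");
        }
        try {
            T result = protectedCall.get();
            // A success closes the circuit and resets the failure counter
            consecutiveFailures = 0;
            state = State.CLOSED;
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            // A failed trial call in HALF_OPEN, or too many failures in CLOSED, opens the circuit
            if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
                state = State.OPEN;
                openedAt = clock.getAsLong();
            }
            throw e;
        }
    }
}
```

A real breaker would also need thread safety and a failure-rate window instead of a simple counter; the sketch only shows the transitions between the three states.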

Bulkhead:

The Bulkhead pattern is a microservice design approach that effectively mitigates the impact of a failing service by dividing the system into distinct compartments known as “bulkheads”. Each bulkhead houses a set of services that are segregated from the rest of the system, ensuring that if one bulkhead experiences failure, the other bulkheads can continue to function without any disruption.

Implementing the Bulkhead pattern offers several advantages, such as enhanced fault tolerance, improved availability, and increased scalability. By isolating the effects of a failing service, this pattern effectively prevents the propagation of failures that could potentially bring down the entire system. Moreover, the ability to independently scale individual services allows for optimized resource utilization and cost reduction.
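One common way to build a bulkhead inside a single service is to cap the number of concurrent calls with a semaphore. The sketch below assumes that approach; `SimpleBulkhead` is a hypothetical class written for this example, and unlike a configurable bulkhead it rejects immediately instead of waiting for a free permit.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical semaphore-based bulkhead sketch (illustration only). */
final class SimpleBulkhead {
    private final Semaphore permits;

    SimpleBulkhead(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    <T> T call(Supplier<T> protectedCall) {
        // Reject right away when every permit is taken (no waiting in this sketch)
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("bulkhead is full");
        }
        try {
            return protectedCall.get();
        } finally {
            // Always hand the permit back, even when the call fails
            permits.release();
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Because each downstream dependency gets its own `SimpleBulkhead` instance, a slow dependency can exhaust only its own permits and leaves the others untouched.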

Retry:

The Retry pattern, as its name implies, repeats a designated operation until it succeeds or until a given number of attempts has been made.

A common scenario for employing this pattern is retrying a request until it achieves the desired outcome. However, it is important to be cautious of potential pitfalls. Excessive retry attempts can exacerbate issues or hinder the recovery of the service or downstream services involved.
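A bounded retry can be sketched in a few lines of plain Java. `SimpleRetry` is a hypothetical helper for illustration; it counts the first call as attempt one and deliberately leaves out any delay between attempts.

```java
import java.util.function.Supplier;

/** Hypothetical bounded-retry helper (illustration only, no delay between attempts). */
final class SimpleRetry {
    private SimpleRetry() {
    }

    /** Invokes the call up to maxAttempts times (first call included) and rethrows the last failure. */
    static <T> T execute(int maxAttempts, Supplier<T> call) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again if attempts remain
            }
        }
        throw last;
    }
}
```

The hard cap on attempts is what keeps retries from piling extra load onto a service that is already struggling.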

Prerequisites

  • Spring Boot WebFlux 2.6.6 or above
  • Maven 3.6+
  • Java 11 or later
  • Spring-Cloud-Starter-Circuitbreaker-Reactor-Resilience4j 2.6.6 or above

Getting Started

We will start by creating a simple Spring Boot WebFlux project from start.spring.io, with the following dependencies: Spring Reactive Web, Spring-Cloud-Starter-Circuitbreaker-Reactor-Resilience4j, Lombok, and Validation.

Spring Initializr with required dependencies

Project Structure

Here is our project structure:

Project structure from IntelliJ IDEA

Configuration

First, we need to configure the WebClient, Circuit Breaker, Bulkhead, and Retry objects.

Web-Client Configuration

In reactive applications, we need to use WebClient to make any downstream call. WebClient provides a non-blocking, reactive client for performing HTTP requests. It exposes a fluent, reactive API over underlying HTTP client libraries such as Reactor Netty.

WebClient Config

@Bean("sslWebclient")
public WebClient createWebClient() {

    HttpClient httpClient =
        HttpClient.create()
            // ChannelOption configures a ChannelConfig in a type-safe way. Which options are
            // supported depends on the ChannelConfig implementation and on the transport.
            // Note that this connect timeout value is in milliseconds.
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30)
            // Callback invoked after the connection has been established.
            .doOnConnected(
                conn ->
                    conn.addHandlerFirst(new ReadTimeoutHandler(30, TimeUnit.SECONDS))
                        .addHandlerFirst(new WriteTimeoutHandler(30, TimeUnit.SECONDS)))
            // A keepalive is a message sent by one device to another to check that the link
            // between the two is operating, or to prevent the link from being broken.
            .option(ChannelOption.SO_KEEPALIVE, true)
            // The Epoll options below apply only when the native epoll transport (Linux) is in use.
            .option(EpollChannelOption.TCP_KEEPIDLE, 300)
            .option(EpollChannelOption.TCP_KEEPINTVL, 60)
            .option(EpollChannelOption.TCP_KEEPCNT, 10)
            .secure(
                spec ->
                    spec.sslContext(DefaultSslContextSpec.forClient())
                        .handshakeTimeout(Duration.ofSeconds(3)));

    // If you do not want to enable SSL validation in a test environment, use the configuration below:
    // SslContext sslContext =
    //     SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
    // HttpClient httpClient = HttpClient.create().secure(t -> t.sslContext(sslContext));
    // ClientHttpConnector httpConnector = new ReactorClientHttpConnector(httpClient);
    // return WebClient.builder()
    //     .clientConnector(httpConnector)
    //     .codecs(
    //         clientCodecConfigurer ->
    //             clientCodecConfigurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024))
    //     .build();
    // }

    ClientHttpConnector httpConnector = new ReactorClientHttpConnector(httpClient);
    return WebClient.builder()
        .clientConnector(httpConnector)
        // codecs() overrides the underlying WebClient exchange strategies. The configurer is an
        // extension of CodecConfigurer for HTTP message reader and writer options on the client side.
        .codecs(
            clientCodecConfigurer ->
                clientCodecConfigurer.defaultCodecs().maxInMemorySize(10 * 1024 * 1024))
        .build();
}

You can override the WebClient configuration above based on your use-case. For example, to test the connection in a test environment, you can swap in an insecure TrustManager so that certificates are not validated.

Circuit-Breaker Configuration

First, we create a Customizer bean for ReactiveResilience4JCircuitBreakerFactory with the time limiter and circuit breaker configuration.

ReactiveCircuitBreakerConfig

@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory>
    reactiveResilience4JCircuitBreakerFactoryCustomizer() {
    return factory -> {
        factory.configure(
            builder ->
                builder
                    .circuitBreakerConfig(getCircuitBeakerConfig())
                    // Optional: set this to override the timeout.
                    // If it is omitted, the default timeout value is used.
                    .timeLimiterConfig(
                        TimeLimiterConfig.custom().timeoutDuration(Duration.ofMillis(5000)).build()),
            "circuitBreakerFactory");
    };
}

The ReactiveResilience4JCircuitBreakerFactory class is the factory that creates reactive circuit breakers. Any setting that is not overridden keeps its default value.

The second step is to create a custom circuit breaker configuration based on the usage of the service or application. Here, we can override multiple configurations to suit our specific use-case.

Custom CB config

private CircuitBreakerConfig getCircuitBeakerConfig() {
    return io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.custom()
        // Failure rate threshold in percent. If the failure rate is equal to or greater than
        // the threshold, the CircuitBreaker transitions to open and starts short-circuiting calls.
        .failureRateThreshold(50)
        // Slow call rate threshold in percent. A call counts as slow when its duration is
        // greater than slowCallDurationThreshold(Duration).
        .slowCallRateThreshold(50)
        // Duration above which calls are considered slow and increase the slow call percentage.
        .slowCallDurationThreshold(Duration.ofMillis(1000))
        // Fixed wait duration that controls how long the CircuitBreaker stays open
        // before it switches to half open.
        .waitDurationInOpenState(Duration.ofMillis(3000))
        // Minimum number of calls required (per sliding window period) before the
        // CircuitBreaker can calculate the error rate.
        .minimumNumberOfCalls(10)
        // Type of the sliding window used to record the outcome of calls while the
        // CircuitBreaker is closed. It can be count-based (COUNT_BASED) or time-based.
        .slidingWindowType(
            io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowType.TIME_BASED)
        // Size of the sliding window used to record the outcome of calls while the
        // CircuitBreaker is closed. With a time-based window, the value is in seconds.
        .slidingWindowSize(10)
        // Number of permitted calls when the CircuitBreaker is half open.
        .permittedNumberOfCallsInHalfOpenState(10)
        // Exception classes that are recorded as a failure and thus increase the failure rate.
        // Any exception matching or inheriting from one of these counts as a failure,
        // unless ignored via ignoreExceptions(Class[]) or ignoreException(Predicate).
        .recordExceptions(Exception.class)
        // Enables writable stack traces. When set to false, Exception.getStackTrace()
        // returns a zero length array.
        .writableStackTraceEnabled(true)
        .build();
}
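To make the interplay between failureRateThreshold and minimumNumberOfCalls concrete, here is a small plain-Java sketch. It is not the Resilience4j implementation: `FailureRateWindow` is a hypothetical class, and it uses a count-based window for simplicity, whereas the configuration above uses a time-based one.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical count-based failure-rate window (illustration only). */
final class FailureRateWindow {
    private final int windowSize;
    private final int minimumNumberOfCalls;
    private final double failureRateThreshold;                  // percentage, for example 50.0
    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true means the call failed
    private int failures = 0;

    FailureRateWindow(int windowSize, int minimumNumberOfCalls, double failureRateThreshold) {
        this.windowSize = windowSize;
        this.minimumNumberOfCalls = minimumNumberOfCalls;
        this.failureRateThreshold = failureRateThreshold;
    }

    void record(boolean failed) {
        // Evict the oldest outcome once the window is full
        if (outcomes.size() == windowSize && outcomes.removeFirst()) {
            failures--;
        }
        outcomes.addLast(failed);
        if (failed) {
            failures++;
        }
    }

    /** Failure rate in percent, or -1 while fewer than minimumNumberOfCalls are recorded. */
    double failureRate() {
        if (outcomes.size() < minimumNumberOfCalls) {
            return -1.0;
        }
        return 100.0 * failures / outcomes.size();
    }

    /** True when enough calls are recorded and the failure rate has reached the threshold. */
    boolean shouldOpen() {
        double rate = failureRate();
        return rate >= 0 && rate >= failureRateThreshold;
    }
}
```

With a window of 10, a minimum of 10 calls, and a threshold of 50, nine failed calls in a row do not open the circuit; the decision is only made once the tenth call has been recorded.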

BulkHead Configurations

Next, we create a BulkheadConfig bean that limits the maximum number of concurrent requests to any other service or application.

bulk-head configuration

@Bean
public BulkheadConfig bulkHeadConfig() {
    return BulkheadConfig.custom()
        .maxWaitDuration(Duration.ofMillis(5000)) // change based on your use-case
        .maxConcurrentCalls(10) // change based on your use-case
        // Indicates whether FairSync or NonfairSync should be used in the Semaphore. When set
        // to true, a fair call handling strategy is used. It guarantees the order of incoming
        // requests (FIFO) based on an internal queue. When set to false, a non-fair strategy
        // is used, which does not guarantee any order of calls.
        .fairCallHandlingStrategyEnabled(true)
        // Enables writable stack traces. When set to false, Exception.getStackTrace()
        // returns a zero length array.
        .writableStackTraceEnabled(true)
        .build();
}

Here, we can override Bulkhead configurations based on our application use-case. For example, we can set the maxWaitDuration in case the Bulkhead is full or configure the maxConcurrentCalls to control the maximum concurrent request calls.

Reactive Retry Configurations

We need to create our first RetryConfig bean that can be used to retry failed requests based on certain criteria set in the configuration. Sometimes, due to network glitches, requests do not reach the remote server and time out. These requests can be retried within a specific time duration before marking them as failed. This way, customers don’t need to retry from their end if the requests are already being retried by the service itself.

Retry Configuration

@Bean
public RetryConfig retryConfig() {
    return RetryConfig.custom()
        // Maximum number of attempts, including the initial call.
        .maxAttempts(3)
        // Function that modifies the waiting interval after a failure.
        // By default the interval stays the same.
        .intervalFunction(
            IntervalFunction.ofExponentialBackoff(
                Duration.ofMillis(500), 2, Duration.ofMillis(2000)))
        // Exception classes that are recorded as a failure and thus are retried.
        .retryExceptions(Exception.class)
        .build();
}

Note: You can override these values based on your use-case.
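The waiting intervals produced by an exponential backoff with these values can be worked out by hand. The sketch below assumes the delay before retry n is the initial interval multiplied by the multiplier n - 1 times and then capped at the maximum, which is consistent with the 500 ms and 1000 ms gaps in the test log later in this article. `BackoffSchedule` is a hypothetical helper, not part of Resilience4j.

```java
/** Hypothetical helper that computes capped exponential backoff delays (illustration only). */
final class BackoffSchedule {
    private BackoffSchedule() {
    }

    /** Delay in milliseconds before the given retry (1 = first retry). */
    static long delayMillis(int retry, long initialMillis, double multiplier, long maxMillis) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        double delay = initialMillis * Math.pow(multiplier, retry - 1);
        // Cap the delay so that it never grows beyond the configured maximum
        return (long) Math.min(delay, (double) maxMillis);
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 4; retry++) {
            System.out.println("retry " + retry + ": " + delayMillis(retry, 500, 2.0, 2000) + " ms");
        }
    }
}
```

With an initial interval of 500 ms, a multiplier of 2, and a cap of 2000 ms, the delays are 500, 1000, 2000, and 2000 ms. Since maxAttempts is 3 in the configuration above, only the first two delays are ever used.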

Implementation Of Circuit-Breaker In Spring Webflux Application

  1. Create separate Function fields for the Bulkhead and Retry operators, so that they can be applied to the reactive chain when it is subscribed.

Function config

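// Fields of the service class. They are typed to HealthResponse so that they can be applied
// to the Mono<HealthResponse> chain in step 3.
private final Function<? super Mono<HealthResponse>, ? extends Publisher<HealthResponse>> bulkHeadTransformer;
private final Function<? super Mono<HealthResponse>, ? extends Publisher<HealthResponse>> retryTransformer;

// In the constructor, with bulkheadConfig and retryConfig injected:
this.bulkHeadTransformer = BulkheadOperator.of(Bulkhead.of("bulkhead-config", bulkheadConfig));
this.retryTransformer = RetryOperator.of(Retry.of("retry-config", retryConfig));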

Note: retryConfig and bulkheadConfig are the config beans defined earlier. They must exist before they are autowired into this class.

2. Create the ReactiveCircuitBreaker from the factory that was defined previously as part of the configuration.

ReactiveCircuitBreaker Config bean

private final ReactiveCircuitBreaker reactiveCircuitBreaker;

// The name must match the id given to factory.configure(...) in the configuration.
this.reactiveCircuitBreaker = circuitBreakerFactory.create("circuitBreakerFactory");

3. Implement ReactiveCircuitBreaker with BulkHead and request Retry.

CircuitBreaker Implement

public Mono<HealthResponse> processDownstreamServiceHealth() {
    return webClient
        .get()
        .uri("http://127.0.0.1:8080/helloWorld")
        .header("content-type", MediaType.APPLICATION_JSON_VALUE)
        .exchangeToMono(
            clientResponse -> {
                log.info(clientResponse.statusCode() + " " + LocalDateTime.now());
                return clientResponse.bodyToMono(HealthResponse.class);
            })
        // transformDeferred defers the given transformation to this Mono in order to generate
        // a target Mono type. The transformation occurs for each Subscriber.
        .transformDeferred(bulkHeadTransformer)
        .transformDeferred(retryTransformer)
        // transform also generates a target Mono, but unlike transformDeferred(Function),
        // the provided function is executed as part of assembly.
        .transform(
            objectMono ->
                reactiveCircuitBreaker.run(
                    objectMono,
                    throwable -> {
                        if (throwable instanceof CallNotPermittedException) {
                            // The circuit is open and the call was short-circuited.
                            log.error(throwable.toString());
                            return Mono.error(new Exception(throwable.getMessage()));
                        } else if (throwable instanceof BulkheadFullException) {
                            log.error(
                                "Bulkhead is full, request can not be processed at this moment:{}",
                                throwable.getMessage());
                            return Mono.error(new Exception(throwable.getMessage()));
                        }
                        log.error(
                            "Error while processing downstream health:{}", throwable.getMessage());
                        return Mono.error(new Exception(throwable.getMessage()));
                    }));
}

Testing

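With minimumNumberOfCalls set to 10, the circuit breaker evaluates the failure rate once 10 calls have been recorded. If the rate reaches the threshold, the circuit moves to OPEN and no more requests are sent to the downstream service.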

CB Testing

2023-08-08T17:24:23.157+05:30 ERROR 24230 --- [ctor-http-nio-2] c.j.r.c.S.service.HealthService : io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'circuitBreakerFactory' is OPEN and does not permit further calls

After the configured wait duration in the OPEN state, the circuit breaker moves to the HALF-OPEN state, where it permits the configured number of calls and uses their outcome to decide its next state.

HALF_OPEN State Log Message:

CB Half Open testing

If the failure rate of those trial calls still breaches the threshold, the circuit breaker moves back to the OPEN state and requests are rejected until the wait duration elapses again. Otherwise, it returns to the CLOSED state.

Bulkhead Testing:

Health request is failed due to: Bulkhead 'bulkhead-config' is full and does not permit further calls.

Calls are rejected with this error whenever all permits configured through maxConcurrentCalls() are in use and none becomes available within the duration configured through maxWaitDuration().

Retry Mechanism Testing:

  1. The request failed at 15:56:39.048.
  2. The first retry was executed at 15:56:39.689, after the 500 ms delay.
  3. The next retry was executed at 15:56:40.701, after a 1000 ms delay, because we configured an exponential backoff policy.

2023-08-10T15:56:39.048+05:30 INFO 19282 --- [ctor-http-nio-3] c.j.r.c.S.service.HealthService : 500 INTERNAL_SERVER_ERROR 2023-08-10T15:56:39.045318
2023-08-10T15:56:39.689+05:30 INFO 19282 --- [ctor-http-nio-3] c.j.r.c.S.service.HealthService : 500 INTERNAL_SERVER_ERROR 2023-08-10T15:56:39.689169
2023-08-10T15:56:40.701+05:30 INFO 19282 --- [ctor-http-nio-3] c.j.r.c.S.service.HealthService : 500 INTERNAL_SERVER_ERROR 2023-08-10T15:56:40.701377

Conclusion

In today’s software development, non-functional requirements play a crucial role. They help ensure that the software is not only functional but also efficient, secure, and scalable. As software development continues to evolve, it is important to keep these requirements in mind to deliver high-quality products. We recommend that software developers and engineering managers carefully consider the non-functional requirements for each project and prioritize them accordingly. By doing so, they can deliver software that meets the needs of their customers and the demands of the market. Let’s strive for excellence in software development by prioritizing non-functional requirements as well.

What Next:

Distributed tracing is a method of tracking application requests as they flow through applications, including database calls, method calls, and upstream service calls, among others. Developers can use distributed tracing to troubleshoot requests that require end-to-end tracing for business use cases, intermittent errors, or high latency. In the next article, we will discuss how distributed tracing works in reactive applications and the challenges that developers generally face during its implementation.

References:

  1. Github Spring-Cloud-Starter-Circuitbreaker-Reactor-Resilience4j.zip
  2. Spring Cloud Circuit Breaker documentation: https://cloud.spring.io/spring-cloud-circuitbreaker/reference/html/index.html
  3. https://spring.io/reactive/
