Deep Dive into Reactive Programming with Spring Boot

Using Reactive Programming with Spring Boot

Kushagrasehgal
Simform Engineering
8 min read · Jun 16, 2023

Reactive programming is an approach to handling asynchronous and event-based programming that has gained popularity in recent years due to the rise of real-time data-intensive applications. The reactive programming model allows developers to build more efficient, scalable, and resilient systems.

In this blog, we’ll explore using reactive programming with Spring Boot, one of the most popular Java frameworks for building web applications.

Before we start, let’s look at what reactive programming is.

What is Reactive Programming?

  1. Reactive programming is a design approach built on asynchronous logic that reacts to data as it changes in real time. At its core is a data stream that we can observe, react to, and apply backpressure to. In plain terms, reactive programming is about non-blocking, asynchronous, event-driven applications that need only a small number of threads to scale.
  2. A key concept is backpressure: a mechanism that ensures producers don’t overwhelm consumers.

Why do we need Reactive Programming?

Many modern applications must handle a large number of concurrent requests. The traditional blocking, thread-per-request model ties up a thread for each in-flight request while it waits on I/O, so it scales poorly under that kind of load.

To move forward, we need to embrace a principle that has long been part of the software engineering community: software should use hardware efficiently.

Reactive programming on the server side allows web applications to perform and scale more efficiently. Because requests are handled asynchronously, a small number of threads can serve many requests at once, which improves throughput and scalability under high user traffic.

Let’s understand the Components of Reactive Programming:

The Publisher emits an event or message for every result obtained. It can send the Subscriber any number (n) of values by calling onNext. It can also signal a failure by calling onError, which terminates the sequence with an error, or signal success by calling onComplete, which terminates the sequence normally. After either terminal signal, no further values are emitted.
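The signal contract described above can be sketched with the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces that Reactor implements. This is only a minimal illustration, not how Reactor works internally: SignalDemo, publisherOf, and signalsFrom are hypothetical names, and the publisher ignores the requested amount for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class SignalDemo {

    /** Hypothetical publisher: emits every item, then fails (if failure != null) or completes. */
    static Flow.Publisher<String> publisherOf(List<String> items, Throwable failure) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return; // already terminated or cancelled
                }
                done = true;
                // Simplification: ignores n and assumes the subscriber asked for everything.
                for (String item : items) {
                    subscriber.onNext(item);
                }
                if (failure != null) {
                    subscriber.onError(failure);  // terminal signal: ends with an error
                } else {
                    subscriber.onComplete();      // terminal signal: ends normally
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes to the publisher and records every signal in the order received. */
    static List<String> signalsFrom(Flow.Publisher<String> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable error) {
                signals.add("onError(" + error.getMessage() + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(signalsFrom(publisherOf(List.of("a", "b"), null)));
        System.out.println(signalsFrom(publisherOf(List.of("a"), new IllegalStateException("boom"))));
    }
}
```

In both runs the recorded list ends with exactly one terminal signal: onComplete for the first, onError(boom) for the second.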

Reactor has two primary publisher types: Flux and Mono.

Flux: A Flux publishes data or events and can produce 0 to N values asynchronously.

Mono: A Mono produces at most one value: either a single value or none at all.

Reactive framework support in Spring with Spring WebFlux

  1. Spring WebFlux is a reactive web framework introduced by Pivotal in Spring 5.
  2. It provides an asynchronous, non-blocking, and event-driven architecture for building web applications that are resilient and responsive.
  3. It enables developers to build applications that can handle high traffic loads without compromising on performance.
  4. The framework is based on the Reactive Streams specification and provides support for both client-side and server-side development.
  5. With Spring WebFlux, developers can build applications that use threads more efficiently and scale better under high concurrency than traditional blocking ones.

To start using WebFlux with Spring Boot, add the following dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>3.0.5</version>
</dependency>

Once you have added the dependency, you can start building reactive applications using the annotations provided by Spring WebFlux.

Here’s an example of how to use Reactive programming with Spring Boot:

1. Project Structure

2. Dependencies in pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.mapstruct</groupId>
        <artifactId>mapstruct</artifactId>
        <version>1.5.3.Final</version>
    </dependency>
    <dependency>
        <groupId>org.mapstruct</groupId>
        <artifactId>mapstruct-processor</artifactId>
        <version>1.5.3.Final</version>
    </dependency>
</dependencies>

The example exposes two endpoints: one to save a user and one to retrieve all users.

3. Entity, RestController, Service, and Repository

@Data // Lombok: already generates getters, setters, equals/hashCode and toString
@NoArgsConstructor
@AllArgsConstructor
@Document(value = "user")
public class User {
    @Id
    private String id;
    private String name;
    private String email;
    private String phone;
    private String address;
    private String city;
    private String state;
    private String postalCode;
    private Boolean isPreferred;
    private LocalDate birthDate;
}

@RestController
@RequestMapping("/api/user")
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    // Saves one user and emits the saved result as a single value
    @PostMapping
    public Mono<UserResponse> saveUser(@RequestBody UserRequest request) {
        return userService.saveUser(request);
    }

    // Streams all users to the client as Server-Sent Events
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<UserResponse> retrieveUsers() {
        return userService.retrieveUsers();
    }
}

@Service
public class UserService {

    private final UserMapper userMapper;
    private final UserRepository userRepository;

    public UserService(UserMapper userMapper, UserRepository userRepository) {
        this.userMapper = userMapper;
        this.userRepository = userRepository;
    }

    public Mono<UserResponse> saveUser(UserRequest request) {
        User user = userMapper.toUser(request);
        // Nothing is written until the returned Mono is subscribed to (WebFlux subscribes for us)
        Mono<User> savedUser = userRepository.save(user).log();
        return savedUser.map(userMapper::toUserResponse);
    }

    public Flux<UserResponse> retrieveUsers() {
        // log() prints the reactive signals (onSubscribe, request, onNext, onComplete)
        return userRepository.findAll().log().map(userMapper::toUserResponse);
    }
}

@Repository
public interface UserRepository extends ReactiveCrudRepository<User, String> {
}

4. A MapStruct mapper converts the request to the entity and the entity back to the response

@Mapper(componentModel = "spring")
public interface UserMapper {
    UserResponse toUserResponse(User user);

    User toUser(UserRequest request);
}
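The article does not show UserRequest or UserResponse, and MapStruct generates the mapper implementation at compile time, so it can be hard to picture what the two methods do. The sketch below is a hand-written equivalent under stated assumptions: the three record types are trimmed-down, hypothetical stand-ins with only a few fields, and the request is assumed to carry no id so that the database can assign one.

```java
class ManualUserMapping {

    // Hypothetical, trimmed-down stand-ins for the article's types.
    record UserRequest(String name, String email) {}
    record User(String id, String name, String email) {}
    record UserResponse(String id, String name, String email) {}

    /** Request -> entity. The id stays null here (assumption: the database generates it). */
    static User toUser(UserRequest request) {
        if (request == null) {
            return null;
        }
        return new User(null, request.name(), request.email());
    }

    /** Entity -> response. Fields with the same name are copied one to one. */
    static UserResponse toUserResponse(User user) {
        if (user == null) {
            return null;
        }
        return new UserResponse(user.id(), user.name(), user.email());
    }

    public static void main(String[] args) {
        User user = toUser(new UserRequest("John Doe", "jd@gmail.com"));
        System.out.println(toUserResponse(user));
    }
}
```

Copying same-named properties and returning null for a null input matches MapStruct's default behavior; anything beyond that would need explicit mapping configuration.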

5. Properties file

spring:
  data:
    mongodb:
      authentication-database: admin
      database: demo_db
      host: localhost
      password: root
      port: 27017
      username: root

6. That completes the coding part. Now let’s look at what happens when we call the save endpoint

curl --location 'http://localhost:8080/api/user' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "John Doe",
"email": "jd@gmail.com",
"phone": "555-555-5555",
"address": "456 Oak St",
"city": "Chicago",
"state": "IL",
"postalCode": "60601",
"isPreferred": false,
"birthDate": "09/01/1985"
}'

Figure: Components: Behind the scenes
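One detail in the request above is worth checking: birthDate is sent as 09/01/1985, while the entity field is a LocalDate. The default ISO-8601 format for a LocalDate is 1985-09-01, so the request DTO (not shown in this article) presumably declares a matching pattern, for example with Jackson's @JsonFormat. The plain java.time sketch below illustrates the difference; the class name and the month-first pattern MM/dd/yyyy are assumptions.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class BirthDateParsing {

    // Assumed pattern: month/day/year. The article does not say which order is intended.
    static final DateTimeFormatter MONTH_FIRST = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    /** Returns true if the text is a valid ISO-8601 date such as 1985-09-01. */
    static boolean isIsoDate(String text) {
        try {
            LocalDate.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    /** Parses a date written as MM/dd/yyyy. */
    static LocalDate parseMonthFirst(String text) {
        return LocalDate.parse(text, MONTH_FIRST);
    }

    public static void main(String[] args) {
        System.out.println(isIsoDate("09/01/1985"));       // false: not ISO-8601
        System.out.println(parseMonthFirst("09/01/1985")); // 1985-09-01
    }
}
```

If the endpoint rejects the sample request with a deserialization error, sending the date as 1985-09-01 or declaring the pattern on the DTO field are the two usual ways out.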

Now that we have seen how the components interact, it’s important to understand the backpressure mechanism.

Understanding Backpressure in Reactive Programming

In Reactive programming, backpressure is a mechanism that allows the consumer of data to control the rate at which the producer produces data. This is important because, in many scenarios, the producer may be generating data faster than the consumer can process it, leading to a buildup of unprocessed data, which can cause memory and performance issues.

Backpressure enables the consumer to signal the producer to slow down or stop producing data until the consumer is ready to accept more. In Reactive Streams, the consumer does this by requesting a specific number of elements (request(n)), and the producer may not emit more than was requested. When the producer cannot be slowed down, the excess can instead be buffered or dropped.

For example, in a stream processing application, if the producer is generating data at a high rate, the consumer may not be able to keep up with the rate of data consumption. To avoid data overload, the consumer can signal the producer to slow down by applying back pressure. The producer will then pause or slow down until the consumer is ready to accept more data.

Backpressure ensures that both producers and consumers can operate at their optimal processing rates without causing any bottlenecks or overloading the system.
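The request-driven flow described above can be sketched with the JDK's java.util.concurrent.Flow interfaces. This is a simplified, single-threaded illustration rather than a spec-complete publisher: BackpressureDemo, range, and consumeInBatches are hypothetical names, and details such as rejecting non-positive requests are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureDemo {

    /** Hypothetical publisher of [start, start + count) that never emits beyond the requested demand. */
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final int end = start + count;
            private int next = start;
            private long demand;
            private boolean emitting;
            private boolean finished;

            @Override
            public void request(long n) {
                if (n <= 0 || finished) {
                    return; // simplification: the spec asks for onError on n <= 0
                }
                demand += n;
                if (emitting) {
                    return; // re-entrant call from onNext: the loop below picks up the new demand
                }
                emitting = true;
                while (demand > 0 && next < end && !finished) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == end && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    /** Subscriber that asks for batchSize elements at a time and logs every step. */
    static List<String> consumeInBatches(Flow.Publisher<Integer> publisher, int batchSize) {
        List<String> log = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batchSize + ")");
                s.request(batchSize);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == batchSize) {
                    receivedInBatch = 0;
                    log.add("request(" + batchSize + ")"); // ready for more
                    subscription.request(batchSize);
                }
            }

            @Override
            public void onError(Throwable error) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        consumeInBatches(range(1, 5), 2).forEach(System.out::println);
    }
}
```

With five elements and a batch size of two, the log alternates between a request and at most two onNext calls, which is the consumer setting the pace rather than the producer.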

Let’s modify the GET method as follows:

@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<UserResponse> retrieveUsers() {
    return userService.retrieveUsers()
            .onBackpressureBuffer(10, BufferOverflowStrategy.DROP_OLDEST)
            .delayElements(Duration.ofMillis(100))
            .log();
}

The onBackpressureBuffer(10, BufferOverflowStrategy.DROP_OLDEST) operator is then applied to the Flux, which limits the buffer size to 10 elements and uses a buffer overflow strategy of dropping the oldest elements when the buffer becomes full. This means that if the downstream subscriber cannot keep up with the rate of emissions, the buffer will store up to 10 elements before it starts dropping older elements to make room for newer ones.
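The drop-oldest policy can be pictured with a small bounded queue. The class below is a simplified, single-threaded model of that policy and not Reactor's implementation; DropOldestBuffer and its method names are made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DropOldestBuffer<T> {

    private final int capacity;
    private final Deque<T> queue = new ArrayDeque<>();
    private final List<T> dropped = new ArrayList<>();

    DropOldestBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Producer side: when the buffer is full, evict the oldest element to make room. */
    void offer(T item) {
        if (queue.size() == capacity) {
            dropped.add(queue.pollFirst());
        }
        queue.addLast(item);
    }

    /** Consumer side: take the oldest buffered element, or null if the buffer is empty. */
    T poll() {
        return queue.pollFirst();
    }

    List<T> buffered() {
        return new ArrayList<>(queue);
    }

    List<T> dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        DropOldestBuffer<Integer> buffer = new DropOldestBuffer<>(10);
        for (int i = 1; i <= 15; i++) {
            buffer.offer(i); // the consumer never polls, so the buffer overflows
        }
        System.out.println("buffered: " + buffer.buffered()); // 6 through 15
        System.out.println("dropped:  " + buffer.dropped());  // 1 through 5
    }
}
```

Offering 15 elements to a buffer of 10 without consuming any leaves elements 6 to 15 in the buffer and discards 1 to 5, so a slow consumer sees the most recent data rather than the earliest.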

Finally, the delayElements(Duration.ofMillis(100)) operator is applied to the Flux, which adds a delay of 100 milliseconds before emitting each element. This means that the UserResponse objects emitted by the Flux will be spaced at least 100 milliseconds apart. Because the delay sits downstream of the buffer, it also makes the pipeline consume more slowly than the source can emit, which is what brings the buffer into play.

To mock this situation and test the backpressure and delay of elements, you can create a test and use a mocking framework like Mockito to create a mock implementation of userService.retrieveUsers(). You can then use the StepVerifier class from Reactor to subscribe to the Flux returned by retrieveUsers() and verify that the backpressure and delay are working as expected.

package com.simformsolutions.user;

import com.simformsolutions.user.controller.UserController;
import com.simformsolutions.user.model.response.UserResponse;
import com.simformsolutions.user.service.UserService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class UserControllerTest {

    @Mock
    private UserService userService;

    @InjectMocks
    private UserController controller;

    @Test
    void testRetrieveUsersBackpressureAndDelay() {
        // Create a mock Flux of UserResponse objects with 1000 elements
        // (other fields such as address and email are left unset for this test)
        Flux<UserResponse> mockFlux = Flux.range(1, 1000)
                .map(i -> UserResponse.builder()
                        .id(String.valueOf(i))
                        .build());

        // When the retrieveUsers method is called on the userService mock,
        // return the mockFlux we created above
        when(userService.retrieveUsers()).thenReturn(mockFlux);

        // Call the controller's retrieveUsers method (nothing runs until subscription)
        Flux<UserResponse> result = controller.retrieveUsers();

        // Verify that the backpressure and delay are working as expected.
        // The initial request of 0 lets thenRequest() control the demand;
        // StepVerifier.create(result) alone would request an unbounded amount up front.
        StepVerifier.create(result, 0)
                .expectSubscription()
                .thenRequest(5) // Request the first 5 elements
                .expectNextCount(5) // Expect to receive 5 elements
                .thenAwait(Duration.ofMillis(100)) // Wait for 100ms before requesting more elements
                .thenRequest(5) // Request the next 5 elements
                .expectNextCount(5) // Expect to receive 5 more elements
                .thenCancel()
                .verify();
    }

    @Test
    void testRetrieveUsers_BufferOverflow() {
        // Given
        int numUsers = 100;
        when(userService.retrieveUsers()).thenReturn(Flux.range(1, numUsers)
                .map(i -> UserResponse.builder().id(String.valueOf(i)).name("User " + i).build()));

        // When
        Flux<UserResponse> userResponseFlux = controller.retrieveUsers();

        // Then
        StepVerifier.create(userResponseFlux)
                .expectSubscription()
                .expectNextCount(10) // The first 10 users arrive, each delayed by 100 ms
                .thenAwait(Duration.ofMillis(1000)) // Pause while the source has long since filled the buffer
                .expectNextCount(0) // Asserts nothing further; it does not prove that later users were dropped
                .thenCancel()
                .verify();
    }
}

Let me summarize what the above test cases do:

Test case 1: testRetrieveUsersBackpressureAndDelay()

  • Mocks a Flux of UserResponse objects with 1000 elements
  • Sets up the UserService mock to return the above-created Flux when retrieveUsers() method is called
  • Calls the controller’s retrieveUsers() method
  • Uses StepVerifier to verify that backpressure and delay are working as expected by requesting the first 5 elements, waiting for 100 ms, requesting the next 5 elements, and then canceling.

Test case 2: testRetrieveUsers_BufferOverflow()

  • Given 100 users, sets up the UserService mock to return a Flux of UserResponse objects containing these users.
  • Calls the controller’s retrieveUsers() method
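  • Uses StepVerifier to check that the first 10 users arrive, waits for 1 second, and then cancels. This confirms that the stream keeps flowing with the buffer and delay in place, but it does not by itself prove that elements were dropped. Asserting that would require counting how many of the 100 users actually reach the subscriber.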

Conclusion:

Reactive programming is a powerful programming paradigm that can help you build responsive, resilient, and scalable applications. By using Reactive programming with Spring Boot, you can take advantage of the many benefits it offers while still leveraging the power and flexibility of Spring Boot. So go ahead and give it a try!

Dive into the ocean of engineering knowledge by following the Simform Engineering blog and unlock the latest updates and insights in the world of technology.
