Implementing Reactive Microservices with Spring Boot and RSocket

Yash Fofdiya
Simform Engineering
7 min readApr 30, 2024

Unlock the potential of scalable, responsive, and resilient distributed systems with Spring Boot and RSocket-powered reactive microservices.

Implementing Reactive Microservices with Spring Boot and RSocket

In the dynamic realm of modern software development, the demand for distributed systems that balance responsiveness and resilience has never been greater. Microservices architecture has become a favored methodology for crafting scalable and easily maintainable applications. Concurrently, reactive programming presents a robust framework for managing asynchronous and event-centric workflows. When these two methodologies converge with the RSocket communication protocol, they herald a new era of adaptability and effectiveness in constructing reactive microservices.

What are reactive microservices?

Reactive microservices emphasize message-driven communication, resilience, elasticity, and responsiveness. This method constructs systems that better manage distributed settings, high concurrency, and fluctuating demands.

Reactive microservices have several important features, such as:

  • Responsiveness: Adaptable to external events and user interactions, they respond quickly and efficiently even with a high volume of requests.
  • Resilience: Techniques like supervision, isolation, and replication are used to make reactive microservices resistant to system flaws and failures. They can smoothly bounce back from errors without degrading system performance as a whole.
  • Elasticity: Dynamically adjust their size to meet variations in demand or workload, ensuring optimal efficiency and enduring unforeseen traffic spikes at peak hours.
  • Message-driven communication: Typically involves message-driven and asynchronous communication between microservices, enhancing system scalability and responsiveness.

What is RSocket?

RSocket is a binary protocol designed for byte stream transports like Web-Sockets and TCP. It enables creating message-driven, reactive systems capable of managing high-throughput, low-latency service-to-service communication.

With features such as multiplexing, reusability, and bidirectional communication, RSocket facilitates the development of modern distributed systems, including real-time data streaming, microservices communication, and Internet of Things applications.

To understand how to use RSocket in microservices with Spring Boot, let’s begin with the project setup

Project Setup

This project creates two Spring Boot services: an employee-client and an employee-server. The employee-client makes requests, while the employee-server provides return values.

First, we must add a few required dependencies to configure RSocket in the POM file and configure the server to host RSocket on a specified port. For this example, let’s designate port 9000 for RSocket. This configuration can be implemented within the application settings.

Employee client application:

  <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
spring:
application:
name: employee-client
rsocket:
port: 9000
server:
port: 8081

Employee server application:

  <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
spring:
application:
name: employee-server
rsocket:
server:
port: 9000
server:
port: 8082

There are several ways to perform interactions between the applications. Before we start, let’s create an employee POJO class to hold the required fields.

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Employee {
private int empId;
private String empName;
private long empSalary;

public Employee(int empId) {
this.empId = empId;
}
}

Request-Response

First, configure the client application through a Java-based configuration and create an endpoint to send the request.

As mentioned in the application settings, we will establish a connection to the RSocket on port 9000. Let’s start with a Java configuration.

Employee client application:

@Configuration
public class RSocketConfig {

@Value("${rsocket.port}")
private int port;

@Bean
public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
return builder.tcp("localhost", port);
}
}
@RestController
@Slf4j
@RequestMapping("/api/employees")
public class ClientEmployeeController {

@Autowired
private RSocketRequester rSocketRequester;

@GetMapping("/{empId}")
public Publisher<Employee> retrieveEmployeeById(@PathVariable("empId") int empId) {
log.info("Sending request via Request-Response");
return rSocketRequester
.route("employeeDataById")
.data(empId)
.retrieveMono(Employee.class);
}
}

Employee server application:

Create a service to hold employees, and then fetch employees by the requested ID in the controller class.

@Service
public class EmployeeService {

private static final List<Employee> employees = new ArrayList<>();

static {
employees.add(new Employee(101, "Test", 10000));
employees.add(new Employee(102, "Dummy", 5000));
employees.add(new Employee(103, "Example", 50000));
}

public Mono<Employee> retrieveEmployeeById(int empId) {
return Mono.justOrEmpty(employees.stream()
.filter(e -> e.getEmpId() == empId)
.findFirst()
);
}
}
@Controller
@Slf4j
public class EmployeeController {

@Autowired
private EmployeeService employeeService;

@MessageMapping("employeeDataById")
public Mono<Employee> retrieveEmployeeDataById(int empId) {
return employeeService.retrieveEmployeeById(empId);
}
}

@MessageMapping is used to map incoming WebSocket messages to specific handler methods.

To start the interaction between these two applications, hit a request from the browser or Postman, and get an output with the requested data.

Request-Response Result

The client patiently awaits the response while maintaining a non-blocking stance. This is the easiest interaction, as we are all familiar with HTTP.

Fire-and-Forget

In this interaction model, the client dispatches data without anticipating any response. Use the same configuration for this approach and create endpoints in both applications to send and handle requests.

Employee client application:

    @PostMapping
public Publisher<Void> addEmployee(@RequestBody Employee employee) {
log.info("Sending request via Fire And Forget");
return rSocketRequester
.route("newEmployee")
.data(employee)
.send();
}

Employee server application:

    public Employee addEmployee(Employee employee) {
employees.add(employee);
return employee;
}
    @MessageMapping("newEmployee")
public Mono<Void> addEmployee(Employee employee) {
Employee emp = employeeService.addEmployee(employee);
log.info("New Employee with ID : {}, Name: {}, Salary {} ", emp.getEmpId(), emp.getEmpName(), emp.getEmpSalary());
return Mono.empty();
}

In the fire-and-forget interaction method, we’ll see an output on the console as the method does not return anything.

Fire-And-Forget Request
Fire-And-Forget Result

Channel-Stream

This interaction model offers flexibility, allowing both applications to exchange data in a manner that suits their needs. Create endpoints in both applications to send and handle requests.

Employee client application:

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Employee> retrieveEmployees() {
log.info("Sending request via Channel Stream");
return rSocketRequester
.route("allEmployees")
.retrieveFlux(new ParameterizedTypeReference<>() {});
}

Here, we have used MediaType.TEXT_EVENT_STREAM_VALUE, so the output looks nicer, but it is optional.

Employee server application:

    public Flux<Employee> retrieveEmployees() {
return Flux
.fromIterable(employees)
.delayElements(Duration.ofSeconds(2));
}

The server will send a response to the client every two seconds.

    @MessageMapping("allEmployees")
public Flux<Employee> retrieveEmployees() {
return employeeService.retrieveEmployees();
}
Channel-Stream Result

Handling Errors

Create a new endpoint that causes an error.

Employee client application:

    @GetMapping("/error")
public Mono<String> sendRequest() {
return rSocketRequester
.route("handleError")
.data(1001)
.retrieveMono(String.class)
.doOnError(e -> log.error(String.valueOf(e)))
.onErrorResume(e -> Mono.just(e.getMessage()));
}

Employee server application:

    @MessageMapping("handleError")
public Mono<String> handleError(int empId) {
Mono<Employee> employee = employeeService.retrieveEmployeeById(empId);
return employee
.switchIfEmpty(Mono.error(new RuntimeException("Employee not found with id " + empId)))
.flatMap(emp -> Mono.just("Employee found with id " + empId));
}

@MessageExceptionHandler(RuntimeException.class)
public Mono<RuntimeException> exceptionHandler(RuntimeException e) {
log.error(e.getMessage());
return Mono.error(e);
}

@MessageExceptionHandler is used for handling exceptions thrown within message-handling methods.

Handling Error Result

Security

Apply security to the created endpoints using Spring Security and JWT authentication.

Here, we will use simple authentication using Spring Security. Let’s modify the existing applications.

Employee client application:

  <dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-rsocket</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-config</artifactId>
</dependency>

Create an endpoint to pass the authentication metadata to the server:

    @GetMapping("/authenticate")
public Mono<Employee> doAuthentication() {
return rSocketRequester
.route("authenticate")
.metadata(
new UsernamePasswordMetadata("test", "test"),
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString())
)
.data(Mono.empty())
.retrieveMono(Employee.class);
}

without making any modifications to the existing config class.

Exception Message

Modify the RSocket configuration class and add an encoder to support authentication.

    @Bean
public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
return builder
.rsocketStrategies(s -> s.encoder(new SimpleAuthenticationEncoder()))
.tcp("localhost", port);
}

We have not added any handler endpoints to our service application, so we will get another exception, but we fixed the above exception successfully.

Exception Message

Employee server application:

Add the required dependencies to enable security before creating a new endpoint to handle the client's request.

  <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-messaging</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-rsocket</artifactId>
</dependency>

Now, create a handler method for authentication:

    public Mono<Employee> retrieveEmployeeByName(String username) {
return Mono.justOrEmpty(employees.stream()
.filter(e -> e.getEmpName().equalsIgnoreCase(username))
.findFirst()
);
}
    @MessageMapping("authenticate")
public Mono<Employee> handleAuthentication(@AuthenticationPrincipal Mono<UserDetails> user) {
return user.flatMap(userDetails -> {
String username = userDetails.getUsername();
return employeeService.retrieveEmployeeByName(username);
});
}

@AuthenticationPrincipal is used to access the principal (authenticated user) directly from the method signature of a controller handler method.

Now, let’s try to hit the same endpoint again, but we will get an access denied error as we have not configured the security. So, let’s configure security using a Security Config class.

@Configuration
@EnableRSocketSecurity
@EnableReactiveMethodSecurity
public class SecurityConfig {

@Bean
public MapReactiveUserDetailsService authentication() {
return new MapReactiveUserDetailsService(
User
.builder()
.username("test")
.password("{noop}test")
.roles("ADMIN", "USER")
.build()
);
}

@Bean
public PayloadSocketAcceptorInterceptor payloadSocketAcceptorInterceptor(RSocketSecurity security) {
return security
.authorizePayload(auth ->
auth
.route("authenticate").authenticated()
.anyRequest().permitAll()
.anyExchange().permitAll()
)
.simpleAuthentication(Customizer.withDefaults())
.build();
}

@Bean
public RSocketMessageHandler rSocketMessageHandler(RSocketStrategies strategies) {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.getArgumentResolverConfigurer()
.addCustomResolver(new AuthenticationPrincipalArgumentResolver());
handler.setRSocketStrategies(strategies);
return handler;
}
}

{noop} indicates that the password encoder should be set to NoOpPasswordEncoder, meaning the password is stored in plaintext.

Let’s try again by hitting the request.

Authentication Result

That’s how we can use the RSocket to communicate between different services and interact with each other.

Conclusion

This article explored implementing reactive microservices using Spring Boot and RSocket. We covered project setup, interaction patterns (request-response, fire-and-forget, and channel-stream), error handling, and security considerations. By adhering to these methodologies and leveraging RSocket capabilities, developers can build robust, scalable, and responsive distributed systems.

As technology advances, the amalgamation of reactive programming, microservices architecture, and RSocket communication will continue to shape software development. This synergy empowers developers to create adaptive, efficient, and highly responsive systems to meet evolving digital demands.

For more updates on the latest tools and technologies, follow the Simform Engineering blog.

Follow Us: Twitter | LinkedIn

--

--