R(eactive)Socket: Bootified

Srinivasa Vasu
Jun 28 · 6 min read

Reactive Socket popularly known as RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, Http/2 and Aeron.

It enables the following interaction models via reactive semantics over a single network connection:

  • request →← response
  • request →←← stream
  • request channel →→←→←
  • fire and forget →!

In this blog, we shall look at how to create a consumer/publisher interaction based on the above four interactive models using Spring Boot WebFlux. Spring Boot abstracts away the complexity of dealing with lower level RSocket APIs with its well know annotation driven declarative programming model.


To get started, head to Spring Initializer page, select 2.2.0 M4 milestone that has support for RSocket to generate consumer and publisher code bits. Follow the below instructions to generate the code bits.

rsocket-server

  • Select the following dependencies and download the project to your local environment,
  • And the maven pom would be like,
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

spring-boot-starter-rsocket will automatically pull in the core RSocket dependencies.

Next, to bootstrap an RSocket server using Spring Boot, mention the server port and transport (optional) in the property file (application.properties/yaml)

spring:
rsocket:
server:
port: 8000

With this Spring Boot auto-configures the RSocket server for us. Start the boot application and you would have the Netty server up and running at port 8000. The output would look similar to this,

Let’s set-up the client consumer service,

rsocket-client

  • Select the following dependencies and download the project to your local environment,
  • And the maven pom would be like,
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

Much similar to server side configuration, Spring Boot does the heavy lifting on client side as well, but in this case we should define the client config bean explicitly.

@Bean
RSocketRequester requester(RSocketStrategies strategies) throws URISyntaxException {
return RSocketRequester.builder().rsocketStrategies(strategies)
.rsocketFactory(factory -> {
factory.dataMimeType(MimeTypeUtils.ALL_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY);
})
.connect(TcpClientTransport.create(new InetSocketAddress(
clientConfigProp.getHost(), clientConfigProp.getPort())))
.retry().block();
}

RSocketRequester is something similar to RestTemplate or WebConfig for RSocket. It wraps the underlying communication APIs’ with a user friendly builder APIs. We have defined a client connection factory that communicates over TCP on the defined host and port. By default RSocket copies the payload which is expensive. To disable this, we have defined a payload decoder that works directly on the underlying transport without copying. In a similar way we can define/modify other configurations like MimeType, Retry strategies etc.

Start the boot application and the output would look similar to this,

We shall create a simple ticker generator server side service. This is obviously a well known standard service to easily illustrate the aforementioned interaction models.

Ticker generator service would look something like,

enum EntQuote {

PVTL("PVTL", 11.59),
VMW("VMW", 169.60),
DELL("DELL", 54.99),
GOOGL("GOOGL", 1_064.54),
MSFT("MSFT", 131.58),
AAPL("AAPL", 199.80),
FB("FB", 189.50),
NFLX("NFLX", 370.02),
AMZN("AMZN", 1_904.28);
}

QuoteGenerator() {
stream(EntQuote.values()).forEach(e -> this.prices.add(
new Quote(e.getTicker(), new BigDecimal(e.getPrice(), this.mathContext),
Instant.now(), counter.incrementAndGet())));
this.quoteStream = Flux.interval(Duration.ofSeconds(1))
.flatMap((e) -> Flux.fromIterable(prices.stream().map(baseQuote -> {
BigDecimal priceChange = baseQuote.getPrice().multiply(
new BigDecimal(0.05 * this.random.nextDouble()),
this.mathContext);
return new Quote(baseQuote.getTicker(),
baseQuote.getPrice().add(priceChange), Instant.now(),
counter.incrementAndGet());
}).collect(Collectors.toList()))).share();
}

Publisher in this case is a Flux that emits List<Quote> instances starting with 0 and incrementing at every second by computing random incremental price for the list. It’s a never ending publisher. When the first subscription happens, it starts and keeps emitting the ticker symbols.


Let’s get into the interaction models:

  1. request →← response

This is a well known interaction model for web based workloads. Familiar communication protocol http is based on this type of communication where a client initiates the conversation, requests for something and the server processes it and responds.

Subscriber

On the client side, request processing logic would look something like,

@GetMapping(value = "/v1/quote/{symbol}", produces = MediaType.APPLICATION_JSON_VALUE)
public Publisher<Quote> getAQuote(@PathVariable String symbol) {
return requester.route("a-quote").data(Mono.just(symbol))
.retrieveMono(Quote.class);
}

Request mapping is very similar to WebSocket MessageMapping. Subscriber constructs the data payload holding the symbol which is sent to the publisher by RSocketRequester. The response is a publisher which in this case is a Mono (emitting 0..1). retrieveMono will initiate the request/response communication that’s when the subscription happens and the output event gets emitted.

Publisher

On the server side, MessageMapping would look something like,

@MessageMapping("a-quote")
public Mono<Quote> getAQuote(String symbol) {
return quoteGenerator.getQuote(symbol);
}
public Mono<Quote> getQuote(String symbol) {
return quoteStream.takeWhile(quote -> quote.getTicker().equalsIgnoreCase(symbol))
.take(1).switchIfEmpty(Mono.just(new Quote())).single();
}

It takes only the first element matching the ticker symbol from the quoteStream publisher.

The browser client output would look something like,

/v1/quote/PVTL

2. request →←← stream

Unlike http, this is a prolonged interaction model. Client sends a request and continuously receives the stream of response from the server.

Subscriber

On the client side, request processing logic would look something like,

@GetMapping(value = "/v1/stream/quotes", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Publisher<Quote> getAllQuotesStream() {
return this.requester.route("all-quote-stream").data(Mono.empty())
.retrieveFlux(Quote.class);
}

In this case client requests for all the ticker info and the response would be a stream of events being emitted by the publisher Flux. retrieveFlux would make the framework initiates the request.

Publisher

On the server side, MessageMapping would look something like,

@MessageMapping("all-quote-stream")
public Flux<Quote> getAllQuotesStream() {
return quoteGenerator.getQuoteStream();
}

It returns Flux<Quote> that emits 0..N elements. If you try this from a web browser, response would be a never ending stream, as in this case is a Flux that emits incrementally every second.

/v1/stream/quotes

3. request channel →→←→←

This is a more involved chatty interaction model where the client and server participates in a continuous interaction mode. Elements are being emitted continuously both ways. Client initiates a request, gets the response, does some filtering/processing and then asks for further response by emitting a stream of input elements to the publisher.

Subscriber

On the client side, request processing logic would look something like,

@GetMapping(value = "/v1/stream/quotes/faang", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Publisher<Quote> getFilteredQuotesStream() {
return requester.route("filtered-quote-stream")
.data(Flux.just("FB", "AMZN", "AAPL", "NFLX", "GOOGL"))
.retrieveFlux(Quote.class);
}

Client requests for the filtered ticker info by emitting a Flux of ticker symbols. In this case server subscribes to this client publisher and then responds back which is eventually consumed by the client.

In this model there is not much of a difference between client and server as both of them end up playing the role of a publisher and subscriber during the interaction period.

Publisher

On the server side, MessageMapping would look something like,

@MessageMapping("filtered-quote-stream")
public Flux<Quote> getFilteredQuotesStream(Flux<String> symbol) {
return quoteGenerator.getFilteredQuoteStream(symbol);
}
public Flux<Quote> getFilteredQuoteStream(Flux<String> symbol) {
return symbol.flatMap(this::getQuoteStream).switchIfEmpty(Mono.just(new Quote()));
}

Server subscribes to the input stream, filters the list from quoteStream by matching the incoming symbols and starts emitting continuously the filtered output to the client.

If you try this in a browser client, output would look similar to this,

/v1/stream/quotes/faang

4. fire and forget →!

This is again a familiar interaction model. As the name implies, client sends a request but doesn’t expect a response back.


Spring Boot really abstracts away the RSocket lower level APIs and keeps it extremely simple with its well know annotation driven declarative programming model. References for further reading and the source repo used in this blog are available below.

Links

https://github.com/srinivasa-vasu/rsocket-client.git
https://github.com/srinivasa-vasu/rsocket-server
https://rsocket.io/
https://start.spring.io/

Srinivasa Vasu

Written by

Ever learning bloke @ Pivotal

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade