RSocket протокол по-русски

Kirill Sereda
26 min readMar 1, 2020

--

Как уже писал в предыдущих постах, большая благодарность Олегу Докука за его материал и подачу, с которой он выступает на конференциях! Именно благодаря ему я заинтересовался и решился на прочтение документации и написание этого поста. И вы не представляете как же давно я хотел это сделать!

Здесь будут сугубо мои мысли и примеры, которые могут отличаться от действительности.

Если вы найдете неточности — напишите мне и я исправлю.

Давайте создавать русское сообщество вместе!

Что такое RSocket ?

Это протокол, обеспечивающий семантику Reactive Streams.

Это двоичный протокол связи точка-точка, разработанный для использования в распределенных приложениях. В этом смысле он предоставляет альтернативу другим протоколам, таким как HTTP.

С RSocket вы можете узнать, когда лучше отправлять запрос, а когда нет. Вы не можете сделать это с HTTP.

RSocket имеет реализации на нескольких языках. Библиотека Java построена на Project Reactor и Reactor Netty для транспорта. Это означает, что сигналы от Reactive Streams Publishers в вашем приложении распространяются через RSocket по сети.

Протокол RSocket использует транспортный протокол более низкого уровня для передачи кадров RSocket .

RSocket позволяет общаться с использованием следующих транспортных протоколов:

RSocket пользуется доверием и поддержкой некоторых крупнейших компаний в Интернете, таких как Netifi, Pivotal, Facebook, Netflix, Alibaba и других.

Почему RSocket ?

Использование микросервисов очень популярно сейчас.

Но ситуация усложняется, если вы решите запустить свои приложения в облаке, где проблемы с сетью, периоды повышенной задержки и большой поток запросов — это то, чего вы не можете полностью избежать. Вместо того, чтобы пытаться исправить проблемы с сетью, лучше сделать вашу архитектуру устойчивой и полностью работоспособной даже при таких условиях.

Давайте посмотрим на пример:

У нас есть много микросервисов, которые общаются друг с другом через HTTP. Мы также используем облачные серверы (AWS, GCP, Azure). Каждый компонент предоставляет простые REST API.

Первая проблема — модель взаимодействия запрос/ответ HTTP . Некоторые шаблоны коммуникации трудно реализовать эффективным способом, используя модель взаимодействия запрос / ответ. Даже выполнение простой операции «fire and forget» имеет побочные эффекты — сервер должен отправить ответ обратно клиенту, даже если клиенту это не нужно.

Вторая проблема — производительность.

Обмен сообщениями на основе RabbitMQ, gRPC или даже HTTP 2 с его поддержкой мультиплексирования будет намного лучше с точки зрения производительности и эффективности, чем простой HTTP 1.x.

Дополнительные ресурсы влекут за собой дополнительные расходы, хотя все, что нам нужно, — это простое сообщение “fire and forget”.

Кроме того, множество различных протоколов может создавать серьезные проблемы, связанные с управлением приложениями, особенно если наша система состоит из сотен микросервисов.

Упомянутые выше проблемы являются основными причинами, по которым RSocket был изобретен. Благодаря своей реактивности и встроенной надежной модели взаимодействия RSocket может применяться в различных бизнес-сценариях.

Разница между gRPC, WebSocket и HTTP

HTTP.1 — это хорошее решение, проверенное временем, но оно не будет работать на производительность и реактивность.

WebSocket слишком сложен для разработки и в тяжелых стресс-тестах не работает так, как ожидалось.

gRPC — несомненно лучше, чем HTTP и WebSocket, он прост в разработке. По словам разработчиков из Google, он работает поверх HTTP.1 (то есть, по HTTP.2) — на самом деле, это не совсем так, то есть это просто оболочка над HTTP. 2. Для каждого действия отправляется обычный запрос POST (HTTP). Поэтому это неэффективное использование ресурсов. Он использует прокси (envoy) между браузером и сервером и отправляет регулярные HTTP-запросы POST к прокси, и прокси уже отправляет HTTP.2 на сервер. С помощью больших стресс-тестов он показал себя не очень хорошо, если, например, у нас нет очень мощного сервера.

gRPC и RSocket пытаются решить разные проблемы. gRPC — это среда RPC, использующая HTTP/2.

gRPC и RSocket находятся на разных уровнях в стеке.

gRPC находится на уровне 7 OSI — уровне RPC, построенном поверх HTTP/2.

RSocket — это уровень OSI 5/6, который моделирует семантику Reactive Streams по сети. Реактивные потоки позволяют моделировать асинхронные потоки с backpressure.

gRPC предназначен для работы с семантикой HTTP/2. Если вы хотите отправить его через другой транспорт, вы должны имитировать семантику HTTP / 2. По всей сети он фактически связан с HTTP/2 и TCP.

RSocket требуется только дуплексное соединение — т.е. то, что может отправлять и получать байты. Это может быть TCP, WebSockets и т. д.

У gRPC есть традиционная модель клиент-сервер, потому что она основана на семантике HTTP/2 и RPC. В gRPC клиент подключается к серверу, но сервер не может совершать вызовы клиенту.

RSocket не имеет взаимодействия клиент-сервер в традиционном смысле HTTP. В RSocket, когда клиент подключается к серверу, они оба могут быть запросчиком и ответчиком. Это означает, что после того, как они подключены, RSocket является полностью дуплексным, и запрашивающая сторона сервера может инициировать вызовы запрашивающей стороне клиента — то есть сервер может совершать вызовы в веб-браузер после подключения.

Просто обалдеть можно :)

Разница между WebSocket и RSocket

Веб-сокеты не обеспечивают backpressure на уровне приложения, только backpressure на уровне байтов на основе TCP. Веб-сокеты также обеспечивают только создание, они не обеспечивают семантику приложения. Разработчик должен разработать протокол приложения для взаимодействия с веб-сокетом.

RSocket обеспечивает кадрирование, семантику приложения, backpressure на уровне приложения, и оно не привязано к конкретному транспорту.

Разница между HTTP и RSocket

RSocket — это протокол приложений для реактивных потоков, который обеспечивает управление потоком приложений по сети, чтобы предотвратить сбои и повысить отказоустойчивость. В отличие от HTTP, RSocket не ожидает ответа или запроса от клиента.

Ryland Degnan объясняет это основное различие между HTTP и RSocket.

“A lot of effort spent in building these distributed systems ends up being workarounds for problems with HTTP. Circuit breakers are a great example of that. It’s working around the problem that HTTP doesn’t have flow control built into it, which means that you have to guess whether the downstream service is available or not, to not cause it to receive too many requests,” said Degnan. “You need to cut off traffic to it if you think that it’s now unavailable, but that involves a lot of configurations. RSocket could remove that problem entirely.”

В отличие от этого, RSocket использует идею асинхронной обработки потока с неблокирующим противодавлением , при котором отказавший компонент будет вместо того, чтобы просто отбрасывать трафик, сообщать о своих нагрузках вышестоящим компонентам, заставляя их снижать нагрузку и позволяя системе «Изящно реагировать на нагрузку, а не разрушаться под ней», согласно глоссарию Reactive Manifesto . Далее Ryland Degnan объяснил, как это применимо к современному миру микросервисов.

“Reactive streams are designed to have vendors and receivers of information that are decoupled from each other. Rather than having the receiver control the flow of information, it’s allowed the sender to asynchronously send data that’s really important,” said Degnan. “In microservices for example, where you have a lot of independent components that need to communicate, what often happens is some services are able to operate at a higher speed than others, or traffic spikes overwhelm parts of the system. Reactive Streams allows you to have receivers say, ‘all right, now I’m ready to receive five more requests’ and then have that message be asked around the system and the flow of information to be regulated.”

Основные понятия

RSocket использует кадрирование.

Кадр RSocket — это отдельное сообщение, которое содержит запрос, ответ или обработку протокола. К кадру RSocket может добавляться 24-битное поле длины кадра (Frame Length), представляющее длину кадра в байтах. Зависит от базового транспортного протокола, используемого RSocket, поле длины кадра может не требоваться.

Кадры RSocket начинаются с заголовка RSocket Frame.

RSocket поддерживает два типа полезных данных: данные и метаданные. Данные и метаданные могут быть закодированы в разных форматах.

В настоящее время в протоколе RSocket имеется 16 типов кадров:

  • SETUP - Настройка подключения. Всегда использует идентификатор потока 0.
  • REQUEST_RESPONSE - Используется в модели запроса-ответа. Запрос одного сообщения.
  • REQUEST_STREAM - Запрашивается поток сообщений.
  • REQUEST_FNF - Используется в модели «fire-and-forget». Отправляет сообщение и не ждет ответа.
  • REQUEST_CHANNEL - Используется в модели канала. Запрашивает поток сообщений в обоих направлениях.
  • REQUEST_N - Запрашивает больше данных и используется для контроля потока.
  • PAYLOAD - Полезная нагрузка сообщения.
  • ERROR - Ошибка на уровне соединения или приложения.
  • CANCEL - Отмена невыполненного запроса.

Принцип работы

Взаимодействие в RSocket разбито на фреймы. Каждый кадр состоит из заголовка кадра, который содержит идентификатор потока, определение типа кадра и другие данные, относящиеся к типу кадра. За заголовком кадра следуют метаданные и полезная нагрузка — эти части несут данные, указанные пользователем.

Клиент отправляет установочный фрейм на сервер в самом начале связи. Этот кадр можно настроить так, чтобы вы могли добавлять свои собственные правила безопасности или другую информацию, требуемую при инициализации соединения. Следует отметить, что RSocket не различает клиента и сервер после фазы настройки соединения. Каждая сторона может начать отправку данных другой — это делает протокол почти полностью симметричным.

Кадры отправляются в виде потока байтов. Это делает RSocket более эффективным, чем обычные текстовые протоколы. С точки зрения разработчика, легче отлаживать систему, когда JSON летают туда-сюда по сети, но влияние на производительность делает такое удобство очень и очень сомнительным. Протокол не навязывает какой-либо конкретный механизм сериализации/десериализации, он рассматривает кадр как пакет битов, которые могут быть преобразованы во что угодно.

Следующим фактором, который оказывает огромное влияние на производительность RSocket, является мультиплексирование. Протокол создает логические потоки (каналы) поверх единственного физического соединения. Каждый поток имеет свой уникальный идентификатор, который в некоторой степени можно интерпретировать как очередь. Такой дизайн имеет дело с основными проблемами, известными из HTTP 1.x — модель соединения на запрос и слабая производительность «конвейерной обработки».

Более того, RSocket изначально поддерживает передачу больших полезных нагрузок. В таком случае кадр полезной нагрузки разделяется на несколько кадров с дополнительным флагом — порядковым номером данного фрагмента.

RSocket использует Reactor, поэтому на уровне API мы в основном работаем с объектами Mono и Flux.

Интеграция с Spring

Maven:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>5.2.3.RELEASE</version>
</dependency>

Gradle:

compile "org.springframework.integration:spring-integration-rsocket:5.2.3.RELEASE"

Spring Boot 2.2 поддерживает установку сервера RSocket через TCP или WebSocket.

Существует также поддержка клиентов и автоматическая настройка для RSocketRequester.Builder и RSocketStrategies.

Характеристики

  • Он бинарный.

Вся отправка данных уже оптимизирована по максимуму. Все что надо, это сконвертировать сообщение в байты и потом деконвертировать.
RSocket все сделает за вас.
Если данные большие, RSocket сделает фрейминг сообщения и удостовериться в том, что оно целое и невредимое придет к получателю.

  • Он является Multiplexed

Он позволяет только одно соединение для всех логических стримов.

  • Bi-directional

Как только мы получили соединение, обе стороны могут как запрашивать данные так и отдавать их.

  • Backpressure

RSocket — это имплементация реактивных стримов поверх Network.
Он предоставляет настоящий backpressure.
Например когда вы говорите ему: дай мне 11 элементов, он конвертирует их в фрейм, отправляет на другую сторону (получателю), декодирует его и нотифицирует продьюсера, сколько элементов ему надо отправить (т.е. больше 11 он не сможет отправить никак!).

  • Возобновление сессии

Управление состоянием прозрачно для приложений и хорошо работает в сочетании с backpressure, которое может по возможности остановить производителя и уменьшить количество требуемого состояния. Возобновление — это возможность возобновить работу в случае сбоя (например, восстановление внезапно закрытого соединения).
Это особенно полезно, так как при отправке кадра RESUME, содержащего информацию о последнем принятом кадре, клиент может возобновить соединение и запрашивать только те данные, которые он еще не получил, избегая ненужной нагрузки на сервер и тратя время на попытки восстановить данные, которые уже были получены.

Его следует использовать везде, где это имеет смысл.

Стратегии использования

  • Request-Response — отправляет сообщение и получает результат
  • Request-Stream — отправляет сообщение и получает обратно поток данных
  • Channel — отправляет потоки сообщений в обоих направлениях
  • Fire-and-Forget — отправляет односторонее сообщение и не ждет ответа

Fire-and-Forget:

Fire-and-forget предназначен для передачи данных от отправителя к получателю, в котором отправитель не заботится о результате операции — он отправил запрос и забыл о нем. Такое упрощенное сообщение может быть полезно при отправке уведомлений на мобильную связь например.

В случае операции request stream запрашивающая сторона отправляет ответчику один кадр и ответчик возвращает обратно поток данных. Вместо отправки периодических запросов можно просто подписаться на поток и реагировать на поступающие данные — они будут поступать автоматически, когда они станут доступны.

Благодаря мультиплексированию и поддержке двунаправленной передачи данных, мы можем сделать очень интересную на мой взгляд вещь, используя метод request channel. RSocket может передавать данные от запрашивающей стороны к ответчику и наоборот, используя одно физическое соединение. Такое взаимодействие может быть полезно, когда запрашивающая сторона например обновляет подписку. Без двунаправленного канала клиент должен был бы отменить поток и повторно запросить его с новыми параметрами.

Основные преимущества

Можем сделать простой CRUD — нет проблем, т.к. есть простой реквест-респонс.
Можем сделать более advanced CRUD — нет проблем. Например мы пушим какие-то метрики на другой сервер и нам все равно, удлятся они или нет, главное нам отправить их в сеть и забыть о них. Там используется Fire and Forget — отправляем сообщение и скорей забываем про него без ожидания окончания процессинга.

Есть стриминг: можем запросить стрим сервера и сервер отправит данные или мы можем сами про помощи Stream-Stream или Request-Channel Communication пушить данные или пушить стрим данных и сервер нам тоже может отвечать (пушить нам какой-то стрим даных).

Поддержка RSocket практически на всех языках.
Можете использовать или RPC архитектуру или Messaging, не важно.
Вы можете использовать RSocket поверх любого транспортного протокола (WebSocket, HTTP.2, TCP и другие).

Aeron UDP — предоставляется из коробки.
Связка RSocket + Aeron = супер мега ультра быстрое решение передачи данных!

Как это работает ?

Рассмотрим 2 этапа:

  • Connecting

Первоначально клиент подключается к серверу с помощью низкоуровневого потокового транспорта, такого как TCP или WebSocket, и отправляет на сервер фрейм SETUP, чтобы установить параметры для соединения.

Сервер может отклонить кадр SETUP, но, как правило, после того, как он отправлен (для клиента) и получен (для сервера), обе стороны могут начать делать запросы.

  • Making Requests

Как только соединение установлено, обе стороны могут инициировать запрос через один из кадров REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL или REQUEST_FNF . Каждый из этих кадров несет одно сообщение от запрашивающей стороны к ответчику.

Ответчик может затем возвращать кадры PAYLOAD с ответными сообщениями, и в случае REQUEST_CHANNEL запрашивающая сторона также может отправлять кадры PAYLOAD с большим количеством сообщений запроса.

Когда запрос включает в себя поток сообщений, таких как Request-Stream и Channel, сервер должен учитывать сигналы запроса от запрашивающей стороны (клиента).

Начальное требование указывается в кадрах REQUEST_STREAM и REQUEST_CHANNEL . Последующее требование сигнализируется через кадры REQUEST_N .

Каждая сторона может также отправлять уведомления метаданных через фрейм METADATA_PUSH , который относится не к какому-либо отдельному запросу, а скорее к соединению в целом.

Стратегии использования RSocket на примерах

Интерфейс RSocket библиотеки RSocket определяет методы связи протокола. Разработчики могут создавать реактивные системы, используя интерфейс RSocket.

public interface RSocket extends Availability, Closeable {

/**
* Fire and Forget interaction model of {@code RSocket}.
*
* @param payload Request payload.
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
* handled, otherwise errors.
*/
Mono<Void> fireAndForget(Payload payload);

/**
* Request-Response interaction model of {@code RSocket}.
*
* @param payload Request payload.
* @return {@code Publisher} containing at most a single {@code Payload} representing the
* response.
*/
Mono<Payload> requestResponse(Payload payload);

/**
* Request-Stream interaction model of {@code RSocket}.
*
* @param payload Request payload.
* @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
*/
Flux<Payload> requestStream(Payload payload);

/**
* Request-Channel interaction model of {@code RSocket}.
*
* @param payloads Stream of request payloads.
* @return Stream of response payloads.
*/
Flux<Payload> requestChannel(Publisher<Payload> payloads);

}

Вы должны создать реализацию, которая реализует этот интерфейс. Унаследуйте абстрактный класс AbstractRSocket, который реализует RSocket для реализации необходимых вам функций.

public abstract class AbstractRSocket implements RSocket {

private final MonoProcessor<Void> onClose = MonoProcessor.create();

@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Fire and forget not implemented."));
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
payload.release();
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.error(new UnsupportedOperationException("Request-Channel not implemented."));
}

}

Давайте теперь рассмотрим каждую стратегию отдельно и в связке со Спрингом.

Fire-and-Forget

Здесь клиент не получит ответа от сервера.

Метод fireAndForget() в основном используется для односторонних push-уведомлений. Тип возвращаемого значения определяется как Mono<Void>.

Возврат Mono<Void> означает, что данные не были получены из метода fireAndForget.

Давайте создадим 2 простых сервиса: сервер и клиент.

Сервер:

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.stergioulas.tutorials.springbootrsocket'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}

repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
exclude group: 'junit', module: 'junit'
}
}

test {
useJUnitPlatform()
}

файл настроек выглядит так

spring.rsocket.server.port=7000
spring.main.lazy-initialization=true

И основной класс выглядит очень просто.

Чтобы избежать необходимости реализации каждого метода интерфейса RSocket, API предоставляет абстрактные возможности, при помощиAbstractRSocket который мы можем расширить. Соединяя SocketAcceptor и AbstractRSocket , мы получаем реализацию на стороне сервера, которая в базовом сценарии может выглядеть следующим образом:

@SpringBootApplication
public class ServerRsocketApplication {

public static void main(String[] args) {
SpringApplication.run(ServerRsocketApplication.class, args);

RSocket rSocketImpl = new AbstractRSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
System.out.println(payload.getDataUtf8());
return Mono.empty();
}
};

Disposable server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(rSocketImpl))
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.subscribe();

server.dispose();

}

}

Для простоты напишем все в одном классе. Здесь RSocket использует TCP на порту 7000.

Код сервера реализует необходимую функциональность, переопределяя метод findAndForget() , который наследуется от AbstractRSocket .

Модель findAndForget не возвращает данные с сервера. Таким образом, он возвращает Mono<Void>, который объявляет о возврате Mono.empty().

И мы использовали TcpClientTransport для транспорта. RSocket дает вам возможность выбрать либо TCP, либо WebSocket (если вы хотите работать с WebSocket, вы можете использовать WebsocketClientTransport — вместо TcpServerTransport).

receive() — означает, что он получит запрос от клиента.

acceptor - определяет, как обрабатывать запрос, он принимает класс SocketAcceptor. Это место, где мы можем встроить наш обработчик для обработки входящего запроса. AbstractRSocket — это удобный класс, предоставляющий методы для всех интерактивных моделей, поддерживаемых в RSocket. Например, здесь нам нужно обрабатывать модель fire-and-forget только путем переопределения метода requestResponse.

transport - определяет информацию о сервере, включая хост, порт, протокол и т. Д. Здесь мы используем TCP в качестве протокола и запускаем его на локальном хосте на порту 7000.

Клиент:

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.stergioulas.tutorials.springbootrsocket'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}

repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
exclude group: 'junit', module: 'junit'
}
testImplementation 'io.projectreactor:reactor-test'
}

test {
useJUnitPlatform()
}

На стороне отправителя использование модели взаимодействия довольно просто, ведь все, что нам нужно сделать, это вызвать конкретный метод для экземпляра RSocket, который мы создали с помощью RSocketFactory.

Основной класс:

@SpringBootApplication
public class ClientRsocketApplication {

public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ClientRsocketApplication.class, args);

RSocket socket =
RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();

socket
.fireAndForget(DefaultPayload.create("Client service is available now"))
.subscribe();

Thread.sleep(3000);
socket.dispose();

}

}

На клиентской стороне мы также используем RSocketFactory, но на этот раз с методом connect() вместо receive().

connect()- указывает на то, что текущий клиент будет подключаться к серверу.

transport - инфа о сервере, на который мы будем подключаться.

Метод start() возвращает RSocket, его можно использовать для взаимодействия со стороной сервера. Здесь мы отправляем запрос типа requestResponse, он требует ответа от сервера.

Наконец, мы можем использовать subcribe() для отображения полученного ответа от сервера.

Давайте запустим приложение.

Клиент отправил сообщение «Client-service is available now» модели findAndForget и сервер вывел полученное сообщение в консоль. Это очень простой пример.

Например, это может быть полезно, когда один из наших сервисов вышел из строя (сбой сети или большая нагрузка), а затем поднялся и отправил так называемое push-уведомление другому сервису о том, что он жив.

Запустите сервер, а затем клиента. В консоли сервера вы можете увидеть сообщение от клиента.

Исходный код примера можно найти на GitHub.

Request-Channel

Благодаря мультиплексированию и поддержке двунаправленной передачи данных, мы можем сделать очень интересную на мой взгляд вещь, используя метод request channel. RSocket может передавать данные от запрашивающей стороны к ответчику и наоборот, используя одно физическое соединение. Такое взаимодействие может быть полезно, когда запрашивающая сторона например обновляет подписку. Без двунаправленного канала клиент должен был бы отменить поток и повторно запросить его с новыми параметрами.

Давайте создадим 2 простых сервиса: сервер и клиент.

Сервер:

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.stergioulas.tutorials.springbootrsocket'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}

repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
exclude group: 'junit', module: 'junit'
}
}

test {
useJUnitPlatform()
}

файл настроек выглядит так

spring.rsocket.server.port=7000
spring.main.lazy-initialization=true

Основной класс

@SpringBootApplication
public class ServerRsocketApplication {

public static void main(String[] args) {
SpringApplication.run(ServerRsocketApplication.class, args);

RSocket rSocketImpl = new AbstractRSocket() {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).flatMap(payload ->
Flux.fromStream(
payload.getDataUtf8().codePoints()
.mapToObj(c -> String.valueOf((char) c))
.map(i -> DefaultPayload.create("channel: " + i))))
.doOnNext(System.out::println);
}
};

Disposable server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(rSocketImpl))
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.subscribe();

server.dispose();

}

}

receive() — означает, что он получит запрос от клиента.

acceptor — определяет, как обрабатывать запрос, он принимает класс SocketAcceptor. Это место, где мы можем встроить наш обработчик для обработки входящего запроса. AbstractRSocket — это удобный класс, предоставляющий методы для всех интерактивных моделей, поддерживаемых в RSocket. Например, здесь нам нужно обрабатывать модель request-channel только путем переопределения метода requestResponse.

transport — определяет информацию о сервере, включая хост, порт, протокол и т. Д. Здесь мы используем TCP в качестве протокола и запускаем его на локальном хосте на порту 7000.

Клиент:

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management'group = 'com.stergioulas.tutorials.springbootrsocket'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
exclude group: 'junit', module: 'junit'
}
testImplementation 'io.projectreactor:reactor-test'
}
test {
useJUnitPlatform()
}

На стороне отправителя использование модели взаимодействия довольно просто, ведь все, что нам нужно сделать, это вызвать конкретный метод для экземпляра RSocket, который мы создали с помощью RSocketFactory.

Основной класс:

@SpringBootApplication
public class ClientRsocketApplication {

public static void main(String[] args) {
SpringApplication.run(ClientRsocketApplication.class, args);

RSocket socket =
RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();

socket.requestChannel(Flux.just("one", "two")
.map(DefaultPayload::create))
.delayElements(Duration.ofMillis(1000))
.map(Payload::getDataUtf8)
.doOnNext(System.out::println)
.blockLast();

}

}

На клиентской стороне мы также используем RSocketFactory, но на этот раз с методом connect() вместо receive().

connect()- указывает на то, что текущий клиент будет подключаться к серверу.

transport — инфа о сервере, на который мы будем подключаться.

Метод start() возвращает RSocket, его можно использовать для взаимодействия со стороной сервера. Здесь мы отправляем запрос типа requestResponse, он требует ответа от сервера.

Наконец, мы можем использовать subcribe() для отображения полученного ответа от сервера.

Клиент отправил «one», «two» сообщения в модель requestChannel, а затем сервер создает канал для клиента (с задержкой в ​​1 секунду) и затем сервер вывел в консоль полученное сообщение.

Запустите сервер, а затем клиент.

Сервер:

Клиент:

Исходный код этого примера можно найти на GitHub.

Request-Response

RSocket также может имитировать поведение HTTP. Он поддерживает семантику запрос-ответ, и, вероятно, это будет основной тип взаимодействия, которое вы собираетесь использовать с RSocket.

В контексте потоков такая операция может быть представлена ​​в виде потока, который состоит из одного объекта. В этом сценарии клиент ожидает ответный кадр, но делает это полностью неблокирующим образом.

Fire-and-Forgot вернул Mono<Void>, но метод RequestResponse возвращает Mono<Payload>, так как клиент должен доставить один объект и получить ответ от сервера. Однако мы не используем Flux, потому что это только один из ответов на наш запрос от клиента.

Давайте создадим 2 простых сервиса: сервер и клиент.

Сервер:

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management'group = 'com.stergioulas.tutorials.springbootrsocket'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
exclude group: 'junit', module: 'junit'
}
}
test {
useJUnitPlatform()
}

файл настроек выглядит так

spring.rsocket.server.port=7000
spring.main.lazy-initialization=true

Основной класс

@SpringBootApplication
public class ServerRsocketApplication {

public static void main(String[] args) {
SpringApplication.run(ServerRsocketApplication.class, args);

RSocket rSocketImpl = new AbstractRSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
System.out.println(payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Nice to meet you"));
}
};

Disposable server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(rSocketImpl))
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.subscribe();

server.dispose();

}

}

В приведенном выше коде RSocketFactory.receive() создает объект ServerRSocketFactory для построения сервера RSocket.

ServerRSocketFactory.acceptor() устанавливает объект SocketAcceptor для приема соединений.

Объект AbstractRSocket реализует только метод requestResponse() для обработки модели запрос-ответ. Тип возврата Mono<Payload> означает, что в качестве ответа ожидается не более одного объекта Payload.

Для каждого полученного запроса ответом будет полезная нагрузка запроса с ECHO >> в качестве префикса.

Метод DefaultPayload.create() — это простой способ создания объектов Payload.

ServerRSocketFactory.transport() устанавливает транспортный уровень, используемый RSocket. Здесь TcpServerTransport используется для TCP на порту 7000.

Клиент:

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management'group = 'com.stergioulas.tutorials.springbootrsocket'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
exclude group: 'junit', module: 'junit'
}
testImplementation 'io.projectreactor:reactor-test'
}
test {
useJUnitPlatform()
}

На стороне отправителя использование модели взаимодействия довольно просто, ведь все, что нам нужно сделать, это вызвать конкретный метод для экземпляра RSocket, который мы создали с помощью RSocketFactory.

Основной класс:

@SpringBootApplication
public class ClientRsocketApplication {

public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ClientRsocketApplication.class, args);

RSocket socket =
RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();

assert socket != null;

socket.requestResponse(DefaultPayload.create("Hello!"))
.map(Payload::getDataUtf8)
.doOnNext(System.out::println)
.block();

socket.dispose();

}

}

На клиентской стороне мы также используем RSocketFactory, но на этот раз с методом connect() вместо receive().

connect()- указывает на то, что текущий клиент будет подключаться к серверу.

transport — инфа о сервере, на который мы будем подключаться.

Метод start() возвращает RSocket, его можно использовать для взаимодействия со стороной сервера. Здесь мы отправляем запрос типа requestResponse, он требует ответа от сервера.

Наконец, мы можем использовать subcribe() для отображения полученного ответа от сервера.

Клиент отправил «Hello!» сообщение для модели запрос-ответ. Сервер вывел в консоль полученное сообщение и отправил клиенту сообщение «Nice to meet you».

Сервер:

Клиент:

Исходный код этого примера можно найти на GitHub.

Request-Stream

В случае операции request-stream запрашивающая сторона отправляет ответчику один кадр и возвращает поток данных. Вместо отправки периодических запросов сереру (клиенту) можно подписаться на поток и реагировать на поступающие данные — они будут поступать автоматически, когда они станут доступны.

Давайте создадим 2 простых сервиса: сервер и клиент.

Сервер:

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management'group = 'com.stergioulas.tutorials.springbootrsocket'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
exclude group: 'junit', module: 'junit'
}
}
test {
useJUnitPlatform()
}

файл настроек выглядит так

spring.rsocket.server.port=7000
spring.main.lazy-initialization=true

Основной класс

@SpringBootApplication
public class ServerRsocketApplication {

public static void main(String[] args) {
SpringApplication.run(ServerRsocketApplication.class, args);

RSocket rSocketImpl = new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
System.out.println(payload.getDataUtf8());
return Flux.range(1, 5)
.map(i -> DefaultPayload.create("onNext-" + i));
}
};

Disposable server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(rSocketImpl))
.transport(TcpServerTransport.create("localhost", 7001))
.start()
.subscribe();

server.dispose();

}

}

Для простоты напишем все в одном классе. Здесь RSocket использует TCP на порту 7000.

Код сервера реализует необходимую функциональность, переопределяя метод request-stream() , который наследуется от AbstractRSocket .

Модель findAndForget не возвращает данные с сервера. Таким образом, он возвращает Mono<Void>, который объявляет о возврате Mono.empty().

И мы использовали TcpClientTransport для транспорта. RSocket дает вам возможность выбрать либо TCP, либо WebSocket (если вы хотите работать с WebSocket, вы можете использовать WebsocketClientTransport — вместо TcpServerTransport).

receive() — означает, что он получит запрос от клиента.

acceptor — определяет, как обрабатывать запрос, он принимает класс SocketAcceptor. Это место, где мы можем встроить наш обработчик для обработки входящего запроса. AbstractRSocket — это удобный класс, предоставляющий методы для всех интерактивных моделей, поддерживаемых в RSocket. Например, здесь нам нужно обрабатывать модель request-stream только путем переопределения метода requestResponse.

transport — определяет информацию о сервере, включая хост, порт, протокол и т. Д. Здесь мы используем TCP в качестве протокола и запускаем его на локальном хосте на порту 7000.

Клиент:

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management'group = 'com.stergioulas.tutorials.springbootrsocket'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
exclude group: 'junit', module: 'junit'
}
testImplementation 'io.projectreactor:reactor-test'
}
test {
useJUnitPlatform()
}

На стороне отправителя использование модели взаимодействия довольно просто, ведь все, что нам нужно сделать, это вызвать конкретный метод для экземпляра RSocket, который мы создали с помощью RSocketFactory.

Основной класс:

@SpringBootApplication
public class ClientRsocketApplication {

public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ClientRsocketApplication.class, args);

RSocket socket =
RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 7001))
.start()
.block();

socket.requestStream(DefaultPayload.create("request-stream example!"))
.delayElements(Duration.ofMillis(1000))
.subscribe(
payload -> System.out.println(payload.getDataUtf8()),
e -> System.out.println("error" + e.toString()),
() -> System.out.println("completed")
);

Thread.sleep(3000);
socket.dispose();

}

}

На клиентской стороне мы также используем RSocketFactory, но на этот раз с методом connect() вместо receive().

connect()- указывает на то, что текущий клиент будет подключаться к серверу.

transport — инфа о сервере, на который мы будем подключаться.

Метод start() возвращает RSocket, его можно использовать для взаимодействия со стороной сервера. Здесь мы отправляем запрос типа requestResponse, он требует ответа от сервера.

И в конце мы можем использовать subcribe() для отображения полученного ответа от сервера.

Запустите вначале сервер а затем клиент.

Клиент отправил «request-stream example!» сообщение для модели requestStream на сервер, а затем сервер создал поток для клиента (с задержкой в ​​1 секунду) и вывел в консоль следующее:

Сервер:

Клиент:

Исходный код этого примера можно найти на GitHub.

RSocket + Spring

Давайте теперь попробуем использовать Спринг и создадим простецкую приложеньку.

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

configurations {
compileOnly {
extendsFrom annotationProcessor
}
}

repositories {
mavenCentral()
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'io.rsocket:rsocket-core:0.12.1'
implementation 'io.rsocket:rsocket-transport-netty:0.12.1'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}

test {
useJUnitPlatform()
}

Сервер:

Конфигурация

@Configuration
public class RSocketConfig {
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketService()))
.transport(TcpServerTransport.create(8000))
.start()
.block()
.onClose()
.block();
}
}

Сервис, который запускает некоторую логику

@Slf4j
@Component
public class RSocketService extends AbstractRSocket {

@Override
public Mono<Void> fireAndForget(Payload payload) {
System.out.println("fire-and-forget: server received");
return Mono.empty();
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
log.info(payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Connection successful"));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
log.info(payload.getDataUtf8());
return Flux.range(1, 5)
.map(i -> DefaultPayload.create("request-stream: " + i));
// return Flux.just(
// DefaultPayload.create("request-stream-1"),
// DefaultPayload.create("request-stream-2"),
// DefaultPayload.create("request-stream-3"),
// DefaultPayload.create("request-stream-4"));
}

@Override
public Flux<Payload>requestChannel(Publisher<Payload>payloads) {
return Flux.from(payloads).map(Payload::getDataUtf8)
.doOnNext(str ->log.info("Received: " + str))
.map(DefaultPayload::create);

}
}

Теперь сделаем Клиент:

Конфигурация

@Configuration
public class RSocketConfig {

@Bean
public RSocket rSocket(){
return RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", 8000))
.start()
.block();
}
}

Контроллер

@Slf4j
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class RSocketController {

private final RSocket rSocket;

@GetMapping("/fire-and-forget")
public Mono<Void> fireAndForget() {
rSocket.fireAndForget(DefaultPayload.create("fire-and-forget!"))
.subscribe(System.out::println);

return Mono.empty();
}

@GetMapping("/request-response")
public Mono<Payload> getRequestResponse() {
return rSocket.requestResponse(DefaultPayload.create("request-response!"))
.doOnNext(System.out::println);
}

@GetMapping("/request-stream")
public Disposable getRequestStream() {
// return rSocket.requestStream(DefaultPayload.create("request-stream!"));
return rSocket.requestStream(DefaultPayload.create("request-stream!"))
.delayElements(Duration.ofMillis(1000))
.subscribe(
payload -> System.out.println(payload.getDataUtf8()),
e -> System.out.println("Error: " + e.toString()),
() -> System.out.println("Completed")
);

}

@GetMapping("/channel")
public Flux<String> getChannel() {
return rSocket.requestChannel(Flux.interval(Duration.ofSeconds(2)).map(l -> DefaultPayload.create("ping ")))
.map(Payload::getDataUtf8)
.doOnNext(string ->log.info("Received: " + string))
.take(20);
}

}

Давайте запустим сервер, а затем клиента и попробуем вызвать все ендпоинты по очереди.

  • Fire-And-Forget

перейдем по урлу

localhost:8080/api/fire-and-forget

И увидим сообщение на сервере

  • Request-Response

перейдем по урлу

localhost:8080/api/request-response

и увидим ответ на сервере “Connection successful”

в консоли сервера

в консоли клиента

  • Request-Stream

перейдем по урлу

localhost:8080/api/request-stream

Клиент отправит запрос, и сервер начнет отправлять ответ в течение неограниченного времени (но мы ограничились всего 5 элементами для наглядности).

Метод requestStream возвращает поток Flux.

Сервер:

Клиент:

  • Channel

перейдем по урлу

localhost:8080/api/channel

а в браузере в реальном времени мы увидим, как сообщения поступают в течение 1 секунды (мы эмулировали эту ситуацию как канал связи между сервисами)

Сервер

Клиент

Канал обеспечивает двунаправленную связь, сообщения будут непрерывно передаваться от потребителя к издателю, а затем от издателя к потребителю.

Исходный код примера можно взять на GitHub.

Давайте сделаем еще один пример.

Давайте создадим 2 простых приложения: client-service будет запрашивать данные из movie-service:

application.properties

spring.rsocket.server.port=7000
spring.data.mongodb.uri=mongodb://localhost:27017/moviedb

Запускаем MongoDB в докере на стандартном порту 27017.

Основной класс будет иметь следующую форму (здесь мы запустим 6 фильмов в нашей базе данных при запуске приложения)

@SpringBootApplication
public class MovieServiceApplication {

public static void main(String[] args) {
SpringApplication.run(MovieServiceApplication.class, args);
}

@Bean
CommandLineRunner run(MovieRepository movieRepository) {
return args -> {
movieRepository.deleteAll()
.thenMany(Flux.just(
new Movie("1", "Lion King", "130"),
new Movie("2", "Saw", "200"),
new Movie("3", "Home Alone", "150"),
new Movie("4", "Home Alone 2", "180"),
new Movie("5", "Interstellar", "300"),
new Movie("6", "Prometheus", "80")
)
.flatMap(movieRepository::save))
.thenMany(movieRepository.findAll())
.subscribe(System.out::println);

};
}

}

Репозиторий

@Repository
public interface MovieRepository extends ReactiveMongoRepository<Movie, String> {
}

Если вы посмотрите на интерфейс ReactiveMongoRepository, то увидите, что нам возвращаются объекты, заключенные в классы Mono и Flux. Это означает, что при обращении в базу данных мы не сразу получим результат. Вместо этого мы получаем поток данных Publisher, из которого можно получить данные, как только они будут готовы, или, скорее, он передаст их нам, как только они будут готовы.

Модель

@Data
@Builder
@AllArgsConstructor
@Document(collection = "movies")
public class Movie {

@Id
private String id;

private String name;
private String price;

}

и

public class RequestMovie {

private String name;

public RequestMovie() {
}

public RequestMovie(String name){
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

Контроллер

@RestController
@RequestMapping("/")
public class MovieController {

private MovieRepository movieRepository;

public MovieController(MovieRepository movieRepository) {
this.movieRepository = movieRepository;
}

@MessageMapping("request-response")
Mono<Movie> getMovieByName(Movie movie) {
return movieRepository.findById(movie.getId());
}

@MessageMapping("request-stream")
Flux<Movie> getAllMovies() {
return movieRepository.findAll();
}

@MessageMapping("fire-forget")
Mono<Void> addMovie(Movie movie) {
movieRepository.save(movie);
return Mono.empty();
}

}

Сервер RSocket, связанный с Spring Boot, может настроить конечную точку с помощью аннотации @MessageMapping. Другими словами, нет необходимости наследовать и реализовывать класс AbstractRSocket. Однако тип возврата должен быть четко определен в соответствии с моделью связи RSocket! Это важно!

Если вы используете метод Request-Response, аналогичный HTTP, данные ответа, которые должны быть доставлены, должны быть единым объектом. То есть тип возвращаемого значения должен быть Mono<Object>. Если вы используете метод Request-Stream, данные ответа должны быть Flux, потому что данные ответа — это один или несколько потоковых данных.

request-response: Сервер использует аннотацию @MessageMapping для настройки конечной точки. Тип возвращаемого значения должен быть Mono<Movie>. Клиент использует внедрение зависимостей RSocketRequester.

request-stream: поскольку вам необходимо передать более одного потока данных, вы должны использовать Flux.

fire-and-forget: сервер только обрабатывает запросы, полученные от клиентов, и не отвечает на них. Так что вам надо просто вернуть Mono.empty().

Client-service

Модель идентична серверу.

Создадим конфигурацию для RSocket.

@Slf4j
@Configuration
public class RSocketConfiguration {

@Bean
RSocketRequester rSocketRequester(RSocketStrategies strategies) {
InetSocketAddress address = new InetSocketAddress("localhost", 7000);

return RSocketRequester.builder()
.rsocketFactory(factory -> factory
.dataMimeType(MimeTypeUtils.ALL_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY))
.rsocketStrategies(strategies)
.connect(TcpClientTransport.create(address))
.retry()
.block();
}
}

Мы также можем зарегистрировать RSocketRequester здесь

@Bean
public Mono<RSocketRequester> rSocketRequester(Mono<RSocket> rSocket, RSocketStrategies strategies) {
return rSocket
.map(socket -> RSocketRequester.wrap(socket, MimeTypeUtils.parseMimeType("application/cbor"), strategies))
.cache();
}

но для простоты мы будем использовать его стандартную реализацию, предоставленную из коробки.

Bean-компонент RSocketRequester должен быть определен с использованием метода компоновщика или RSocketRequester.

Метод RSocketRequester.create, используемый в ранних версиях Spring Boot 2.2.0.M2, куда-то исчез. Возможно, было внесено изменение во время разработки :(

Контроллер

@RestController
@RequiredArgsConstructor
@RequestMapping("/api")
public class ClientController {

private final RSocketRequester requester;

@GetMapping("/movie/{id}")
Mono<Movie> findMovieById(@PathVariable String id) {
return this.requester
.route("request-response")
// .data(DefaultPayload.create(""))
.data(new RequestMovie(id))
.retrieveMono(Movie.class);
}

@GetMapping("/showAllMovies")
Flux<Movie> findAllMovies() {
return this.requester
.route("request-stream")
.retrieveFlux(Movie.class);
}

@PostMapping("/addMovie/{id}/{name}/{price}")
Mono<Void> addMovie(@PathVariable String id,
@PathVariable String name,
@PathVariable String price) {
return this.requester
.route("fire-forget")
.data(new Movie(id, name, price))
.send();
}
}

findMovieById: здесь вы должны объявить маршрут, определенный @MessageMapping на сервере RSocket в .route().

Клиент передает идентификатор фильма и возвращенные данные преобразуются в Mono<Movie> с использованием метода retrieveMono().

showAllMovies: Клиенту просто нужно получить данные с помощью FLux.

addMovie: Метод send() возвращает Mono<Void>.

Вот такие манипуляции получились :)

Исходный код примера можно найти на GitHub.

Выводы

Разработка такая же как и gRPC.
Высокая производительность.
Для тех кто использует реактивный подход — это отличное решение.
Также эффективное использование ресурсов — одно соединение между клиентом и сервером для создания логического стриминга.

Из существенных минусов можно выделить единственный: очень маленькое community.

Любые вопросы по протоколу можно задать здесь.

Мне кажется что RSocket — это действительно будущее в организации микросервисной архитектуры, кто бы не говорил что это не так. Вот посмотрите, лет через 5–10 сложно будет построить масштабную систему без использования реактивного подхода и вспомните мои слова :)

Спасибо всем кто дочитал до конца! Спасибо Олегу!

Я планирую продолжать дальше более углубленно знакомиться с протоколом и писать об этом в постах и также буду рад за помощь. Давайте сделаем это вместе!

Если вы нашли неточности в описании данной статьи, вы можете написать мне на email и я с радостью вам отвечу.

Kirill Sereda

email: kirill.serada@gmail.com

skype: kirill-sereda

linkedin: www.linkedin.com/in/ksereda

--

--