Reactive Programming: Reactor и Spring WebFlux — часть 1

Kirill Sereda
20 min readSep 8, 2019

--

Я очень запарился по теме реактивного программирования, что наконец решился написать эту статью :)
Очень надеюсь она будет полезна для кого-то.

Когда я начал разбираться с этой темой у меня было много вопросов на этот счет: что это такое, зачем ?
Прочитав некоторые статьи на эту тему я не получил полноту знаний, чтобы просто понять, что такое реактивная система.
Я решил углубиться в данную тему: перечитал все возможные туториалы и статьи на этот счет и прочел официальную документацию.
Сейчас попробую рассказать своими (и не только) словами, как же я понимаю, что такое реактивная система.
Я постарался изложить максимально детально некоторые очевидные вещи, но по ходу статьи можно будет найти много чего интересного.

Надеюсь после это статьи у читателя не останется лишних вопросов.

Большую часть материала из этой статьи можно найти на просторах интернета и в официальной документации.
Я постарался максимально сжато и в то же время подробно остановиться на главных и ключевых моментах.
Но это вовсе не значит, что можно остановиться на прочтении данной статьи.
Впереди еще много всего интересного, что касается реактивщины :)

Кто еще такой этот Reactor ?

Project Reactor — это библиотека Java 8, которая реализует модель реактивного программирования. Он построен на основе спецификации реактивных потоков , стандарта для создания реактивных приложений.

Reactor — это полностью неблокирующая основа реактивного программирования для JVM с эффективным управлением требованиями (в форме управления «противодавлением»). Он напрямую интегрируется с функциональными API-интерфейсами Java 8, в частности, Completable Future , Stream и Duration . Он предлагает составные API-интерфейсы асинхронной последовательности — Flux (для элементов [N]) и Mono (для элементов [1]) — и широко реализует спецификацию Reactive Streams.

Reactor — это реализация парадигмы реактивного программирования, которую можно описать следующим образом:

Wiki:

Реактивное программирование — это парадигма асинхронного программирования, связанная с потоками данных и распространением изменений. Это означает, что становится возможным легко выражать статические (например, массивы) или динамические (например, излучатели событий) потоки данных через используемый язык (языки) программирования.

Ядро реактора работает на Java 8 и выше.

Все примеры и реализации Mono и Flux, касающиеся Reactor и WebFlux, которые мы рассматриваем, будут одинаковыми. Далее мы разберемся со всем в контексте WebFlux, который построен на Reactor.

Деталь проектов:

  • Ядро Reactor — Реактивные основы для приложений и сред, а также реактивные расширения, основанные на API с типами Mono (1 элемент) и Flux (n элементов).
  • Reactor Netty — предлагает неблокирующие и готовые к backpressure TCP/HTTP/UDP клиенты и серверы на основе Netty фреймворка.
  • Reactor Addons — Мост к RxJava 2 Observable, Completable, Flowable, Single, Maybe, Scheduler, а также SWT Scheduler, Akka Scheduler и так далее.

Что такое реактивные типы и зачем их использовать ?

Реактивные типы не предназначены для обработки запросов или данных быстрее.

Их сила заключается в их способности одновременно обслуживать больше запросов и более эффективно обрабатывать операции с задержкой, такие как запрос данных с удаленного сервера.

Они позволяют обеспечить лучшее качество обслуживания и предсказуемое планирование пропускной способности, изначально имея дело со временем и задержкой, не затрачивая больше ресурсов.

В отличие от традиционной обработки, которая блокирует текущий поток во время ожидания результата, Reactive API запрашивает только тот объем данных, который он способен обработать и предоставляет новые возможности, поскольку он имеет дело с потоком данных, а не с отдельными элементами (объектами).

Следуя документации Spring WebFlux, Spring Framework использует Reactor для своей собственной реактивной поддержки.
Сейчас мы более детально во всем разберемся.

Reactor — это реализация Reactive Streams, которая дополнительно расширяет базовый контракт Reactive Streams Publisher с типами API-интерфейсов, которые можно комбинировать с Flux и Mono.

Reactive Streams (Реактивные Потоки)

Reactive Streams (Реактивные Потоки) состоят из 4-х простых Java — интерфейсов (Publisher, Subscriber, Subscription и Processor).

 public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}

public static interface Subscription {
public void request(long n);
public void cancel();
}

public static interface Processor
<T,R> extends Subscriber<T>,
Publisher<R> {
}

К ним всем выдвигаются примерно следующие требования:

ASYNC — асинхронность
NIO — “неблокируемость” ввода/вывода
RESPECT BACKPRESSURE — умение обрабатывать ситуации, когда данные появляются быстрее, чем потребляются (в синхронном, императивном коде подобная ситуация не возникает, но в реактивных системах такое часто встречается).

Различия с Java 8

Давайте немного вспомним.

До java 8 были Future и Callbacks.

Concurrency API ввел понятие сервиса-исполнителя (ExecutorService). Исполнители выполняют задачи асинхронно и обычно используют пул потоков, так что нам не надо создавать их вручную.

Кроме Runnable, исполнители могут принимать другой вид задач, который называется Callable. Но как тогда получить результат, который они возвращают? Поскольку метод submit() не ждет завершения задачи, исполнитель не может вернуть результат задачи напрямую. Вместо этого исполнитель возвращает специальный объект Future, у которого мы сможем запросить результат задачи.

Callback методы являются void (ничего не возвращают) и принимают дополнительный параметр, который вызывается после определенного события.

Интерфейс Future описывает API для работы с задачами, результат которых мы планируем получить в будущем. Для Future нас интересует его реализация java.util.concurrent.FutureTask. То есть это Task, который будет выполнен во Future. Чем эта реализация ещё интересна, так это тем, что она реализует и Runnable. Можно считать это своего рода адаптером старой модели работы с задачами в потоках и новой модели (появилась в java 5).

ExecutorService executor = ...;
Future f = executor.submit(...);
f.get();

Но дело в том, что Future является асинхронным, но блокирует текущий поток до тех пор, пока не будет завершено вычисление при попытке получить результат с помощью метода get ().

Future API был хорошим шагом на пути к асинхронному программированию, но ему не хватало некоторых важных и полезных функций.

Некоторые минусы:

  • Future нельзя завершить вручную.

Допустим, у вас есть метод для получения свободных номеров в гостинице из удалённого API. Поскольку этот вызов API занимает много времени, вы запускаете его в отдельном потоке и возвращаете Future.

Теперь предположим, что удалённый сервис перестал работать и вы хотите завершить Future вручную, передав актуальную цену продукта из кэша. К сожалению вы не сможете этого сделать.

  • Нельзя выполнять дальнейшие действия над результатом Future без блокирования.

Также в Future нельзя повесить функцию-колбэк, чтобы она срабатывала автоматически, как только станет доступен результат.

  • Невозможно выполнить множество Future один за другим.

Такой алгоритм асинхронной работы невозможен при использовании Future.

  • Невозможно объединить несколько Future.

Именно по этому Spring Framework 4 представил ListenableFuture — это Future реализация, которая добавляет неблокирующие возможности на основе обратного вызова.

У Guava есть интерфейс ListenableFuture, который является будущим, к которому вы можете прикрепить обратный вызов, который будет вызываться, когда будет доступен результат, так что вам не придется вызывать get() и создавать блок потока, пока результат не станет доступен.

С ListenableFuture вы можете зарегистрировать callback так:

ListenableFuture listenable = service.submit(...);
Futures.addCallback(listenable, new FutureCallback<Object>() {
@Override
public void onSuccess(Object o) {
//handle on success
}

@Override
public void onFailure(Throwable throwable) {
//handle on failure
}
})

Технически ListenableFuture расширяет интерфейс Future, добавляя простые:

void addListener (Runnable listener, Executor executor);

Потом в java 8 появились CompletableFuture и лямбды и облегчили жизнь многим разработчикам.
CompletableFuture позволяет иметь дело с будущим неблокирующим образом обеспечивая возможности для цепочки отложенной обработки результатов.

CompletableFuture используется для асинхронного программирования в Java. Асинхронное программирование — это средство написания неблокирующего кода путём выполнения задачи в отдельном, отличном от главного, потоке, а также уведомление главного потока о ходе выполнения, завершении или сбое.

Таким образом, основной поток не блокируется и не ждёт завершения задачи, а значит может параллельно выполнять и другие задания.

CompletableFuture реализует интерфейс Future, то есть наши task будут выполнены в будущем, и мы сможем выполнить get() и получить результат. Но ещё он реализует CompletionStage.

CompletableFuture запускает цепочку на выполнение сразу, не дожидаясь того, что у него попросят посчитанное значение, в отличие от Stream API, где при создании стрима он не запускается сразу, а ждёт, когда из него захотят значение.

Надо помнить, что CompletalbeFuture в своей работе использует Runnable, Consumer и Function.

Класс CompletableFuture является строго реализацией CompletionStage, имеющей дело с асинхронными вычислениями, которые завершатся и предоставят значение в некоторое неуказанное время.

С CompletableFuture вы также можете зарегистрировать callback, когда задача завершена, но она отличается от ListenableFuture тем, что она может быть завершена из любого потока, который хочет ее выполнить:

CompletableFuture completableFuture = new CompletableFuture();
completableFuture.whenComplete(new BiConsumer() {
@Override
public void accept(Object o, Object o2) {
//handle complete
}
}); // complete the task
completableFuture.complete(new Object())

Для того, чтобы асинхронно выполнить некоторую фоновую задачу, которая не возвращает, результат, можно использовать метод CompletableFuture.runAsync(). Он принимает объект Runnable и возвращает CompletableFuture<Void>.

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("Work is done in a separate thread");
});

Для выполнения асинхронной задачи и возврата результата стоит использовать CompletableFuture.supplyAsync(). Он принимает Supplier<T> и возвращает CompletableFuture<T>, где T это тип возвращаемого функцией-поставщиком значения:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of an Asynchronous Task";
});
String result = future.get();
System.out.println(result);

runAsync() и supplyAsync() выполняются в отдельном потоке.

CompletableFuture выполняет эти задачи в потоке, полученном из глобального ForkJoinPool.commonPool().

Вы можете повесить callback на CompletableFuture, используя методы thenApply(), thenAccept() и thenRun().

  • thenApply() служит для обработки и преобразования результата CompletableFuture при его поступлении. В качестве аргумента он принимает Function<T, R>.
CompletableFuture<String> future= CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Hello";
});

CompletableFuture<String> greetingFuture = future.thenApply(name -> {
return name + "World!";
});


System.out.println(greetingFuture.get()); // Hello, World!
  • thenAccept() и thenRun()

Если вы не хотите возвращать результат, а хотите просто выполнить часть кода после завершения Future, можете воспользоваться методами thenAccept() и thenRun(). Эти методы являются потребителями и часто используются в качестве завершающего метода в цепочке.

CompletableFuture.supplyAsync(() -> {
return StudentService.getCurrentStudents(studentId);
}).thenAccept(student -> {
System.out.println("Get all info about current srudent" + student.getFirstName())
});

Мы также можем объеденить несколько CompletableFuture, обрабатывать исключения, использовать асинхронные callback и другие возможности.

Но это все не предназначено для работы с задержкой, такой как операции ввода-вывода. И именно здесь появляются такие Reactive API, как Reactor или RxJava.

Реактивные API, такие как Reactor, предназначены для обработки как синхронных, так и асинхронных операций и позволяют буферизовать, объединять или применять широкий спектр преобразований к вашим данным.
Изначально API Reactive были разработаны только для работы с потоками данных типа Flux.
Но со временем был представлен и поток Mono.

Mono является реактивным эквивалентом CompletableFuture типа.

Чуть дальше мы более детально разберем Mono и Flux.

Flux и Mono реализуют Publisher интерфейс из спецификации Reactive Streams.

Оба класса соответствуют спецификации, и мы могли бы использовать этот интерфейс вместо них:

Publisher<String> just = Mono.just("one");

Но на самом деле это было сделано потому что некоторые операции имеют смысл только для одного из двух типов.

Основной задачей Reactive Streams является обработка backpressure. Я не стал переводить это слово, дабы не допустить ошибки в понимании.
Backpressure — это механизм, который позволяет получателю спрашивать, сколько данных он хочет получить.
Т.е. получатель начинает получать данные только тогда, когда он готов их обработать.

Основным артефактом проекта Reactor является reactor-core реактивная библиотека, которая фокусируется на спецификации Reactive Streams и ориентирована на Java 8+

Принцип работы

В официальной документации Reactor сравнивается с конвейером.

Publisher выдаёт какие-то данные (материалы). Данные идут по цепочке из операторов (конвейерной ленте), обрабатываются, в конце получается готовый продукт, который передаётся в нужный Consumer/Subscriber и употребляется уже там.

Оператор — это некий Publisher, который помимо какой-то своей логики содержит ссылку на исходный Publisher, к которому применяется. Вызовы операторов создают цепочку из Publisher.

Реактивное программирование возникло из-за желания писать асинхронный неблокируемый код в читаемом виде. Ни код, написанный на колбэках, ни код, написанный с помощью CompletableFuture, не может быть настолько удобочитаемым, как этого можно добиться при помощи реактивности.

В основе подхода лежит идея разделения компонентов на 2 типа: источник событий (Publisher) и обработчик событий (Subscriber).
Subscriber подписывается на события, которые создаёт Publisher, а затем каким-то образом их обрабатывает. По сути это паттерн Observer с надстроенными поверх возможностями и особенностями.

Существует еще одно понятие — Observer.
Он может подписаться на событие объекта и выполнять какие либо действия с полученным результатом.
У одного Subject может быть много подписчиков.

Общение между Publisher и Subscriber происходит через объект Subscription.

Subcriber может регулировать скорость поставки сообщений от Publisher (т.к. backpressure), а также отменять подписку.
Publisher’ы можно объединять в цепочки и комбинировать разными способами.

Интерфейсы Publisher, Subscriber и Subscription находятся в пакете org.reactivestreams, который по умолчанию был добавлен в Java 9.

Они задают спецификацию для собственной реализации реактивных потоков.
Библиотека Project Reactor является такой реализацией.

Небольшое вступление

В Reactor есть два типа Publisher: Flux<T> и Mono<T>.
О них чуть ниже.

Как только происходит публикация ивента, Subscriber его сразу же и получает.

Как можно получить поток ?

Разными способами, например

Flux flux1 = Flux.just(“foo”, “bar”, “foobar”);
Flux flux2 = Flux.fromIterable(Arrays.asList(“A”, “B”, “C”));
Flux flux3 = Flux.range(5, 3);

На самом деле, при вызове нового каждого оператора в цепочке создаётся новый Publisher, который добавляется к цепочке.
Например:

Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.limitRequest(5)
.skip(3)
.subscribe(value -> System.out.println("Value: " + value));

В качестве аналогии можно привести конвейер, по которому перемещаются элементы. Операторы же по очереди проводят с ними манипуляции.
Вначале мы ограничили лимит = 5, пропустили 3 элемента и подписались на наш поток, чтобы получить данные.
Пока все просто.

Следующие элементы в цепочке вызываются под капотом при помощи метода onNext().

Что касается исключений, так они являются терминальными. Это значит что поток сразу завершает свою работу.

Вы можете вернуть значение по умолчанию в случае ошибки выполнения метода при помощи onErrorReturn().

Никаких дополнительных потоков по умолчанию не создаётся при использовании реактивного подхода. Всё работает в потоке подписки: в каком потоке мы подписываемся на Publisher, в том все элементы и будут обрабатываться.

Спецификация реактивных потоков запрещает null значения в последовательности.

Ну а теперь разберемся чуть более подробно.

Что такое WebFlux ?

Spring 5 представил платформу WebFlux, которая представляет собой полностью асинхронный и неблокирующий реактивный веб-стек, который позволяет обрабатывать огромное количество одновременных соединений.
Модуль WebFlux является альтернативой Spring MVC и представляет собой реактивный подход для написания веб-сервисов.

WebFlux позиционирует себя как микрофреймворк.
Этот новый микрофреймворк поддерживает аннотированные контроллеры, функциональные конечные точки, WebClient (аналог RestTemplate в Spring Web MVC), WebSockets и многое другое.

В основе WebFlux лежит библиотека Reactor.

Вам нужен Spring Boot версии 2+ для использования модуля Spring WebFlux.

WebFlux по умолчанию использует Netty. Tomcat не поддерживает реактивщину.

Пару слов о Netty

Он предназначен для неблокирующего ввода-вывода.

На входе у него есть один поток, который работает в бесконечном цикле. Благодаря селектору и канальному механизму, он перенаправляет данные из входящих запросов во входящие буферы и делегирует обработку этих запросов выделенному пулу потоков асинхронных потоков. И также в обратном направлении.

В чем отличие Spring WebFlux от RxJava ?

Spring WebFlux и RxJava2+ являются реализациями реактивных потоков.

Т.е. теперь начинает складываться более детальная картина:
Project Reactor — это библиотека, которая лежит в основе WebFlux.
Она исправляет недостатки в RxJava и больше подходит для бэкэнд-разработки. RxJava имеет некоторые проблемы, которые могут вызвать нехватку памяти, например (взято из официальных источников).

Как сказал David Karnok

Use Reactor 3 if you are allowed to use Java 8+, use RxJava 2 if you are stuck on Java 6+ or need your functions to throw checked exceptions.

Если можно сказать простыми словами, то “Реактивное программирование” — это программирование с асинхронными потоками (streams) данных.

Т.е. можно слушать поток и реагировать на события в нем. Можем фильтровать поток как нам вздумается, объединять потоки, кроме того потоки могут быть входными параметрами других потоков.
Даже множественный поток может быть использован как входной аргумент другого потока. Вы можете объединять несколько потоков. Вы можете фильтровать один поток, чтобы потом получить другой, который содержит только актуальные данные. Вы можете объединять данные с одного потока с данными другого, чтобы получить еще один.

Поток — это некая последовательность, состоящая из постоянных событий, отсортированных по времени. В нем может быть три типа сообщений: значения (данные некоторого типа), ошибки и сигнал о завершении работы.
Нам лишь надо подписаться на поток.
Наблюдатель (observers) подписывается на поток.

Данные же будут получены тогда, когда они будут готовы — произойти это может в этом же самом либо другом потоке. Publisher их сам отдаст, когда они придут. Это push-модель. Отдача готового элемента — это event. Реактивная модель основана на событиях (event-driven).

Что такое Back-pressure ?

Мы возвращаем не объект, а “обещание” объекта — Publisher, который будет отдавать объекты, как только они появятся. Отдавать мы их будем Subscriber-у — тому, кто подписывается на Publisher.
Подписчик может быть как один, так и много. Subscriber и получает объекты. Причем подписчик может регулировать скорость потока, это и называется Back-pressure.

Как уже говорилось выше, основная концепция реактивного программирования — это неблокирующий ввод/вывод.

Слушать поток означает подписаться на него. Т.е. функции, которые мы определили — это наблюдатели (Observers). А поток является субъектом, который наблюдают. Этот подход называется Observer Design Pattern.

Если у нас есть Publisher, который отправляет события потребителю быстрее, чем он может их обработать, то, в конце концов, потребитель будет перегружен событиями, которые истощают системные ресурсы. Backpressure означает, что наш клиент должен иметь возможность сообщить производителю, сколько данных отправлять, чтобы предотвратить это, и это указано в самой спецификации.

Попробуем реализовать этот механизм следующим образом.

Давайте скажем восходящему потоку отправлять только два элемента одновременно, используя request()

List<Integer> elements = new ArrayList<>(); 

Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;

@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}

@Override
public void onNext(Integer integer) {
elements.add(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {}
});

Получим:

16:26:39.915 [main] INFO reactor.Flux.Array.1 — | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
16:26:39.917 [main] INFO reactor.Flux.Array.1 — | request(2)
16:26:39.917 [main] INFO reactor.Flux.Array.1 — | onNext(1)
16:26:39.917 [main] INFO reactor.Flux.Array.1 — | onNext(2)
16:26:39.917 [main] INFO reactor.Flux.Array.1 — | request(2)
16:26:39.917 [main] INFO reactor.Flux.Array.1 — | onNext(3)
16:26:39.917 [main] INFO reactor.Flux.Array.1 — | onNext(4)
16:26:39.917 [main] INFO reactor.Flux.Array.1 — | request(2)
16:26:39.918 [main] INFO reactor.Flux.Array.1 — | onComplete()

По сути, это реактивное baclpressure. Мы просим стрим подтолкнуть нам только определенное количество элементов, и только тогда, когда когда мы будем готовы.

Зачем мне вообще использовать RxJava или Reactor, если то же самое можно сделать с Streams, CompletableFutures и Optionals?

Проблема, по сути, заключается в том, что большую часть времени вы имеете дело с простыми задачами и вам действительно не нужны эти библиотеки. Но когда все усложняется, вы должны писать некрасивый кусок кода. Затем этот кусок кода становится все более сложным и сложным в поддержке.

RxJava и Reactor имеют множество удобных функций, которые будут удовлетворять ваши потребности на долгие годы.

Есть два варианта получить данные

- Pull

Это когда мы сами делаем запрос на получение и нам приходит ответ.

- Push

Когда данные сами нас уведомляют об изменениях и система “выталкивает” их нам.

Реактивное приложение, это когда приложение само извещает нас об изменении своего состояния. Не мы делаем запрос и проверяем, а не изменилось ли там что-то, а приложение само нам сигнализирует. Ну и конечно эти события и эти сигналы мы можем обрабатывать как нам вздумается.

Pull — коллекция (аналог — массив): в ней есть данные, которые мы можем получить по запросу, предварительно обработав их как нам хочется.

Push — полная противоположность: изначально в ней нет данных, но как только они появятся, она нам сообщит об этом. Во время этого мы также можем делать с ней что хотим и как только в коллекции появятся значения, она выполнит все наши фильтры (которые мы на нее навесили) и выдаст нам результат.

Push коллекция как-бы “состоит” из новой сущности Observable.

Это и есть коллекция, которая будет рассылать уведомления об изменении своего состояния.
Для реализации этого похода в реактивном контексте существует такое понятие как Callback — это объект или несколько объектов, которые “отслеживают” необходимые события, происходящие с обслуживаемыми объектами, и либо сообщают об этих событиях другим слушателям, либо отдают объекты, с которыми произошли это события, асинхронным потоком для обработки.

Спецификация для реактивного подхода Spring WebFlux:

ссылка

Основные концепции

В новом подходе у нас есть два основных класса для работы в реактивном режиме:

- Mono

Класс Mono нужен для работы с единственным объектом.
Mono также может использоваться как какая-то асинхронная задача в стиле “выполнил и забыл”, без возвращаемого результата (очень похож на Runnable).

  • Flux

Данный класс схож с Mono, но предоставляет возможность асинхронной работы со множеством объектов.
Flux — это Publisher, способный выпустить от 0 до N событий (элементов), в том числе и бесконечное их число.

Flux и Mono реализуют Publisher интерфейс из спецификации Reactive Streams.

Разделение на Flux и Mono помогает улучшить семантику реактивного API, делая его достаточно выразительным.

Flux и Mono — lazy.
Для того чтобы запустить какую-то обработку и воспользоваться данными, лежащими в Mono и Flux, нужно на них подписаться с помощью subscribe().
Методы subscribe() используют “лямбда-выражения” из Java 8 в качестве параметров.

Способы подписаться

  1. Подписаться (слушать поток):

subscribe();

2) Сделать что-то с каждым полученным значением

subscribe(Consumer<? super T> consumer);

3) Сделать что-то в случае исключения

subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);

4) Сделать что-то по завершению

subscribe(
Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer
);

Данные передаются следующим образом

  1. Вызывается метод subscribe()
    2) Затем создается Subscription объект
    3) После этого Subscriber вызовет request() метод в Subscription классе, чтобы указать количество объектов, которые он может обработать (если этот метод не вызывается явно, запрашивается неограниченное количество объектов).
    4) Subscriber может получать объекты с помощью метода onNext().
    Если подписчик получает все запрошенные им объекты, он может запросить дополнительные объекты или отменить подписку, вызвав onComplete(). Если в какой-то момент возникает ошибка, издатель вызывает метод onError() на подписчике.
Flux.just(1, 2, 3, 4)
.subscribe(System.out::println);

Данные не начнут поступать, пока мы не подпишемся — метод subscribe().

Когда мы вызываем подписку, под капотом вызывается Subscription, который запрашивает элементы из потока (это означает, что он запрашивает каждый доступный элемент).

Этот поток описывается в интерфейсе Subscriber как часть спецификации реактивных потоков, и фактически это то, что было реализовано за кулисами в нашем вызове метода onSubscribe().

В интерфейсе Subscriber 4 метода:

- onSubscribe()
- onNext()
- onComplete()
- onError()

Мы можем записать это по-другому

Flux.just(1, 2, 3, 4)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX__VALUE);
}

@Override
public void onNext(Integer integer) {
elements.add(integer);
}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {}
});

Давайте глянем на следующий пример

List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.subscribe(elements::add);

Мы получим следующий результат

20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()

onSubscribe() —вызывается когда мы подписываемся на поток

request(unbounded) — вызывается когда мы вызываем подписку (subscribe), за кулисами мы создаем Subscription. Эта подписка запрашивает элементы из потока. В этом случае он запрашивает каждый доступный элемент.

onNext() — вызывается на каждом элементе

onComplete() — вызывается последним, после получения последнего элемента.

Основные приемущества

Реактивность дает слабую связанность.

В некоторых случаях это дает возможность писать более простой и понятный код.

Например мы можем взять обычную коллекцию, преобразовать ее к реактивной коллекции и тогда мы будем иметь коллекцию событий об изменении данных в ней. Мы очень просто получаем только те данные, которые изменились. По этой коллекции мы можем делать выборку, фильтровать и т.д.

Если бы мы это делали обычным традиционным способом, то нам нужно было бы получить данные, закэшировать их, сделать запрос на получение новых данных,сравнить их с текущим значением.

Чтобы получить данные, мы обращаемся к базе данных, получаем данные и отдаем пользователю. Если у вас в этот момент времени на секунду пропадет интернет то вы получите ошибку. Беда.

Реактивщина нам поможет в данном случае.

Аналог для примера: Gmail, Facebook, Instagram, Twitter. Когда у вас плохой интернет, вы не получаете ошибку, а просто ждете результат немного дольше.

Вы едете в метро, обновили ленту в instagram, у у вас пропал интернет, спустя минуту он у вас появился и вам не надо будет обновлять пальцем сверху вниз (сделать еще один запрос на получение данных), система вам отдаст эти данные сама.

Обязанности:
- Приложение должно отдавать пользователю результат за полсекунды
- Обеспечить отзывчивость под нагрузкой
- Система остается в рабочем состоянии даже, если один из компонентов отказал.
- Система должна занимать оптимальное количество ресурсов в каждый промежуток времени.

Общение между сервисами должно происходить через асинхронные сообщения. Это значит, что каждый элемент системы запрашивает информацию из другого элемента, но не ожидает получение результата сразу же. Вместо этого он продолжает выполнять свои задачи.

Когда стоит использовать ?

Реактивщину стоит использовать, когда есть поток событий, растянутый во времени (например, пользовательский ввод).

Функционал, написанный при помощи реактивных потоков, может быть легко дополнен и расширен.

Но самое важное, это то, что не надо использовать реактивщину везде где только попало)

Реактивщина + Реляционные БД

Многие в интернете говорят, что транзакционная БД не подходит для реактивной концепции. Концепция транзакции не совсем соответствует реактивному миру, так как она связана с блокировкой ресурса, а это именно то, чего стараются избежать, используя реактивность.

Например если вы используете реляционную БД без поставляемого реактивного драйвера, то все запросы к этой базе всё равно будут блокируемыми, так что никакой выгоды от использования реактивного стека к этим запросам здесь не будет, как бы этого не хотелось.

На данный момент предпринимаются первые шаги в реализации реактивного подхода для реляционных баз данных, хотя готовых решений ещё не существует.

Реактивный клиент для SQL DB:
ссылка

Некоторые примеры с Mono и Flux

Я хотел бы привести несколько примеров для полноты понимания работы потоков Mono и Flux.

После того, как Flux/Mono отправил какие-то данные, подписка на эти события происходит просто: вызываем метод subscribe().
Также мы можем передать туда лямбду, которая будет вызываться для каждого элемента в потоке

Flux.just("1", "2", "3", "4")
.subscribe(value -> System.out.println("Value: " + value));

В случае ошибки вызовется метод onError().
Этот обработчик можно задать вторым параметром subscribe() метода

Flux.just(1, 2, 3, 4, 5)
.subscribe(value -> {
if (value > 4) {
throw new IllegalArgumentException(value + " > than 4");
}
System.out.println("Value: " + value);
}, error -> System.out.println("Error: " + error.getMessage()));
илиFlux<Integer> ints = Flux.range(1, 4)
.map(i -> {
if (i <= 3) return i;
throw new RuntimeException("Got to 4");
});
ints.subscribe(System.out::println,
error -> System.err.println("Error: " + error));

В случае успешного завершения выполнится обработчик onComplete().
Его можно задать третьим параметром

Flux.just(1, 2, 3, 4)
.subscribe(value -> System.out.println("Value: " + value),
error -> {},
() -> System.out.println("Successfull"));

Четвертый вариант работы с subscribe() методом, это Consumer<Subscription>. Этот вариант требует, чтобы вы что-то сделали с Subscription (выполнить request(long) на нем или cancel()), иначе Flux просто повиснет. Определяем его как четвертый параметр.
Здесь мы говорим, что мы хотим до 10 элементов из источника (который на самом деле испустит 4 элемента и завершится).

Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(System.out::println,
error -> System.err.println("Error " + error),
() -> System.out.println("Done"),
sub -> sub.request(10));

Экземпляры типа Mono и Flux можно “преобразовывать” друг в друга.

Например Flux<T>.collectList() вернет Mono<List<T>>, а Mono<T>.flux() — вернет Flux<T>.

А метод block() — блокируем пока не будет получен следующий сигнал или не истечет время ожидания (метод перегружен: public T block(Duration timeout))

Mono<List<Integer>> listMono = Flux.just(1, 2, 3, 4)
.filter(value -> value % 2 == 0)
.collectList();
System.out.println(listMono.block());

Вы можете отменить подписку с помощью объекта Disposable.
Метод onDispose() может использоваться для отмены подписки после завершения Flux или ошибок или очистки.
В результате получите: 1, 2

Disposable disposable = Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.delayElements(Duration.ofSeconds(3))
.subscribe(value -> System.out.println("Value: " + value));

Thread.sleep(7000);
disposable.dispose();
System.out.println("Cancelling subscription");

Документация говорит следующее:

These variants [of operators] return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel and clean-up behavior is represented in Reactor by the general-purpose Disposable interface.

Вы также можете отменить подписку с помощью onCancel().
Метод onCancel() может использоваться для выполнения любых действий, специфичных для отмены до того как работает onDispose().

Под капотом вызывается Subscription.cancel().

Метод onCancel() вызывается первым — можно использовать для выполнения любых действий, относящихся к отмене до работы onDispose().

Метод onDispose() можно использовать для выполнения очистки, когда Flux завершает работу, выдает ошибки или отменяется.

Вы можете создать свой Flux

- при помощи метода .generate()
- при помощи метода .create()
- при помощи just()
- justOrEmpty — выводит Mono<>, если элемент не null, иначе сигнал завершения
- fromArray
- fromIterable
- range
- fromStream
- fromSupplier
- fromRunnable
- fromFuture
- из стороннего Publisher

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);Mono<String> noData = Mono.empty();  //  пустой MonoMono<String> data = Mono.just("foo");Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);  //  Первый параметр - это начало диапазона, а второй параметр - количество элементов, которые нужно произвести.

Flux<Integer> ints = Flux.range(1, 3);
ints.subscribe(); // подписка на поток

или

Flux<Integer> ints = Flux.range(1, 3);
ints.subscribe(i -> System.out.println(i));  //  будет выводить значения в консоль. Получим: 1, 2, 3

Publisher<String> publisher = redisson.getKeys().getKeys();
Flux<String> from = Flux.from(publisher);

Что такое Reactor IPC ?

Reactor IPC — это расширение Reactor, которое позволяет интегрироваться с различными платформами и системами, поддерживающими неблокирующий ввод / вывод (например, Netty, Kafka).

Когда запрос поступает в Netty, он немедленно обрабатывается с использованием класса ChannelOperations . Далее вызывается цепочка вызовов и, наконец, запрос достигает DispatcherHandler (он направляет входящие запросы другим контроллерам).

Затем запрос достигает контроллера.

Дело в том, что по умолчанию все Mono и Flux холодные (ниже мы рассмотрим более подробно, что это означает).

Затем на основе Publisher начал выстраиваться поток, достигший класса ChannelOperations, и только здесь, в ChannelOperations, был вызван метод subscribe(), и только в этот момент этот поток начал работать (начали происходить HTTP-вызовы).

Спасибо что дочитали до конца.

Продолжение во второй статье.

Если вы нашли неточности в описании данной статьи, вы можете написать мне на email и я с радостью вам отвечу.

Kirill Sereda

email: kirill.serada@gmail.com

skype: kirill-sereda

linkedin: www.linkedin.com/in/ksereda

--

--