Reactive Programming: Reactor и Spring WebFlux — часть 2

Kirill Sereda
20 min readFeb 29, 2020

--

Это продолжение первой статьи про реактивную систему Spring 5.

Здесь представлены далеко не все методы, но на мой взгляд одни из самых часто встречаемых

  • map — преобразует элементы, испускаемые этим потоком, применяя синхронную функцию к каждому элементу.
Flux.just(1, 2, 3)
.map(s -> s + 1)
.subscribe(System.out::println);

Output:
2
3
4

Такие операции, как map или flatMap, не работают с результатом Mono, они создают новый Mono, который добавляет другое преобразование к выполнению исходного Mono.

  • flatMap — асинхронно преобразует элементы, испускаемые этим потоком, в Publisher-ы, а затем объединяет эти внутренние Publisher-ы в единый поток посредством слияния

flatMap не гарантирует порядок элементов!

пример 1:

Flux.just("1,2,3", "4,5,6")
.flatMap(i -> Flux.fromIterable(Arrays.asList(i.split(","))))
.collect(Collectors.toList())
.subscribe(System.out::println);

Output:
[1, 2, 3, 4, 5, 6]

пример 2:

Flux.range(1, 10)
.flatMap(v -> {
if (v < 5) {
return Flux.just(v * v);
}
return Flux.<Integer>error(new IOException("Error: "));
})
.subscribe(System.out::println, Throwable::printStackTrace);
Output:
1
4
9
16
java.io.IOException: Error:

пример 3:

List<String> list = Arrays.asList("a", "b", "c", "d", "e", "f");

Flux.fromIterable(list)
.flatMap( s -> {
return Flux.just(s + "x");
})
.collect(Collectors.toList())
.subscribe(System.out::println);

Output:
[ax, bx, cx, dx, ex, fx]

пример 4:

Flux.just("Hello", "world")
.flatMap(s -> Flux.fromArray(s.split("")))
.subscribe(System.out::println);

Output:
H
e
l
l
o
w
o
r
l
d
  • concatMap — работает почти так же, как flatMap, но сохраняет порядок элементов.

Например: сделать что-то для каждого элемента в отсортированном списке: FlatMap или switchMap не должны использоваться в этом случае. Из-за синхронных вызовов в concatMap необходимо учитывать увеличение времени обработки (что не всегда благоприятно).

Разница между flatMap(), concatMap() и switchMap():

System.out.println("Using flatMap():");
Flux.range(1, 15)
.flatMap(item -> Flux.just(item).delayElements(ofMillis(1)))
.subscribe(x -> System.out.print(x + " "));

Thread.sleep(100);

System.out.println("\n\nUsing concatMap():");
Flux.range(1, 15)
.concatMap(item -> Flux.just(item).delayElements(ofMillis(1)))
.subscribe(x -> System.out.print(x + " "));

Thread.sleep(100);
System.out.println("\n\nUsing switchMap():");
Flux.range(1, 15)
.switchMap(item -> Flux.just(item).delayElements(ofMillis(1)))
.subscribe(x -> System.out.print(x + " "));

Thread.sleep(100);
Output:Using flatMap():
2 3 4 5 6 1 7 8 9 10 11 13 14 12 15
Using concatMap():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Using switchMap():
15

Как вы видите основное отличие в порядке следования элементов!

flatMap: не сохраняет порядок элементов, работает асинхронно

switchMap: отписаться от предыдущей подписки после выпуска новой

concatMap: сохранение порядка элементов, работает синхронно

  • switchMap — он похож на flatMap, за исключением того, что он сохраняет результат только последней подписки, отбрасывая предыдущие.

Пример: обработка новых данных (например, список сообщений в ленте). Предположим, что где-то в приложении есть Observable, который периодически генерирует список объектов. Это может быть список сообщений на временной шкале, который обновляется каждый раз, когда пользователь взаимодействует с ним. В этом случае лучшим оператором для использования будет switchMap, потому что нас не волнует предыдущий результат, если у нас есть новый набор элементов. От него можно отказаться и подписаться на новейшие данные. Это может сэкономить нам время, если пропустить обработку предыдущего (старого) ответа.

Или поиск предметов по запросу. Предположим, что пользователь вводит буквы: «t», затем «h». Весь запрос теперь “th”, поэтому нет необходимости подписываться на результаты с буквой “t”. В этом случае мы можем смело использовать switchMap.

Flux.range(1, 15)
.switchMap(item -> Flux.just(item).delayElements(ofMillis(1)))
.subscribe(x -> System.out.print(x + " "));

Thread.sleep(100);


Output:
15
  • startWith — добавляет элементы перед последовательностью.

Пример 1:

Flux.range(1, 3)
.startWith(Flux.just(9, 8, 7))
.subscribe(System.out::println);

Output:
9
8
7
1
2
3

Пример 2:

Flux.range(1, 3)
.startWith(Mono.just(90))
.subscribe(System.out::println);

Output:
90
1
2
3
  • collectList -преобразование Flux в Mono (например Flux<T>.collectList() вернет Mono<List<T>>)
Flux.range(1, 3)
.collectList()
.subscribe(System.out::println);


Output:
[1, 2, 3]
  • flux — преобразование Mono в Flux (например Mono<T>.flux() — вернет Flux<T>)
Mono.just(1)
.flux()
.subscribe(System.out::println);
  • collectSortedList — преобразование потока в Mono (собирает все элементы, испускаемые этим потоком, пока эта последовательность не завершится, а затем сортирует их в естественном порядке в список)
Flux.just(1, 4, 3, 5, 2)
.collectSortedList()
.subscribe(System.out::println);

Output:
[1, 2, 3, 4, 5]
  • collectMap — результатом операции collectMap является Mono, содержащий Map, отправленные входящим потоком.

Пример 1:

Flux.just(1, 2, 3)
.collectMap(a -> "key: " + a, b -> " value: " + b)
.subscribe(System.out::println);

Output:
{key: 2= value: 2, key: 1= value: 1, key: 3= value: 3}

Пример 2:

Flux.just("one", "three", "four")
.collectMap(a -> a.charAt(0), b -> b)
.subscribe(System.out::println);


Output:
{t=three, f=four, o=one}
  • collectMultiMap — соберает все элементы, испускаемые этим потоком, в MultiMap. Ключи для разных значений объединяются.
Flux.just("one", "two", "three", "four", "five")
.collectMultimap(a -> a.charAt(0), b -> b)
.subscribe(System.out::println);


Output:
{t=[two, three], f=[four, five], o=[one]}
  • collect — соберает все элементы, испускаемые этим потоком, в контейнер, применяя Java 8 Stream API Collector. Собранный результат будет выпущен, когда эта последовательность завершится.
Flux.just("one", "two", "three", "four", "five")
.map(s -> s + "_new")
.collect(Collectors.toList())
.subscribe(System.out::println);


Output:
[one_new, two_new, three_new, four_new, five_new]
  • reduce — позволяет конвертировать все элементы потока в один объект. Также позволяет выполнять агрегатные функции для всей коллекции (например, значения SUM, MIN и MAX)

Пример 1:

Flux.just(4, 5, 6)
.reduce(Integer::sum)
.subscribe(System.out::println);


Output:
15

Пример 2:

Flux.just(4, 5, 6)
// .reduce((s1, s2) -> s1 > s2 ? s1 : s2)
.reduce(Integer::max)
.subscribe(System.out::println);

Output:
6

Пример 3: вернет последний элемент

Flux.just(4, 5, 6)
.reduce((s1, s2) -> s2)
.subscribe(System.out::println);


Output:
6

Пример 4: найти самого старшего мужчину

public class People {

private String name;
private Integer age;
private Sex sex;

enum Sex {
MAN, WOMEN

}

public People(String name, Integer age, Sex sex) {
this.name = name;
this.age = age;
this.sex = sex;
}

// get/set
}

public class TestWork2 {
public static void main(String[] args) {

Collection<People> peoples = Arrays.asList(
new People("Vasya", 16, People.Sex.MAN),
new People("Petr", 23, People.Sex.MAN),
new People("Elena", 42, People.Sex.WOMEN),
new People("Ivan", 69, People.Sex.MAN)
);

Flux.fromIterable(peoples)
.filter((s) -> s.getSex() == People.Sex.MAN)
.map(People::getAge)
.reduce(Integer::max)
.subscribe(System.out::println);

}
}


Output:
69

Пример 5: найти минимальный возрас человека, им которого содержит букву “e”

public class TestWork2 {
public static void main(String[] args) {

Collection<People> peoples = Arrays.asList(
new People("Vasya", 16, People.Sex.MAN),
new People("Petr", 23, People.Sex.MAN),
new People("Elena", 42, People.Sex.WOMEN),
new People("Ivan", 69, People.Sex.MAN)
);

Flux.fromIterable(peoples)
.filter((s) -> s.getName().contains("e"))
.map(People::getAge)
.reduce(Integer::min)
.subscribe(System.out::println);

}
}


Output:
23
  • runOn

Reactor помогает распараллелить работу, предоставляя специальный тип ParallelFlux, который предоставляет операторы, оптимизированные для параллельной работы.

Чтобы получить ParallelFlux, вы можете использовать оператор parallel() на любом Flux. Сам по себе этот метод не распараллеливает работу. Скорее, он делит рабочую нагрузку на так называемые «rails» (по умолчанию столько, сколько имеется ядер в вашем ЦП).

Чтобы сообщить результирующему ParallelFlux, где запускать каждую направляющую (и, соответственно, запускать направляющие параллельно), вы должны использовать runOn (планировщик). Обратите внимание, что для параллельной работы рекомендуется использовать специальный планировщик Schedulers.parallel().

Если после параллельной обработки вашей последовательности вы захотите вернуться к «обычному» потоку и последовательно применить остальную часть цепочки операторов, вы можете использовать метод sequential() в ParallelFlux.

Пример 1:

Flux.range(1, 10)
.parallel(2)
.subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

Output:
main -> 1
main -> 2
main -> 3
main -> 4
main -> 5
main -> 6
main -> 7
main -> 8
main -> 9
main -> 10

Пример 2:

Flux.range(1, 10)
.parallel(2)
.runOn(Schedulers.parallel())
.subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

Output:
parallel-1 -> 1
parallel-1 -> 3
parallel-1 -> 5
parallel-2 -> 2
parallel-1 -> 7
  • zip — объединение данных из двух Publisher-ов
Flux.just("a", "b", "c")
.zipWith(Flux.just(1, 2, 3), (word, number) -> word + number)
.subscribe(System.out::println);

Output:
a1
b2
c3

Пример 2: у нас етсь 3 потока и мы хотим получить один

Flux<Users> usersFlux(Flux<String> firstName, Flux<String> lastName, Flux<String> email) {
return Flux.zip(firstName, lastName, email)
.map(t->new Users(t.getT1(),t.getT2(),t.getT3()));
}
  • concat — применяется если вам нужно объединить потоки так, чтобы сначала шли элементы из первого потока, а затем из второго, независимо от того, как они поступают в режиме реального времени.

concat() и concatWith() — эквивалентны.

Flux<Integer> oddFlux = Flux.just(1, 3);
Flux<Integer> evenFlux = Flux.just(2, 4);

Flux.concat(evenFlux, oddFlux)
.subscribe(value -> System.out.println("Outer: " + value));


Output:
Outer: 2
Outer: 4
Outer: 1
Outer: 3
  • merge — объединение элементов двух Publisher-ов в один

merge() и mergeWith() — эквивалентны.

Flux<Integer> oddFlux = Flux.just(1, 3);
Flux<Integer> evenFlux = Flux.just(2, 4);

oddFlux.mergeWith(evenFlux)
.subscribe(value -> System.out.println("Outer: " + value));


Output:

Outer: 1
Outer: 3
Outer: 2
Outer: 4
  • transform — позволяет вам инкапсулировать часть оператора в функцию. Эта функция применяется к исходной цепочке операторов во время сборки, чтобы дополнить ее некоторыми инкапсулированными операторами.

Те. мы берем отдельный метод фильтра и map в отдельную функцию, а затем инкапсулируем ее в нашу логику.

Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
.map(String::toUpperCase);

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.transform(filterAndMap)
.subscribe(d -> System.out.println("MapAndFilter for: " + d));
  • hasElements — выдает логическое значение true, если эта последовательность Mono/Flux имеет хотя бы один элемент в последовательности (аналог any в Stream API)
Flux<Integer> oddFlux = Flux.just(1, 3);

oddFlux.hasElements()
.subscribe(value -> System.out.println("true/false: " + value));


Output:
true/false: true
  • hasElement — выдает логическое значение true, если какой-либо из элементов этой последовательности Mono/Flux равен предоставленному значению
Flux<Integer> oddFlux = Flux.just(1, 3);

oddFlux.hasElement(1)
.subscribe(value -> System.out.println("true/false: " + value));


Output:
true/false: true
  • groupBy — разделяет последовательность на динамически создаваемый поток (или группы) для каждого уникального ключа этой последовательности

Пример 1:

Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.groupBy(i -> i % 2 == 0 ? "even:" : "odd:")
.concatMap(Flux::collectList)
.subscribe(System.out::println);

Output:
[1, 3, 5, 11, 13]
[2, 4, 6, 12]

Пример 2:

Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.groupBy(i -> i % 2 == 0 ? "even:" : "odd:")
.concatMap(g -> g.defaultIfEmpty(1) // if empty groups, show them
.map(String::valueOf) // map to string
.startWith(g.key())) // start with the group's key
.subscribe(System.out::println);

Output:
odd:
1
3
5
11
13
even:
2
4
6
12

Пример 3:

Flux.just("Hello", "world")
.map(String::toUpperCase)
.flatMap(s -> Flux.fromArray(s.split("")))
.groupBy(String::toString)
.concatMap(Flux::collectList)
.subscribe(System.out::println);

Output:
[H]
[E]
[L, L, L]
[O, O]
[W]
[R]
[D]
  • repeat/repeatWhen — подписка на источник на неопределенный срок после завершения предыдущей подписки.

Обратите внимание, что repeat и repeatWhen работают только до тех пор, пока метод не выдаст ошибку. Если вы хотите повторить попытку при возникновении ошибки, вы можете использовать retry или retryWhen

Пример 1:

public class TestWork {

private int testNumbers(int value) {
if (value > 4) {
System.out.println("Test checked");
}
return value;
}

private Flux<Integer> checkOnErrorMethodRetry() {
Flux.range(1, 5)
.map(this::testNumbers)
.repeat(2)
.subscribe(value -> System.out.println("Value: " + value),
error -> System.out.println("Error: " + error.getMessage()));

return null;

}

public static void main(String[] args) {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodRetry());
}

}


Output:
Value: 1
Value: 2
Test checked
Value: 3
Value: 1
Value: 2
Test checked
Value: 3
Value: 1
Value: 2
Test checked
Value: 3

Но если мы добавим выбрасываемое исключение, то повтор не работает!!!

public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
System.out.println("Test checked");
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethodRetry() {
Flux.range(1, 3)
.map(this::testNumbers)
.repeat(2)
.subscribe(value -> System.out.println("Value: " + value),
error -> System.out.println("Error: " + error.getMessage()));

return null;

}

public static void main(String[] args) {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodRetry());
}

}

Output:
Value: 1
Value: 2
Test checked
Error: value is too high!
  • retry/retryWhen — аналог repeat, но используется если вы хотите повторить выполнение при возникновении ошибки.
public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethodRetry() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.retry(2)
.subscribe(value -> System.out.println("Value: " + value),
error -> System.out.println("Error: " + error.getMessage()));

return null;

}

public static void main(String[] args) {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodRetry());
}

}

Мы также можем использовать оператор retryWhen. Эквивалент повторения (N).

Здесь мы можем указать количество попыток. Сначала мы вызываем ошибку и печатаем ее (используя doOnError), а затем пытаемся выполнить 3 раза (и печатаем каждую попытку вызова)

Flux<String> flux = Flux
.<String>error(new IllegalArgumentException())
.doOnError(System.out::println)
.retryWhen(companion -> companion.take(3));

Output:
java.lang.IllegalArgumentException
java.lang.IllegalArgumentException
java.lang.IllegalArgumentException
java.lang.IllegalArgumentException
  • interval (duration) — устанавливает некоторый промежуток времени.

Здесь через каждую секунду будет печататься результат от 1 до 3:

Flux.range(1, 3)
.delayElements(Duration.ofMillis(1000))
.doOnNext(System.out::println)
.blockLast();


Output:
1
2
3
  • defaultIfEmpty — вернет значение по умолчанию, если последовательность пуста.

Пример 1:

Mono.empty()
.defaultIfEmpty("No such elements!")
.subscribe(System.out::println);
Output:
No such elements!

Пример 2:

Flux.range(1, 6)
.defaultIfEmpty(222)
.subscribe(System.out::println);

Output:
1
2
3
4
5
6
  • switchIfEmpty — переключает на альтернативного Publisher-а, если эта последовательность завершена без каких-либо данных.
Flux.empty()
.switchIfEmpty(Flux.range(1,3))
.subscribe(System.out::println);

Output:
1
2
3
  • ignoreElements

Для Flux: игнорирует сигналы onNext (отбрасывает их) и распространяет только события завершения.

Для Mono: создает новый Mono, который игнорирует элементы из источника (отбрасывая их), но завершается, когда источник завершает работу.

public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
System.out.println("Failed");
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethodRetry() {
Flux.range(1, 3)
.map(this::testNumbers)
.doOnError(
error -> {
System.err.println("Statement failed: " + error);
})
.ignoreElements()
.subscribe(System.out::println);

return null;

}

public static void main(String[] args) {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodRetry());
}

}



Output:
Failed
null
Statement failed: java.lang.IllegalArgumentException: value is too high!

Но если мы удалим метод ignoreElements() — то получим такой результат:

1
2
Failed
null
Statement failed: java.lang.IllegalArgumentException: value is too high!
  • doOnNext — следующий элемент в последовательности (без изменения последовательности), то есть выполнить дополнительное действие. Если у нас есть, например, 5 элементов в потоке, то когда doOnNext вызывается один раз, он будет проходить через все 5 элементов.

doOnEach и doOnNext действительно довольно близки. Основное различие между этими методами заключается в том, что в doOnEach вы также получаете уведомления об ошибках и событиях завершения.

Пример 1:

Flux<Integer> oddFlux = Flux.just(1, 3);
Flux<Integer> evenFlux = Flux.just(2, 4)
.doOnNext(value -> System.out.println("Inner: " + value));

oddFlux.mergeWith(evenFlux)
.subscribe(value -> System.out.println("Outer: " + value));

Output:

Outer: 1
Outer: 3
Inner: 2
Outer: 2
Inner: 4
Outer: 4

Пример 2:

Flux.just("one", "two", "three")
.doOnNext(item -> {
if (item.equals("one")) System.out.println("First item: " + item);
else if (item.equals("three")) System.out.println("Last item: " + item);
})
.subscribe();

Output:
First item: one
Last item: three
  • doOnEach — Добавление поведения (побочные эффекты) запускается, когда Flux выдает элемент, завершается с ошибкой или завершается успешно.

doOnEach и doOnNext действительно довольно близки. Основное различие между этими методами заключается в том, что в doOnEach вы также получаете уведомления об ошибках и событиях завершения.

public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
System.out.println("Failed");
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> doOnEachMethod() {
Flux.range(1, 5)
.map(this::testNumbers)
.doOnEach(
item -> {
if (item.isOnComplete()) System.out.println("Item done!");
else if (item.isOnError()) System.out.println("Item fail!");
}
)
.subscribe(System.out::println);

return null;
}

public static void main(String[] args) throws InterruptedException {
TestWork testWork = new TestWork();
System.out.println(testWork.doOnEachMethod());
}
}



Output:
1
2
Failed
Item fail!
  • doOnComplete — сделать что-то если операция завершилась успешно (для Flux).

Пример 1:

Object result = Flux.just(5, 6, 7)
.doOnComplete(() -> System.out.println("Yes!"))
.subscribe(System.out::println);

Output:
5
6
7
Yes!

Пример 2:

Disposable result = Flux.just(5, 6, 7)
.doOnCancel(() -> System.out.println("Cancel!"))
.doOnComplete(() -> System.out.println("Yes!"))
.subscribe(System.out::println,
e -> {},
() -> {});
Output:5
6
7
Yes!
  • doOnSuccess — добавление поведения срабатывает при успешном завершении Mono — результатом является T или null.
Object result = Mono.just(5)
.doOnNext(m -> System.out.println(m + "_"))
.flatMap(m -> Mono.empty())
.doOnNext(m -> System.out.println(m + "N")) // не вызовется, т.к. Mono теперь пустой (на предыдущем шаге)
.doOnSuccess(m -> System.out.println("Done"))
.block();

System.out.println(result);


Output:
5_
Done
null
  • doOnCancel — добавление поведения (побочный эффект) срабатывает при отмене Flux/Mono (аналог для вызова Subscription.cancel()).
Disposable result = Flux.just(5, 6, 7)
.doOnCancel(() -> System.out.println("Cancel!"))
.doOnComplete(() -> System.out.println("Yes!"))
.subscribe(System.out::println,
e -> {},
() -> {},
Subscription::cancel);

Output:
Cancel!

где Subscription::cancel — означает отмену подписки

  • first — вернет тот Mono, который выдаст значение быстрее
Mono<String> mono1 = Mono.just("Test");
Mono<String> mono2 = Mono.just("Test2");

firstMono(mono1, mono2);
static Mono<String> firstMono(Mono<String> mono1, Mono<String> mono2) {
return Mono.first(mono1, mono2);
}
  • justOrEmpty — напечатает Mono, если элемент не нулевой, в противном случае сигнал завершения.
String string = new String("Peter");
justOrEmptyMethod(string);
static Mono<String> justOrEmptyMethod(String string) {
return Mono.justOrEmpty(string);
}
  • doFinally — аналог try-catch-finally
public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethodTryCatchFinally() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.doFinally(signalType -> {
if (signalType == SignalType.ON_ERROR) {
System.out.println("Error signal");
} else if (signalType == SignalType.CANCEL) {
System.out.println("Cancel signal");
}
})
.subscribe(value -> System.out.println("Value: " + value),
error -> System.out.println("Error: " +
error.getMessage()));
return null;
}

public static void main(String[] args) {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodTryCatchFinally());
}

}
  • using — аналог try-with-resource
AtomicBoolean isDisposed = new AtomicBoolean();Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true);
}

@Override
public String toString() {
return "Closed!";
}
};


Flux<String> flux =
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose
);
flux.subscribe(System.out::println);


Output:
Closed!
  • log — для вывода логов в консоль
Flux<Integer> reactiveStream = Flux.range(1, 5).log();
  • fliter — для фильтрации элементов в потоке.

Мы фильтруем и оставляем пользователя с именем «name1»

Flux.just(new User("Name1", 20), new User("name2", 25), new User("name3", 30))
.filter(s -> s.getName().equals("name1"))
.subscribe(s -> System.out.println("Value: " + s.getName()));
  • dispose — может использоваться для отмены подписки после завершения Flux или ошибок или очистки.
Disposable disposable = Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.delayElements(Duration.ofSeconds(3))
.subscribe(value -> System.out.println("Value: " + value));

Thread.sleep(7000);
disposable.dispose();
System.out.println("Cancelling subscription");


Output:
Value: 1
Value: 2
Cancelling subscription
  • doAfterTerminate — добавление поведения (побочный эффект) срабатывает после завершения Mono, либо путем успешного завершения нисходящего потока, либо с ошибкой.

Аналог Subscriber.onComplete() или Subscriber.onError(java.lang.Throwable)

Flux<Integer> stream = Flux.just(4, 5, 6);

stream.doAfterTerminate(() -> System.out.println("Closed!")).subscribe(System.out::println);

Output:
4
5
6
Closed!
  • then/thenMany — подписаться на второго Publisher-а после окончания первого Publisher-а.

Для Mono:

Mono<Integer> mono1 = Mono.just(1);
Mono<Integer> mono2 = Mono.just(2);

mono1.then(mono2)
.subscribe(value -> System.out.println("Value: " + value));

Output:
Value: 2

Для Flux:

Flux<Integer> oddFlux = Flux.just(1, 3, 5);
Flux<Integer> evenFlux = Flux.just(2, 4, 6);

oddFlux.thenMany(evenFlux)
.subscribe(value -> System.out.println("Value: " + value));

Output:
Value: 2
Value: 4
Value: 6
  • duration — интервал.

Мы ожидаем получать данные в течение 3 секунд и только после этого мы обрабатываем события. Этот код ничего не выводит, потому что, когда основной поток выполнения завершается, сбор данных останавливается и программа завершается.

Пример 1:

String resultMono = Mono.just("one")
.delayElement(ofSeconds(3))
.block();

System.out.println(resultMono);

Output:
one

Пример 2:

Integer resultFlux = Flux.range(1, 3)
.delayElements(ofSeconds(3))
.blockLast();

System.out.println(resultFlux);

Output:
3
  • take — берет первые N элементов из потока (если они доступны).
Flux.range(1, 10)
.take(3)
.subscribe(System.out::println);


Output:
1
2
3
  • takeLast — берет последний N элементов из потока (если они доступны).

Пример 1:

Flux.range(1, 10)
.takeLast(2)
.subscribe(System.out::println);

Output:
9
10

Пример 2:

Flux.just(1, 2, 3, 4)
.takeLast(2)
.subscribe(System.out::println);

Output:
3
4
  • takeWhile — берет значения из потока пока не будет выполнено некоторое условие.
Flux.range(1, 10)
.takeWhile(i -> i <= 5)
.subscribe(System.out::println);

Output:
1
2
3
4
5
  • takeUntilOther — публикация значений из потока до тех пор, пока конкретный Publisher не выдаст свои значения.
Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1))
.takeUntilOther(Mono
.just(10) // hot publisher

// задержка '10' на 3 секунды
.delayElement(Duration.ofSeconds(3))
)
.subscribe(System.out::print);

Thread.sleep(5_000);


Output:
12
  • elementAt — получить конкретный элемент в данной позиции индекса или IndexOutOfBoundsException, если последовательность короче чем конкретный элемент в потоке.
Flux.range(1, 10)
.elementAt(2)
.subscribe(System.out::println);

Output:
3
  • last — последний элемент в потоке или ошибка NoSuchElementException если его нет.
Flux.range(1, 10)
.last()
.subscribe(System.out::println);

Output:
10
  • skip — пропускает указанное количество элементов с начала потока, а затем испускает оставшиеся элементыв потоке.
Flux.range(1, 10)
.skip(3)
.subscribe(System.out::println);

Output:
4
5
6
7
8
9
10
  • skipLast — пропускает указанное количество элементов с конца потока и печатает те, что были ДО!
Flux.range(1, 10)
.skipLast(3)
.subscribe(System.out::println);


Output:
1
2
3
4
5
6
7
  • skipUntil — пропускает значения из этого потока, пока предикат не вернет значение true.
Flux.range(1, 10)
.skipUntil(s -> s == 3)
.subscribe(System.out::println);


Output:
3
4
5
6
7
8
9
10
  • error — Mono/Flux заканчивается указанной ошибкой сразу после подписки.

Пример 1:

Flux.error(new RuntimeException())
.subscribe(System.out::println);


Output:
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException
Caused by: java.lang.RuntimeException

Пример 2:

Object result = Flux.error(new RuntimeException())
.onErrorResume(ex -> Mono.just(1))
.blockLast();

System.out.println(result);


Output:
1
  • onErrorReturn — вы можете вернуть значение по умолчанию в случае ошибки.
public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethod() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.onErrorReturn(100)
.subscribe(value -> System.out.println("Value: " + value));
return null;
}


public static void main(String[] args) {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethod());
}

}

Аналог в нереактивном подходе:

try {
return testNumbers(3);
} catch (Throwable error) {
return 100;
}
  • onErorResume — в случае ошибки вы можете восстановить и продолжить работу (выполнить метод резервного копирования.
public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethod() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.onErrorResume(error -> Flux.just(4, 5, 6))
.subscribe(value -> System.out.println("Value: " + value));
return null;
}


public static void main(String[] args) {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethod());
}

}
  • doOnError — оператор doOnError, как и все операторы с префиксом doOn, иногда называют «побочным эффектом». Они позволяют вам заглянуть внутрь последовательности событий, не меняя их. Мы знаем, что метод потерпит неудачу, но мы все еще можем обещать (зарегистрировать) эту ошибку или сделать что-либо по нашему усмотрению.
public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethodRetry() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.doOnError(e -> {
// to do something
System.out.println("Log error");
})
.subscribe(value -> System.out.println("Value: " + value));

return null;

}

public static void main(String[] args) {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodRetry());
}

}
  • block — блокирует текущий поток, пока мы не получим объект (для Mono)

Пример 1:

String result = Mono.just("one")
.block();
System.out.println(result);

Пример 2:

String result2 = Mono.just("one")
.map(String::toUpperCase)
.block();
System.out.println(result2);
  • blockLast — блокирует весь поток, пока он полностью не будет завершен (для Flux)

Пример 1:

Flux<Integer> sequence = Flux.range(0,10)
.publishOn(Schedulers.single()); // onNext, onComplete и onError будут происходить в "single" sheduler.

sequence.subscribe(n -> {
System.out.println("n = " + n);
System.out.println("Thread.currentThread() = " + Thread.currentThread());
});

sequence.blockLast();


Output: будет отображен 10 раз

n = 0
Thread.currentThread() = Thread[single-1,5,main]

Пример 2:

Integer resultFlux = Flux.range(1, 5)
.blockLast();

System.out.println(resultFlux);

Output:
5
  • blockFirst — испускает первый элемент и блокирует поток.
Integer resultFlux = Flux.range(1, 3)
.delayElements(ofSeconds(2))
.blockFirst();

System.out.println(resultFlux);

Output:
1
  • subscribeOn — вы можете изменить поток выполнения (принимает Scheduler в качестве аргумента и позволяет изменить контекст выполнения на указанный Scheduler). Он работает сразу во всей цепочке подписчиков, то есть все остальные подписчики будут выполняться в контексте нового указанного Scheduler-а.
  • publishOn — отличие от subscribeOn в том, что его можно использовать в середине цепочки вызовов, и все остальные подписчики будут выполняться в контексте указанного Scheduler-а (в оригинале, который был указан)
Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
.doOnNext(consumer)
.map(i -> {
System.out.println("Inside map the thread is " + Thread.currentThread().getName());
return i * 10;
})

.publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
.doOnNext(consumer)
.subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
.subscribe();

Output:
1 : subscribeOn_thread-3
Inside map the thread is subscribeOn_thread-3
2 : subscribeOn_thread-3
Inside map the thread is subscribeOn_thread-3
3 : subscribeOn_thread-3
Inside map the thread is subscribeOn_thread-3
10 : First_PublishOn()_thread-4
4 : subscribeOn_thread-3
Inside map the thread is subscribeOn_thread-3
20 : First_PublishOn()_thread-4
5 : subscribeOn_thread-3
Inside map the thread is subscribeOn_thread-3
30 : First_PublishOn()_thread-4
40 : First_PublishOn()_thread-4
50 : First_PublishOn()_thread-4
  • parallelFlux — Чтобы получить ParallelFlux, вы можете использовать оператор parallel() на любом Flux. Сам по себе этот метод не распараллеливает работу. Скорее, он делит рабочую нагрузку на «рельсы» (по умолчанию столько рельсов, сколько имеется ядер ЦП).

Пример 1: здесь мы явно указали 2 процесса, вместо того, чтобы позволить самой программе определять, сколько им нужно, в зависимости от ядер процессора

Flux.range(1, 10)
.parallel(2)
.subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

Пример 2: здесь исполнение в два потока

Flux.range(1, 10)
.parallel(2)
.runOn(Schedulers.parallel())
.subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
  • connectableFlux — когда нужно отложить обработку какого-то значения до времени подписки одного из подписчиков.
    Также, когда необходимо подождать до того момента, пока не подпишутся все подписчики.
    В этом случае нам поможет ConnectableFlux.

В Flux API представлены два основных шаблона, которые возвращают ConnectableFlux:
- publish
- replay

ConnectableFlux предоставляет дополнительные методы для управления подписками:

- connect() — запускает подписку вручную когда подписалось N-количество подписчиков

ConnectableFlux<Integer> source = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("Subscribed"))
.publish();
source.subscribe(System.out::println); // 1-ый подписчик
source.subscribe(x -> System.out.println(x + "_new")); // 2-ой подписчик
System.out.println("Ready");
Thread.sleep(500);
System.out.println("Connection");
source.connect(); // стартуемВывод:Ready
Connection
Subscribed
1
1_new
2
2_new
3
3_new

- autoConnect(n) — запускает подписку автоматически после N-количества подписчиков

Flux<Integer> source2 = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("Subscribed"));
Flux<Integer> autoConnect = source2.publish().autoConnect(2); // как только подписалось 2 подписчика - стартуем !autoConnect.subscribe(System.out::println);
System.out.println("first subscriber"); // 1-ый подписчик
Thread.sleep(3000);
System.out.println("second subscriber"); // 2-ой подписчик
autoConnect.subscribe(System.out::println);
Вывод:first subscriber
second subscriber
Subscribed
1
1
2
2
3
3

- refCount(n) — позволяет автоматически отслеживает входящие подписки и также определяет, когда эти подписки отменяются

Обработка ошибок

  • onErrorReturn

Вы можете вернуть значение по умолчанию в случае ошибки выполнения метода

public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethod() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.onErrorReturn(100)
.subscribe(value -> System.out.println("Value: " + value));
return null;
}


public static void main(String[] args) throws InterruptedException {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethod());
}

}

аналог

try {
return testNumbers(3);
} catch (Throwable error) {
return 100;
}
  • onErrorResume

Кроме того, в случае ошибки вы можете восстановить и продолжить работу (выполнить метод резервного копирования)

public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethod() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.onErrorResume(error -> Flux.just(4, 5, 6))
.subscribe(value -> System.out.println("Value: " + value));
return null;
}


public static void main(String[] args) throws InterruptedException {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethod());
}

}
  • doOnError

Оператор doOnError, как и все операторы с префиксом doOn, иногда называют «побочным эффектом». Они позволяют вам заглянуть внутрь последовательности событий, не меняя их. Мы знаем, что метод потерпит неудачу, но мы все еще можем обещать (зарегистрировать) эту ошибку или сделать что-либо по нашему усмотрению.

public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethodRetry() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.doOnError(e -> {
// to do something
System.out.println("Log error");
})
.subscribe(value -> System.out.println("Value: " + value));

return null;

}

public static void main(String[] args) throws InterruptedException {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodRetry());
}

}
  • doFinally

Аналог try-catch-finally

public class TestWork {

private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}

private Flux<Integer> checkOnErrorMethodTryCatchFinally() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.doFinally(signalType -> {
if (signalType == SignalType.ON_ERROR) {
System.out.println("Error signal");
} else if (signalType == SignalType.CANCEL) {
System.out.println("Cancel signal");
}
})
.subscribe(value -> System.out.println("Value: " + value),
error -> System.out.println("Error: " +
error.getMessage()));
return null;
}

public static void main(String[] args) throws InterruptedException {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodTryCatchFinally());
}

}
  • retry

Если произошла ошибка, вы можете попробовать еще раз.

Это аналог repeat, но используется если вы хотите повторить выполнение при возникновении ошибки.

public class TestWork {    private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}
private Flux<Integer> checkOnErrorMethodRetry() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.retry(2)
.subscribe(value -> System.out.println("Value: " + value),
error -> System.out.println("Error: " + error.getMessage()));
return null; }public static void main(String[] args) {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodRetry());
}
}

Мы также можем использовать оператор retryWhen. Эквивалент повторения (N).

Здесь мы можем указать количество попыток. Сначала мы вызываем ошибку и печатаем ее (используя doOnError), а затем пытаемся выполнить 3 раза (и печатаем каждую попытку вызова)

Flux<String> flux = Flux
.<String>error(new IllegalArgumentException())
.doOnError(System.out::println)
.retryWhen(companion -> companion.take(3));
Output:
java.lang.IllegalArgumentException
java.lang.IllegalArgumentException
java.lang.IllegalArgumentException
java.lang.IllegalArgumentException

В случае ошибки вы можете выбросить собственное исключение

public class TestWork {    private int testNumbers(int value) {
if (value > 2) {
throw new IllegalArgumentException("value is too high!");
}
return value;
}
private Flux<Integer> checkOnErrorMethodMyException() {
Flux.just(1, 2, 3)
.map(this::testNumbers)
.onErrorMap(error -> new SQLException("this is SQLException"))
.subscribe(value -> System.out.println("Value: " + value),
error -> System.out.println("Error: " + error.getClass().getSimpleName() + ". Message: " + error.getMessage()));
return null;
}
public static void main(String[] args) throws InterruptedException {
TestWork testWork = new TestWork();
System.out.println(testWork.checkOnErrorMethodMyException());
}
}Output:
Value: 1
Value: 2
Error: SQLException. Message: this is SQLException

Subscribers

Мы также можем сэмулировать ситуацию 3 разных подписчиков на наш поток данных

Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber 1: " + d));
source.subscribe(d -> System.out.println("Subscriber 2: " + d));
source.subscribe(d -> System.out.println("Subscriber 3: " + d));

Процесс выполняется независимо от того, когда подписки были присоединены.
Поэтому может быть такая ситуация, при которой данные будут выполняться не последовательно.

В результате получим:

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE
Subscriber 3: BLUE
Subscriber 3: GREEN
Subscriber 3: ORANGE
Subscriber 3: PURPLE

Преобразуем тип User в String

Flux.just(new User("name1", 20), new User("name2", 25), new User("name3", 30))
.map(User::getName)
.subscribe(System.out::println);

Разделение элементов

Когда у вас есть большое количество элементов и вы хотите их разделить, вы можете это сделать с помощью:
- grouping
- windowing
- buffering

Я не стал переводить эти термины на русский.

a) grouping — это процесс разделения источника Flux<T> на несколько пакетов по ключу.
Каждая группа представлена ​​как GroupedFlux<T>, что позволяет получить ключ через метод key().

b) windowing — это процесс разделения источника Flux<T> на “окна” по критериям размера, времени, предикатов.
Использует операторы: windowTimeout, windowUntil, windowWhile, и windowWhen.
В большинстве случаев окна открываются последовательно, в отличие от группировки.

c) buffering — Когда соответствующий оператор управления окнами открывает “окно”, оператор буферизации создает новую коллекцию и начинает добавлять в нее элементы (Collection<T> — по умолчанию List<T>).
Использует операторы: bufferTimeout, bufferUntil, bufferWhile, и bufferWhen.
Буферизация также может привести к удалению исходных элементов или наличию перекрывающихся буферов.

Продолжение в третьей статье :)

Если вы нашли неточности в описании данной статьи, вы можете написать мне на email и я с радостью вам отвечу.

Kirill Sereda

email: kirill.serada@gmail.com

skype: kirill-sereda

linkedin: www.linkedin.com/in/ksereda

--

--