Managing Back Pressure in Reactive Streams
We have talked about two ways a reactive stream can be produced: as a Mono or as a Flux. There are several ways these streams can be consumed.
In a traditional push-based system, the publisher pushes events to the subscriber, and the subscriber processes them as they arrive.
If the subscriber is slow, events build up and the subscriber is left with too many of them to handle.
Reactive Streams lets us manage this back pressure.
This can be done using several strategies:
1. onBackpressureDrop Strategy
2. onBackpressureBuffer Strategy
3. onBackpressureLatest Strategy
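Before looking at each strategy, it may help to see where back pressure comes from. In the Reactive Streams model, a subscriber signals how many elements it is ready for by calling request(n) on its subscription, and a well-behaved publisher never sends more than that. The sketch below illustrates this with the JDK's java.util.concurrent.Flow interfaces rather than Reactor, so it runs without extra dependencies. DemandSketch and RangePublisher are hypothetical names invented for this illustration, and the publisher is deliberately simplified (synchronous, single subscriber, not fully spec-compliant).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration: a publisher that emits only what was requested.
class DemandSketch {

    // Emits 1..count, but never more than the subscriber has asked for.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;   // simplified: the spec asks for onError when n <= 0
                    demand += n;
                    if (emitting) return;         // re-entrant call from onNext; the outer loop continues
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes with a consumer that asks for one element at a time and logs every signal.
    static List<String> run(int count) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(1)");
                s.request(1);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                log.add("request(1)");
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

Running main prints alternating request(1) and onNext(n) lines and ends with onComplete: the publisher only moves when the subscriber asks. The strategies in this article matter when the source cannot slow down like this.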
onBackpressureDrop Strategy
Imagine we have a Flux emitting the integers 1 to 100.
- We use doOnNext to log each element as it is produced.
- The onBackpressureDrop method drops elements that arrive while the subscriber has no outstanding demand, that is, when they cannot be processed fast enough. The callback passed to it logs which elements were dropped.
- The subscriber simulates a slow consumer by sleeping for 10 ms per element.
- We use subscribeOn with a bounded elastic scheduler to run the subscription on a separate thread pool.
The example demonstrates how back pressure can prevent a fast producer from overwhelming a slow consumer.
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;

public class BackPressureOnDropExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> flux = Flux.range(1, 100)
                .delayElements(Duration.ofMillis(1))
                .doOnNext(i -> System.out.println("Produced: " + i))
                .onBackpressureDrop(item -> System.out.println("Dropped: " + item));

        flux.subscribeOn(Schedulers.boundedElastic())
                // Hand elements to the consumer on its own thread, one at a time (prefetch = 1).
                // Without this boundary the sleeping consumer would block the producer thread,
                // so the producer could never get ahead and nothing would be dropped.
                .publishOn(Schedulers.boundedElastic(), 1)
                .subscribe(
                        data -> {
                            try {
                                // Simulate a task that takes 10 ms
                                Thread.sleep(10);
                                System.out.println("Consumed: " + data);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        },
                        err -> System.err.println("Error: " + err),
                        () -> System.out.println("Completed")
                );

        // Keep the program running long enough to see the output
        Thread.sleep(5000);
    }
}
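To see the dropping rule without thread timing getting in the way, here is a small deterministic model in plain Java. It is only a sketch of the idea, not Reactor's implementation: DropModel, simulate, and consumeEvery are made-up names. The producer emits one element per tick, the consumer asks for one element every consumeEvery ticks, and any element that arrives while there is no outstanding demand is dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tick-based model of the "drop" rule (not a Reactor class).
class DropModel {
    final List<Integer> delivered = new ArrayList<>();
    final List<Integer> dropped = new ArrayList<>();
    private long demand = 0;

    // The consumer announces that it can take n more elements.
    void request(long n) {
        demand += n;
    }

    // The producer pushes an element whether or not anyone asked for it.
    void onNext(int item) {
        if (demand > 0) {
            demand--;
            delivered.add(item);
        } else {
            dropped.add(item);   // nobody is ready: discard it
        }
    }

    // One element per tick; the consumer is free again every consumeEvery ticks.
    static DropModel simulate(int count, int consumeEvery) {
        DropModel model = new DropModel();
        for (int tick = 0; tick < count; tick++) {
            if (tick % consumeEvery == 0) {
                model.request(1);
            }
            model.onNext(tick + 1);
        }
        return model;
    }

    public static void main(String[] args) {
        DropModel model = simulate(10, 3);
        System.out.println("Delivered: " + model.delivered);
        System.out.println("Dropped:   " + model.dropped);
    }
}
```

With simulate(10, 3), the consumer receives 1, 4, 7, and 10, and everything in between is dropped.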
onBackpressureBuffer Strategy
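onBackpressureBuffer holds elements the subscriber has not requested yet in a buffer. In this example the buffer can hold up to 10 elements. If the buffer overflows, BufferOverflowStrategy.DROP_OLDEST drops the oldest buffered element to make room for the new one, and a message is printed for each dropped element. Without an explicit strategy, onBackpressureBuffer(maxSize, onOverflow) terminates the sequence with an overflow error instead of dropping.
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;

public class BackPressureBufferExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> flux = Flux.range(1, 100)
                .delayElements(Duration.ofMillis(1))
                .doOnNext(i -> System.out.println("Produced: " + i))
                // Keep at most 10 pending elements; evict the oldest one on overflow
                .onBackpressureBuffer(10,
                        dropped -> System.out.println("Buffered, dropped: " + dropped),
                        BufferOverflowStrategy.DROP_OLDEST);

        flux.subscribeOn(Schedulers.boundedElastic())
                // Consume on a separate thread, one element at a time (prefetch = 1),
                // so the slow consumer does not block the producer and the buffer can fill up.
                .publishOn(Schedulers.boundedElastic(), 1)
                .subscribe(
                        data -> {
                            try {
                                // Simulate a task that takes 10 ms
                                Thread.sleep(10);
                                System.out.println("Consumed: " + data);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        },
                        err -> System.err.println("Error: " + err),
                        () -> System.out.println("Completed")
                );

        // Keep the program running long enough to see the output
        Thread.sleep(5000);
    }
}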
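Reactor's BufferOverflowStrategy enum names three ways to react to a full buffer: ERROR, DROP_LATEST, and DROP_OLDEST. The plain-Java sketch below models what each choice means for a bounded queue. BufferModel and its Overflow enum are hypothetical stand-ins written for illustration, not Reactor classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a bounded back-pressure buffer (not a Reactor class).
class BufferModel {
    enum Overflow { ERROR, DROP_LATEST, DROP_OLDEST }

    private final Deque<Integer> queue = new ArrayDeque<>();
    private final int maxSize;
    private final Overflow overflow;
    final List<Integer> dropped = new ArrayList<>();

    BufferModel(int maxSize, Overflow overflow) {
        this.maxSize = maxSize;
        this.overflow = overflow;
    }

    // Called for every element the slow consumer has not asked for yet.
    void offer(int item) {
        if (queue.size() < maxSize) {
            queue.addLast(item);
            return;
        }
        switch (overflow) {
            case DROP_LATEST:
                dropped.add(item);                 // discard the newcomer
                break;
            case DROP_OLDEST:
                dropped.add(queue.removeFirst());  // evict the head to make room
                queue.addLast(item);
                break;
            default:
                throw new IllegalStateException("Buffer overflow at element " + item);
        }
    }

    // Snapshot of what is currently waiting for the consumer.
    List<Integer> contents() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        BufferModel buffer = new BufferModel(3, Overflow.DROP_OLDEST);
        for (int i = 1; i <= 5; i++) {
            buffer.offer(i);
        }
        System.out.println("Buffered: " + buffer.contents() + ", dropped: " + buffer.dropped);
    }
}
```

Offering 1 to 5 into a buffer of size 3 leaves [3, 4, 5] with DROP_OLDEST and [1, 2, 3] with DROP_LATEST, while ERROR fails on the fourth element.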
onBackpressureLatest Strategy
onBackpressureLatest ensures that if the subscriber can't keep up, it only gets the most recent value emitted by the producer; any earlier values that have not been processed yet are discarded.
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;

public class BackPressureLatestExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> flux = Flux.range(1, 100)
                .delayElements(Duration.ofMillis(1))
                .doOnNext(i -> System.out.println("Produced: " + i))
                .onBackpressureLatest();

        flux.subscribeOn(Schedulers.boundedElastic())
                // Consume on a separate thread, one element at a time (prefetch = 1).
                // Without this boundary the sleeping consumer would block the producer thread,
                // so there would never be a newer value to replace an unprocessed one.
                .publishOn(Schedulers.boundedElastic(), 1)
                .subscribe(
                        data -> {
                            try {
                                // Simulate a task that takes 10 ms
                                Thread.sleep(10);
                                System.out.println("Consumed: " + data);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        },
                        err -> System.err.println("Error: " + err),
                        () -> System.out.println("Completed")
                );

        // Keep the program running long enough to see the output
        Thread.sleep(5000);
    }
}
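One way to picture onBackpressureLatest is as a single slot that always holds the newest undelivered value and hands it over as soon as the consumer asks again. The sketch below is a simplified plain-Java illustration with hypothetical names (LatestModel, simulate, consumeEvery), not Reactor's implementation. The producer emits one element per tick, and the consumer asks for one element every consumeEvery ticks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tick-based model of the "latest" rule (not a Reactor class).
class LatestModel {
    private Integer slot;            // holds at most one pending element
    private long demand = 0;
    final List<Integer> delivered = new ArrayList<>();

    // The consumer announces that it can take n more elements.
    void request(long n) {
        demand += n;
        drain();
    }

    // A new element overwrites whatever was still waiting in the slot.
    void onNext(int item) {
        slot = item;
        drain();
    }

    // Deliver the pending element if the consumer is ready for it.
    private void drain() {
        if (demand > 0 && slot != null) {
            demand--;
            delivered.add(slot);
            slot = null;
        }
    }

    // One element per tick; the consumer is free again every consumeEvery ticks.
    static List<Integer> simulate(int count, int consumeEvery) {
        LatestModel model = new LatestModel();
        for (int tick = 0; tick < count; tick++) {
            if (tick % consumeEvery == 0) {
                model.request(1);
            }
            model.onNext(tick + 1);
        }
        model.request(1);   // the consumer finally catches up and takes what is pending
        return model.delivered;
    }

    public static void main(String[] args) {
        System.out.println("Delivered: " + simulate(10, 3));
    }
}
```

In this model, simulate(10, 3) delivers 1, 3, 6, 9, and 10: each time the consumer becomes free it gets the freshest value available, and the final value is not lost.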
I hope you enjoyed learning about these strategies. Please share and clap if you liked the content.