Managing Back Pressure in Reactive Streams

Vikas Taank
3 min read · Jan 18, 2024

We have talked about the two ways a reactive stream can be produced: as a Mono or as a Flux. There are also several ways these streams can be consumed.

In a traditional system, the publisher pushes events to the subscriber, and the subscriber keeps processing them as they arrive.

What if the subscriber is slow? Events build up, and the subscriber is left with too many events to handle.

Reactive Streams addresses this with back pressure: the subscriber tells the publisher how many elements it is ready to receive, and the publisher must not send more than that.
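The demand signalling described above can be sketched with the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract. This is only an illustration and does not use Reactor; the class names DemandSketch and OneAtATimeSubscriber are made up for the example. The subscriber asks for one element at a time, so the publisher never hands it more than it requested.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class DemandSketch {

    // Hypothetical subscriber that requests exactly one element at a time.
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: one element
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // signal readiness for the next element
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    public static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete to the subscriber
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println("Received: " + run(5));
    }
}
```

Here submit waits when the subscriber falls behind, so nothing is lost. The strategies in the rest of this article cover the case where the producer cannot wait.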

When the publisher cannot slow down, Reactor offers several strategies for handling the elements that arrive faster than the subscriber requests them:

1. onBackpressureDrop Strategy

2. onBackpressureBuffer Strategy

3. onBackpressureLatest Strategy

onBackpressureDrop Strategy

Imagine we have a Flux emitting the integers from 1 to 100, one per millisecond.

  • We use doOnNext to log each element as it is produced.
  • The onBackpressureDrop operator drops any element that arrives while the subscriber has no outstanding demand, and its callback logs which element was dropped.
  • The subscriber simulates a slow consumer by sleeping for 10 ms per element.
  • We use subscribeOn with a bounded elastic scheduler to run the subscription on a separate thread pool.
  • Together, this demonstrates how back pressure prevents a fast producer from overwhelming a slow consumer: when the consumer can't keep up, elements are dropped rather than queued.
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class BackPressureOnDropExample {

    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> flux = Flux.range(1, 100)
                .delayElements(Duration.ofMillis(1))
                .doOnNext(i -> System.out.println("Produced: " + i))
                .onBackpressureDrop(item -> System.out.println("Dropped: " + item));

        flux.subscribeOn(Schedulers.boundedElastic())
                // Hand elements to a separate consumer thread with a prefetch of 1.
                // A lambda subscriber requests an unbounded amount, so without this
                // bounded demand nothing would ever be dropped.
                .publishOn(Schedulers.boundedElastic(), 1)
                .subscribe(
                        data -> {
                            try {
                                // Simulating a task that takes 10ms
                                Thread.sleep(10);
                                System.out.println("Consumed: " + data);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        },
                        err -> System.err.println("Error: " + err),
                        () -> System.out.println("Completed")
                );

        // Keep the program running long enough to see the output
        Thread.sleep(5000);
    }
}
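
To make the drop rule itself easier to see, here is a single-threaded model of it in plain Java. It is a simplified sketch, not Reactor's implementation, and the names DropGateSketch and DropGate are invented for illustration. The gate forwards an element only while the downstream has requested more than it has received, and hands everything else to the drop callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DropGateSketch {

    // Hypothetical model: forwards an element only when there is outstanding demand.
    static class DropGate<T> {
        private final Consumer<T> downstream;
        private final Consumer<T> onDropped;
        private long demand; // elements requested but not yet delivered

        DropGate(Consumer<T> downstream, Consumer<T> onDropped) {
            this.downstream = downstream;
            this.onDropped = onDropped;
        }

        void request(long n) {
            demand += n;
        }

        void onNext(T item) {
            if (demand > 0) {
                demand--;
                downstream.accept(item);
            } else {
                onDropped.accept(item); // no demand: the element is discarded
            }
        }
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        DropGate<Integer> gate = new DropGate<>(
                i -> log.add("consumed " + i),
                i -> log.add("dropped " + i));

        gate.request(1); // the consumer is ready for one element
        gate.onNext(1);  // demand available: delivered
        gate.onNext(2);  // consumer still busy: dropped
        gate.onNext(3);  // consumer still busy: dropped
        gate.request(1); // the consumer is ready again
        gate.onNext(4);  // delivered
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The dropped elements are gone for good, so this strategy suits data where a missed value does no harm, such as frequent sensor readings.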

onBackpressureBuffer Strategy

In this strategy, the buffer can hold up to 10 elements. If the buffer overflows, the oldest elements are dropped, and a message is printed for each dropped element.

import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class BackPressureBufferExample {

    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> flux = Flux.range(1, 100)
                .delayElements(Duration.ofMillis(1))
                .doOnNext(i -> System.out.println("Produced: " + i))
                // DROP_OLDEST evicts the oldest buffered element on overflow.
                // Without an explicit strategy, an overflow ends the stream with an error.
                .onBackpressureBuffer(
                        10,
                        dropped -> System.out.println("Buffered, dropped: " + dropped),
                        BufferOverflowStrategy.DROP_OLDEST);

        flux.subscribeOn(Schedulers.boundedElastic())
                // Prefetch of 1 keeps downstream demand bounded so the buffer can fill up.
                .publishOn(Schedulers.boundedElastic(), 1)
                .subscribe(
                        data -> {
                            try {
                                // Simulating a task that takes 10ms
                                Thread.sleep(10);
                                System.out.println("Consumed: " + data);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        },
                        err -> System.err.println("Error: " + err),
                        () -> System.out.println("Completed")
                );

        // Keep the program running long enough to see the output
        Thread.sleep(5000);
    }
}
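
Reactor's BufferOverflowStrategy enum offers ERROR, DROP_LATEST and DROP_OLDEST. The difference between the two drop variants can be sketched with a plain bounded queue. This is a simplified model rather than Reactor's code, and OverflowPolicySketch, Policy and BoundedBuffer are names invented for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OverflowPolicySketch {

    // Hypothetical stand-in for the two "drop" overflow strategies.
    public enum Policy { DROP_OLDEST, DROP_LATEST }

    static class BoundedBuffer {
        private final Deque<Integer> queue = new ArrayDeque<>();
        private final int maxSize;
        private final Policy policy;
        final List<Integer> dropped = new ArrayList<>();

        BoundedBuffer(int maxSize, Policy policy) {
            this.maxSize = maxSize;
            this.policy = policy;
        }

        void offer(int item) {
            if (queue.size() < maxSize) {
                queue.addLast(item);
                return;
            }
            if (policy == Policy.DROP_OLDEST) {
                dropped.add(queue.removeFirst()); // evict the oldest buffered element
                queue.addLast(item);
            } else {
                dropped.add(item); // discard the incoming element
            }
        }

        List<Integer> contents() {
            return new ArrayList<>(queue);
        }
    }

    // Offers 1..count to a buffer of the given size and returns what remains buffered.
    public static List<Integer> fill(Policy policy, int maxSize, int count) {
        BoundedBuffer buffer = new BoundedBuffer(maxSize, policy);
        for (int i = 1; i <= count; i++) {
            buffer.offer(i);
        }
        return buffer.contents();
    }

    public static void main(String[] args) {
        System.out.println("DROP_OLDEST keeps: " + fill(Policy.DROP_OLDEST, 3, 5));
        System.out.println("DROP_LATEST keeps: " + fill(Policy.DROP_LATEST, 3, 5));
    }
}
```

With a buffer of 3 and elements 1 to 5, DROP_OLDEST is left holding 3, 4, 5, while DROP_LATEST is left holding 1, 2, 3.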

onBackpressureLatest Strategy

onBackpressureLatest ensures that if the subscriber can't keep up, it will only get the most recent value emitted by the producer, discarding any previous values that have not been processed yet.

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class BackPressureLatestExample {

    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> flux = Flux.range(1, 100)
                .delayElements(Duration.ofMillis(1))
                .doOnNext(i -> System.out.println("Produced: " + i))
                .onBackpressureLatest();

        flux.subscribeOn(Schedulers.boundedElastic())
                // Prefetch of 1 keeps downstream demand bounded, so only the most
                // recent element is retained while the consumer is busy.
                .publishOn(Schedulers.boundedElastic(), 1)
                .subscribe(
                        data -> {
                            try {
                                // Simulating a task that takes 10ms
                                Thread.sleep(10);
                                System.out.println("Consumed: " + data);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        },
                        err -> System.err.println("Error: " + err),
                        () -> System.out.println("Completed")
                );

        // Keep the program running long enough to see the output
        Thread.sleep(5000);
    }
}
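
The "keep only the most recent value" rule can also be modelled in a few lines of plain Java. This is a simplified, single-threaded sketch, not Reactor's implementation; LatestSlotSketch and LatestGate are hypothetical names. While there is no demand, each new element overwrites the one held before it, and the next request releases whatever is held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LatestSlotSketch {

    // Hypothetical model: holds at most one pending element, always the newest.
    static class LatestGate<T> {
        private final Consumer<T> downstream;
        private long demand; // elements requested but not yet delivered
        private T latest;    // newest element that arrived without demand, or null

        LatestGate(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T item) {
            if (demand > 0) {
                demand--;
                downstream.accept(item);
            } else {
                latest = item; // overwrite any older pending element
            }
        }

        void request(long n) {
            demand += n;
            if (latest != null) {
                T item = latest;
                latest = null;
                demand--;
                downstream.accept(item); // release the held element
            }
        }
    }

    public static List<Integer> run() {
        List<Integer> consumed = new ArrayList<>();
        LatestGate<Integer> gate = new LatestGate<>(consumed::add);

        gate.request(1); // the consumer is ready for one element
        gate.onNext(1);  // delivered immediately
        gate.onNext(2);  // held
        gate.onNext(3);  // replaces 2
        gate.onNext(4);  // replaces 3
        gate.request(1); // the consumer is ready again and receives 4
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + run());
    }
}
```

Unlike the drop strategy, the consumer always catches up to the newest value as soon as it is free, which suits data such as prices or status updates where only the current state matters.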

I hope you enjoyed learning about these strategies. Please share and clap if you liked the content.

