RxJava Backpressure

Backpressure in RxJava comes into play when an observable emits items faster than its consumer can process them, leaving items that have been emitted but not yet consumed.

A backpressure strategy determines how those items, emitted by observables but not yet consumed by subscribers, are managed and controlled.

Since handling backpressure takes system resources, you need to choose the backpressure strategy that suits your requirements. In this post, I’ll explain the backpressure strategies you can use with RxJava.

Reducing Number of Items

If your application cares only about the latest items at a given point in time and doesn’t need every item emitted by an observable, reducing the number of items is the right backpressure strategy. It can be implemented with RxJava operators.

For example, if you want the latest item from an observable each time a given interval elapses, you can use the sample operator. It discards the items emitted earlier in the interval and emits only the most recent one.

Observable<Integer> observable = Observable.range(1, 50000).sample(1, TimeUnit.NANOSECONDS);
observable.subscribe(s -> LOGGER.info("value after every 1 nanosecond " + s));

If you want only the first few or the last few items, you can use the take or takeLast operator, respectively.

Observable<Integer> observable = Observable.range(1, 200000).take(15);
observable.subscribe(s -> LOGGER.info("take first 15 " + s));

Other RxJava operators that reduce the number of items emitted by the source observable include filter, first, last, debounce, skip and throttleFirst.
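
As a rough illustration of a few of these operators, the sketch below chains filter, skip and take on a small range. It assumes RxJava 2.x (the io.reactivex packages) is on the classpath; the class and method names are hypothetical and exist only for this example.

```java
import io.reactivex.Observable;
import java.util.List;

// Hypothetical class name; assumes RxJava 2.x on the classpath.
final class ReduceItemsSketch {

    // Keeps the even numbers from 1..20, skips the first two of them,
    // then takes the next three and collects them into a list.
    static List<Integer> reduce() {
        return Observable.range(1, 20)
                .filter(i -> i % 2 == 0)
                .skip(2)
                .take(3)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(reduce()); // prints [6, 8, 10]
    }
}
```

The source here is synchronous, so the result is deterministic. Time-based operators such as debounce and throttleFirst depend on when items arrive, so their output varies with timing.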

Collecting Items

In this strategy, items are not reduced or dropped. Instead, they are collected and emitted in groups using RxJava operators. The consumer receives groups of the items emitted by the source observable, and it is up to the consumer to decide which items in each group to process.

For example, you can apply the window operator to the source observable. It emits windows, each of which is itself an observable carrying up to the specified number of items.

Observable<Observable<Integer>> observable = Observable.range(1, 133).window(3);
observable.subscribe(s -> {
    LOGGER.info("next window ");
    s.subscribe(i -> LOGGER.info("items in window " + i));
});

Another RxJava operator that can be used for this strategy is buffer, which emits each group as a List.
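
A minimal sketch of buffer, assuming RxJava 2.x on the classpath (the class and method names are hypothetical): it groups a short range into lists of three, with the last list holding whatever is left over.

```java
import io.reactivex.Observable;
import java.util.List;

// Hypothetical class name; assumes RxJava 2.x on the classpath.
final class BufferSketch {

    // Collects the items 1..7 into lists of at most three items each.
    static List<List<Integer>> groupsOfThree() {
        return Observable.range(1, 7)
                .buffer(3)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(groupsOfThree()); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```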

Reactive Pull

The two strategies above manage backpressure by dropping items or by handing them over in groups. If your application needs to process every item emitted by the source observable at its own pace, you can’t rely on those strategies.

In this scenario, the reactive pull strategy is the right choice. In reactive pull, the subscriber requests the number of items it is ready for from the observable by calling request().

In RxJava 2, you need to use Flowable for reactive pull backpressure, because Observable in RxJava 2 is non-backpressured.

Flowable<Integer> observable = Flowable.range(1, 133);
observable.subscribe(new DefaultSubscriber<Integer>() {
    @Override public void onStart() {
        // ask for the first item as soon as the subscription is set up
        request(1);
    }
    @Override public void onNext(Integer t) {
        LOGGER.info("item " + t);
        // ask for the next item only after this one has been handled
        request(1);
    }
    @Override public void onError(Throwable t) {
        LOGGER.info("error " + t);
    }
    @Override public void onComplete() {
        LOGGER.info("complete");
    }
});
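
Requesting one item at a time is not the only option; a subscriber can also pull in batches. The following is a sketch under assumptions rather than a recommended setup: it assumes RxJava 2.x on the classpath, and the class name, method name and batch size of 10 are hypothetical choices for illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.DefaultSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class name; assumes RxJava 2.x on the classpath.
final class BatchedPullSketch {

    static final int BATCH = 10; // illustrative batch size

    // Pulls the items 1..133 from the source ten at a time and returns
    // everything that was received.
    static List<Integer> pullInBatches() {
        List<Integer> received = new ArrayList<>();
        Flowable.range(1, 133).subscribe(new DefaultSubscriber<Integer>() {
            int pending; // items still expected from the current batch

            @Override protected void onStart() {
                // set the counter before requesting: a synchronous source
                // may start emitting from inside request()
                pending = BATCH;
                request(BATCH);
            }

            @Override public void onNext(Integer t) {
                received.add(t);
                if (--pending == 0) {
                    // current batch fully handled, ask for the next one
                    pending = BATCH;
                    request(BATCH);
                }
            }

            @Override public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override public void onComplete() {
                // nothing to do; the caller reads the collected list
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(pullInBatches().size()); // prints 133
    }
}
```

Because Flowable.range emits on the subscribing thread here, the list is complete by the time subscribe returns. With an asynchronous source you would need to wait for onComplete before reading it.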

For reactive pull backpressure to work, the source must support reactive pull and respond to request() calls as expected. For sources that don’t support reactive pull, you can apply the onBackpressureBuffer or onBackpressureDrop operator to the source.

The onBackpressureBuffer operator keeps the items emitted by the source in a buffer and hands them out in response to request() calls from the subscriber. The onBackpressureDrop operator discards items that arrive while the subscriber has no outstanding request.
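
To make the difference concrete, here is a small sketch, assuming RxJava 2.x on the classpath. The source is built with Flowable.create and BackpressureStrategy.MISSING so that it pushes items regardless of requests; the class and method names are hypothetical, and TestSubscriber is used only because it makes it easy to control how many items are requested.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class name; assumes RxJava 2.x on the classpath.
final class OnBackpressureSketch {

    // A source that ignores request(): it pushes 1..10 immediately.
    static Flowable<Integer> pushySource() {
        return Flowable.<Integer>create(emitter -> {
            for (int i = 1; i <= 10; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, BackpressureStrategy.MISSING);
    }

    // The subscriber requests only 3 items; the other 7 are dropped.
    static List<Integer> withDrop() {
        TestSubscriber<Integer> ts = pushySource().onBackpressureDrop().test(3);
        return new ArrayList<>(ts.values());
    }

    // The subscriber requests 3 items first; the other 7 wait in the
    // buffer until they are requested.
    static List<Integer> withBuffer() {
        TestSubscriber<Integer> ts = pushySource().onBackpressureBuffer().test(3);
        ts.request(7);
        return new ArrayList<>(ts.values());
    }

    public static void main(String[] args) {
        System.out.println(withDrop());   // prints [1, 2, 3]
        System.out.println(withBuffer()); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

The buffer variant loses nothing but holds unrequested items in memory, while the drop variant keeps memory flat at the cost of losing items. Which trade-off is acceptable depends on the application.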
