Tackling backpressured REST paging with RxJava 2
REST APIs are a standard pretty much everywhere nowadays and it’s obvious that endpoints returning big data sets have to be paged, so that client is able to process reasonable amount of data in timely manner, as needed.
The other day I was happily coding one of my smalls apps with a cup of warm tea in my hand. I suddenly run into an issue on how to marry together RxJava and paged RESTful APIs. I spent some time looking at various solutions on the Internet but could not find anything that would satisfy my needs - simplicity and wise backpressure handling. The latter would guarantee that a next page is queried from the server only when a downstream subscriber requests it.
In the end I decided to come up with another solution and would like to share my thoughts and the outcome.
Generating pages on Subscriber’s demand
The first task is to make sure that we are querying the server for a next page only when the Subscriber requests it. For this purpose, we are to going use
the Flowable.generate() method.
public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>, S> generator)
Let’s walk through how the function works on a simple example:
Once a Subscriber subscribes to a stream emitted by the statefulIntegers() function, the generator will be called only when requested. So, in example, if the downstream requests 2 items by calling request(2) method, the BiFunction instance will be called twice. In the first call index value will be equal to 0 (as provided by the initialState Callable) and value of 1 will be provided to the second call - as the state is passed between the calls. What’s also cool is that the mechanism prevents us from making simple mistakes - attempt to call the onNext() function twice in a single call will immediately throw an IllegalStateException.
For the example purposes, let’s define a PagedResponse class representing a single page of data coming from a hypothetical server.
The values field holds our data, nextPageStart indicates an offset to pass to a RESTful endpoint to obtain the next page and isLastPage indicates that there are no more pages to process - no more data is available on the server.
The final solution
Time to marry all the things together. A function creating a paged data stream looks as below.
Now it looks way better! :)
To bypass the need of creating SAM Interfaces instances with explicit type definition, we will need to create a simple utility class presented below. Unfortunately, this is needed as Flowable.generate() is a static factory method which cannot be extended and overloaded in Kotlin.
Let’s assume there is a hypothetical endpoint providing paged Strings.
A sample block of code, requesting exactly up to 2 pages from the server would look as below.
Thanks for reading! I hope you find the article useful. Let me know in the comments section below.