
Bridging CompletableFutures and RxJava
Asynchronous computations can be tough to wrap your head around. We often think of programming as a series of steps, kind of like this:
var data = fetchData("http://someweb.api.com");
doTaskA(data);
doTaskB();
In this example, each step runs only after the previous line of code has completed. But suppose we don’t want to wait for long-running tasks to finish. With asynchronous (hereafter "async") programming, we start a command and register a callback to run when the command completes. For example, the call to fetchData may take some time to complete. In Java, we can use a CompletableFuture to handle the result, which lets other code run instead of waiting around.
CompletableFuture.supplyAsync(() ->
        fetchData("http://someweb.api.com"))
    .thenAccept(data -> doTaskA(data));
// Task B might be completed before Task A.
// Maybe.
// But we don't have to care.
doTaskB();
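To make the snippet above concrete, here is a minimal runnable sketch that uses only the JDK. The names fetchData, doTaskA, and doTaskB are carried over from the pseudocode. The simulated delay and the event log are assumptions added for illustration and stand in for a real web call.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class AsyncSketch {
    // Thread-safe log so we can inspect which steps ran.
    static final List<String> events = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for a slow web call: sleeps instead of using the network.
    static String fetchData(String url) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "data from " + url;
    }

    static void doTaskA(String data) {
        events.add("A:" + data);
    }

    static void doTaskB() {
        events.add("B");
    }

    static List<String> run() {
        events.clear();
        // Start the fetch on another thread and register the callback.
        CompletableFuture<Void> pending = CompletableFuture
            .supplyAsync(() -> fetchData("http://someweb.api.com"))
            .thenAccept(AsyncSketch::doTaskA);
        // Runs right away on the calling thread, without waiting for the fetch.
        doTaskB();
        // Demo only: wait so the program does not exit before the callback runs.
        pending.join();
        return List.copyOf(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Task B is normally logged before Task A here, since fetchData is still sleeping when doTaskB runs. The final join() exists only so the demo does not exit before the callback fires; real code would usually keep chaining instead of blocking.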
Why Write Asynchronous Code at All?
In today’s programming world, there’s a move towards distributed programming, microservices, and web APIs. Calling a web service somewhere on the internet and waiting for it to respond can take time: sometimes milliseconds, sometimes much, much longer. It makes for a truly awful user experience when an application hangs while it waits for a service to respond. Many apps need to call multiple services, or the service they call may itself be calling multiple services. And bad things happen on the internet: servers go down, and so on. Your users will trash your application in online reviews if it appears unresponsive. There’s already an excellent guide to using Java’s CompletableFuture at Guide To CompletableFuture. Instead of rehashing that, I’ll talk about bridging futures with reactive observables.
Why RxJava?
I find the Observer pattern easier to read and reason about than Java’s futures, especially once you start nesting or chaining them. Here’s an example: suppose I need to run a whole bunch of identical calls in parallel and collect the result of each. Using CompletableFuture, I could write a method that runs all the futures and collects the results, like so:
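import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

public static <T, R> CompletableFuture<Collection<R>>
collectAll(Collection<T> items,
           Function<T, CompletableFuture<R>> fn) {
    // Thread-safe storage for data as futures complete
    CopyOnWriteArrayList<R> returnValues =
        new CopyOnWriteArrayList<>();
    // Start every future and record each result as it arrives
    CompletableFuture[] futures = items.stream()
        .map(fn)
        .map(r -> r.thenAccept(returnValues::add))
        .toArray(CompletableFuture[]::new);
    // Complete with the collected values once all futures are done
    return CompletableFuture.allOf(futures)
        .thenApply(v -> returnValues);
}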
Then I would call it like so:
var users = // Some list of users
// A lambda needs an explicit target type, so var cannot be used for it.
// User and UserInfo are placeholder type names.
Function<User, CompletableFuture<UserInfo>> userLookup =
    user -> CompletableFuture.supplyAsync(() -> lookupUserFromWeb(user));
collectAll(users, userLookup)
    .thenAccept(userInfo -> System.out.println("All done"));
But what happens if one of the calls throws an error? Also, this only returns the collection once all the futures complete, which might not be what you really want. This is where an Observable can be handy: it can pass on results as each future completes rather than waiting for all of them, and it can signal an error if one occurs.
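To answer the error question for the collectAll version: CompletableFuture.allOf completes exceptionally if any of its input futures does, so the thenAccept callback is skipped and even the lookups that succeeded are never delivered. The sketch below reproduces that with the JDK alone. The lookup method and the user names are hypothetical stand-ins for lookupUserFromWeb, and outcome is a helper invented here just to report how the combined future ended.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

class CollectAllSketch {
    // Same shape as the collectAll method shown earlier.
    static <T, R> CompletableFuture<Collection<R>> collectAll(
            Collection<T> items, Function<T, CompletableFuture<R>> fn) {
        CopyOnWriteArrayList<R> returnValues = new CopyOnWriteArrayList<>();
        CompletableFuture[] futures = items.stream()
            .map(fn)
            .map(f -> f.thenAccept(returnValues::add))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures).thenApply(v -> returnValues);
    }

    // Hypothetical stand-in for lookupUserFromWeb: fails for the user "bob".
    static CompletableFuture<String> lookup(String user) {
        return CompletableFuture.supplyAsync(() -> {
            if (user.equals("bob")) {
                throw new IllegalStateException("lookup failed for " + user);
            }
            return "info:" + user;
        });
    }

    // Blocks for the combined result and describes how it ended (demo only).
    static String outcome(List<String> users) {
        try {
            Collection<String> infos =
                collectAll(users, CollectAllSketch::lookup).join();
            return "ok:" + infos.size();
        } catch (CompletionException e) {
            // join() wraps the original failure; the cause is the lookup's exception.
            return "failed:" + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(List.of("alice", "carol")));
        System.out.println(outcome(List.of("alice", "bob", "carol")));
    }
}
```

Running main prints ok:2 for the first list and failed:lookup failed for bob for the second. The results for alice and carol are lost in the second case, which is the gap an Observable-based approach is meant to close.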
CompletableFuture to RxJava
The first thing we need is a method to convert a CompletableFuture to an Observable. That’s pretty simple:
public static <T> Observable<T>
observe(CompletableFuture<T> future) {
    return Observable.fromFuture(future);
}
This takes a future and returns an Observable that will eventually (we think) emit a value. And we can subscribe to the observable:
var future = CompletableFuture.supplyAsync(() -> someTask());
var observable = observe(future);
observable.subscribe(v -> System.out.println("Got " + v),
    e -> System.err.println("Exception " + e),
    () -> System.out.println("All done"));
Running Many Futures … But Only One Observable
Now we can get fancy. What if we want to run a bunch of futures in parallel, like our earlier example? Here’s a method that takes a collection of items and a function that converts an item to a CompletableFuture, then returns an Observable that emits results as the items are processed asynchronously.
public static <T, R> Observable<R>
observeAll(Collection<T> items,
           Function<T, CompletableFuture<R>> fn) {
    // Apply the function to each item
    List<CompletableFuture<R>> futures = items.stream()
        .map(fn)
        .collect(Collectors.toList());
    // Return a cold observable. Otherwise the observable may
    // emit its values before you subscribe. Not usually what
    // you want.
    return Observable.defer(() -> {
        List<Observable<R>> observables = futures.stream()
            .map(Observable::fromFuture)
            .collect(Collectors.toList());
        // concat subscribes to the sources one after another,
        // so results are emitted in the order of the input items.
        return Observable.concat(observables);
    });
}
Now we can process multiple items asynchronously, handle the results as the futures complete, and get a signal if an error occurs. Here’s an example:
var users = // Some collection of users
// A function that looks up a user from the web.
// A lambda needs an explicit target type, so var cannot be used for it.
// User and UserInfo are placeholder type names.
Function<User, CompletableFuture<UserInfo>> userLookup =
    user -> CompletableFuture.supplyAsync(() -> lookupUserFromWeb(user));
var observable = observeAll(users, userLookup);
observable.subscribe(v -> System.out.println("Got " + v),
    e -> System.err.println("Exception " + e),
    () -> System.out.println("All done"));
