RxJava2: Sync / Async APIs to Reactive
1. Overview
Today’s article is about converting a synchronous or asynchronous non-reactive API into a reactive API using RxJava2. We will first demonstrate the basic flow from one reactive API to another, showing how a stream can be decomposed across classes or methods. Then, we will show more complex transformations from async / sync to reactive.
Reactive programming is asynchronous in nature, so the complexity of transforming a non-reactive API into a reactive one depends on the nature of the source API. Basically, the transformation logic accepts a source API type and returns one of the reactive base types: Observable, Flowable, Completable, Single or Maybe (collectively referred to as Observables in this article).
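As a quick orientation, the sketch below creates one instance of each of the five base types and reads its result back with the blocking helpers. The class and method names are made up for illustration; only the RxJava2 calls themselves come from the library.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.List;

// Illustrative class name; not part of the article's sample project.
class BaseTypesSketch {

    // Observable: 0..n items, no backpressure.
    static List<Integer> observableItems() {
        return Observable.just(1, 2, 3).toList().blockingGet();
    }

    // Flowable: 0..n items, with backpressure support.
    static long flowableCount() {
        return Flowable.range(1, 3).count().blockingGet();
    }

    // Single: exactly one item or an error.
    static String singleValue() {
        return Single.just("one").blockingGet();
    }

    // Maybe: one item, no item, or an error. This one is empty,
    // so the default value passed to blockingGet() is returned.
    static String maybeValue() {
        return Maybe.<String>empty().blockingGet("nothing");
    }

    // Completable: no item, only completion or an error.
    // blockingGet() returns null when it completed normally.
    static Throwable completableError() {
        return Completable.complete().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(observableItems());
        System.out.println(flowableCount());
        System.out.println(singleValue());
        System.out.println(maybeValue());
        System.out.println(completableError());
    }
}
```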
A reactive API is built from one of three kinds of sources:
- Another reactive API
- An asynchronous API of various origin
- A synchronous API
Now, let’s dive into the crux of the matter.
2. Maven dependencies
For our examples, we need the RxJava2 library as well as Vert.x core and the Vert.x RxJava2 extension:
<dependencies>
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.1.14</version>
    </dependency>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-core</artifactId>
        <version>3.5.2</version>
    </dependency>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-rx-java2</artifactId>
        <version>3.5.2</version>
    </dependency>
</dependencies>
Always check for the latest version on Maven Central.
3. Use Case and Examples
We will implement the same use case for all examples:
- Loading a list of Members from a text file into a Buffer or a String
- Transforming the buffer to a String and to a JsonArray
- Morphing the JsonArray to a Flowable<Member> API
3.1. Simple case: from Reactive to… Reactive
Vert.x provides an RxJava2 flavor of its API (the io.vertx.reactivex packages), so this case is relatively easy: we use the Vert.x rx…() methods, which already return reactive types matching our target API. Let’s start by returning a Single<JsonArray> that contains the whole list of members.
Note: Double check that your IDE imports the reactivex.core Vertx and FileSystem versions:
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.file.FileSystem;
This method returns a Single<JsonArray> that contains the list of Members:
Single<JsonArray> getMembersAsArray(String filepath) {
    Vertx vertx = Vertx.vertx();
    FileSystem fileSystem = vertx.fileSystem();
    return fileSystem.rxReadFile(filepath)
        .map(buffer -> buffer.toString())
        .map(content -> new JsonArray(content));
}
From another method (or class), we call getMembersAsArray(), which returns our Single<JsonArray>, and we transform the result into a Flowable<Member>:
Flowable<Member> getMemberArrayAsFlowable(String filepath) {
    return getMembersAsArray(filepath)
        .flatMapPublisher(array -> Flowable.fromIterable(array))
        .cast(JsonObject.class)
        .map(json -> json.mapTo(Member.class));
}
Of course, we could have gone from the rxReadFile() input to Flowable<Member> all at once:
Flowable<Member> getMembersAsFlowable(String filepath) {
    Vertx vertx = Vertx.vertx();
    FileSystem fileSystem = vertx.fileSystem();
    return fileSystem.rxReadFile(filepath)
        .map(buffer -> buffer.toString())
        .map(content -> new JsonArray(content))
        .flatMapPublisher(array -> Flowable.fromIterable(array))
        .cast(JsonObject.class)
        .map(json -> json.mapTo(Member.class));
}
Remember that every intermediate operator is both a consumer of its upstream and a source for its downstream, which is why the chain can be cut at any point and continued in another method or class.
In our example, rxReadFile() returns a Single<Buffer> that the subsequent map transforms into a Single<String>. That large String, which actually is a literal array of members, is then parsed into a formal JsonArray that is pushed downstream as a Single<JsonArray>. In turn, flatMapPublisher() accepts the array and pushes each element downstream as a Flowable<Object>. The subsequent cast and map operators turn each element into a JsonObject and then into a Member, which yields the Flowable<Member>.
Interestingly, since a JsonArray is a plain synchronous Iterable, we use the fromIterable() operator to pull each element out of the array and push it, one by one, onto the stream. The file read at the head of the chain, on the other hand, is asynchronous, but Vert.x already wraps it in a Single for us. We’ll see in the next example how to handle an asynchronous source that has no reactive wrapper.
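The synchronous nature of fromIterable() can be checked with a small sketch. It swaps the JsonArray for a plain List<Object> (an assumption made only to keep the example free of Vert.x), and the class and method names are illustrative. Because nothing in this particular chain is asynchronous, every element has already gone through cast() and map() by the time subscribe() returns:

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch: a List<Object> stands in for the JsonArray.
class FromIterableSketch {

    static List<String> collect(List<Object> source) {
        List<String> seen = new ArrayList<>();
        Flowable.fromIterable(source)      // Flowable<Object>, one item per element
            .cast(String.class)            // Flowable<String>
            .map(String::toUpperCase)
            .subscribe(seen::add);
        // No scheduler and no async source are involved, so the list
        // is already complete when subscribe() returns.
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect(Arrays.<Object>asList("ann", "bob")));
    }
}
```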
3.2. From Asynchronous to Reactive
In this section, we will use the JDK’s AsynchronousFileChannel as a Java async implementation that does not directly support reactive types.
Since RxJava cannot ship a ready-made factory for every possible async source, we create an adapter that converts a non-reactive async API to a reactive one. Such an adapter must accept the input from the async source and push each element through the chain of operators down to the final Subscriber. The push entry point is an emitter (SingleEmitter<T>, FlowableEmitter<T>, and so on) handed to the lambda that we pass to the static create(emitter -> {…}) method of the target type.
In our example, we return a Single, so the emitter is a SingleEmitter<T> and we signal through its onSuccess and onError methods. First, we asynchronously load the member list content into a Single<ByteBuffer> using the AsynchronousFileChannel. Then, an operator chain similar to the one in our previous example transforms the Single<ByteBuffer> into a Flowable<Member>.
Now let’s rattle the keyboard:
public Flowable<Member> getMemberListAsyncAsFlowable(String filepath) {
    File file = new File(filepath);
    return Single.<ByteBuffer>create(emitter -> {
            AsynchronousFileChannel channel =
                AsynchronousFileChannel.open(file.toPath());
            ByteBuffer buffer = ByteBuffer.allocate((int) file.length());
            // Start a non-blocking read at file position 0;
            // the handler is called back once the read has ended
            channel.read(buffer, 0, null,
                new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer result, Void attachment) {
                        try {
                            channel.close();
                        } catch (IOException e) {
                            emitter.onError(e);
                            return;
                        }
                        // Hand the filled buffer over to the stream
                        emitter.onSuccess(buffer);
                    }

                    @Override
                    public void failed(Throwable error, Void attachment) {
                        try {
                            channel.close();
                        } catch (IOException e) {
                            // ignore
                        }
                        emitter.onError(error);
                    }
                });
        })
        .map(buffer ->
            new String(buffer.array(), StandardCharsets.UTF_8))
        .map(JsonArray::new)
        .flatMapPublisher(Flowable::fromIterable)
        .cast(JsonObject.class)
        .map(json -> json.mapTo(Member.class));
}
The interesting part of this example (and of the previous one) lies at the call site, where we subscribe to this Flowable. Let’s look at the code below:
private AsyncToRx sample = new AsyncToRx();

@Test
public void givenAsyncFileLoad_whenStreamedToFlowable_thenWaitForCompletion()
        throws InterruptedException {
    this.sample
        .getMemberListAsyncAsFlowable("filepath...")
        .subscribe(
            System.out::println,
            Throwable::printStackTrace);
    Thread.sleep(2000);
}
Thread.sleep(2000) blocks the calling thread for two seconds because the AsynchronousFileChannel executes asynchronously. subscribe() returns immediately, so without the pause the calling thread would run to the end of the method and exit before the response arrives. In practice, two seconds is enough for the read to complete and for the CompletionHandler to be called with the response.
Some clever reader will ask why subscribe() returns immediately. It does so because the source at the head of the chain is asynchronous: channel.read() hands the work over to the channel’s own threads and returns at once. An async function never blocks waiting for a response. Instead, it takes a callback function that the callee will use to return the response when done.
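CompletionHandler is such a callback. The JDK-only sketch below isolates that pattern: readFully() returns at once, and the result arrives later through the handler. It also re-issues read() from inside the callback until the buffer is full, since, as far as the channel contract goes, a single read() is not guaranteed to fill the buffer. The class, readFully() and demo() are hypothetical helpers, and a CompletableFuture merely stands in for the RxJava emitter.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, JDK only; CompletableFuture stands in for the emitter.
class AsyncReadFully {

    // Returns immediately; the future completes later, from a callback thread.
    static CompletableFuture<ByteBuffer> readFully(Path path) {
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        AsynchronousFileChannel channel;
        try {
            channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        } catch (IOException e) {
            result.completeExceptionally(e);
            return result;
        }
        try {
            ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
            readNext(channel, buffer, result);
        } catch (IOException e) {
            closeQuietly(channel);
            result.completeExceptionally(e);
        }
        return result;
    }

    private static void readNext(AsynchronousFileChannel channel, ByteBuffer buffer,
                                 CompletableFuture<ByteBuffer> result) {
        // We read from offset 0, so the buffer position doubles as the file offset.
        channel.read(buffer, buffer.position(), null,
            new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer bytesRead, Void attachment) {
                    if (bytesRead < 0 || !buffer.hasRemaining()) {
                        closeQuietly(channel);
                        buffer.flip();
                        result.complete(buffer);
                    } else {
                        // Partial read: ask for the rest.
                        readNext(channel, buffer, result);
                    }
                }

                @Override
                public void failed(Throwable error, Void attachment) {
                    closeQuietly(channel);
                    result.completeExceptionally(error);
                }
            });
    }

    private static void closeQuietly(AsynchronousFileChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing sensible to do here
        }
    }

    // Writes a small temp file, reads it back asynchronously, returns its content.
    static String demo() {
        try {
            Path file = Files.createTempFile("members", ".json");
            Files.write(file, "[{\"name\":\"Ann\"}]".getBytes(StandardCharsets.UTF_8));
            ByteBuffer buffer = readFully(file).join(); // block only for the demo
            Files.delete(file);
            return new String(buffer.array(), 0, buffer.limit(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Inside a Single.create() block, the same handler could call emitter.onSuccess(buffer) and emitter.onError(error) instead of completing a future.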
To check it out, remove the sleep step and run the test: it will most likely complete normally without any output at all. To demonstrate the asynchrony more clearly and reach the same result more efficiently, we can use a CountDownLatch and wait for latch.countDown() to be called before leaving the test method:
@Test
public void givenAsyncFileLoad_whenStreamedToFlowable_thenLatchOnCompletion()
        throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    this.sample
        .getMemberListAsyncAsFlowable("filepath...")
        .subscribe(
            System.out::println,
            error -> {
                error.printStackTrace();
                latch.countDown(); // release the test on failure too
            },
            latch::countDown);
    latch.await();
}
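For tests and demos, RxJava2 also offers blocking operators that wait on the calling thread, which can replace a hand-rolled latch. The sketch below uses blockingSubscribe() on a Flowable that emits on the io() scheduler. The class name and sample values are placeholders, and subscribeOn() is used here only to make the source asynchronous without touching the file system.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch; the names are placeholders, not Member objects.
class BlockingSubscribeSketch {

    // An asynchronous source: items are emitted on an io() scheduler thread.
    static Flowable<String> asyncNames() {
        return Flowable.fromIterable(Arrays.asList("Ann", "Bob", "Cid"))
            .subscribeOn(Schedulers.io());
    }

    static List<String> collectBlocking() {
        List<String> seen = new ArrayList<>();
        // Blocks the calling thread until the source terminates and
        // runs the callback on that same calling thread.
        asyncNames().blockingSubscribe(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectBlocking());
    }
}
```

Blocking like this is meant for the edge of a program or a test, not for the inside of a reactive chain.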
3.3. From Synchronous to Reactive
This case is also a tad complex since, again, we build an adapter to implement the reactive API from scratch.
The good news is that all the reactive base types offer static “from…()” factory methods that accept synchronous input and return an instance of their type. We have already seen this in action when we used Flowable.fromIterable(). There are many others, such as fromCallable(), fromArray(), etc. Each type supports its own set of “from…()” methods, so refer to the Javadoc of the specific RxJava2 type for details.
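A few of these factories side by side, as a minimal sketch. The class name and the sample values are invented for illustration, and the results are read back with blocking calls only to keep the example short.

```java
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of some "from…()" factories.
class FromFactoriesSketch {

    // fromArray(): one item per array element.
    static List<String> fromArray() {
        return Flowable.fromArray("Ann", "Bob").toList().blockingGet();
    }

    // fromIterable(): one item per element of any Iterable.
    static List<String> fromIterable() {
        return Flowable.fromIterable(Arrays.asList("Ann", "Bob"))
            .map(String::toUpperCase)
            .toList()
            .blockingGet();
    }

    // fromCallable(): the callable runs at subscription time, and an
    // exception thrown by it is delivered as onError.
    static String fromCallable() {
        return Single.fromCallable(() -> "content of members.json").blockingGet();
    }

    // Maybe.fromCallable(): a null result completes the Maybe without a value,
    // so blockingGet() falls back to the default given here.
    static String emptyMaybe() {
        return Maybe.<String>fromCallable(() -> null).blockingGet("no value");
    }

    public static void main(String[] args) {
        System.out.println(fromArray());
        System.out.println(fromIterable());
        System.out.println(fromCallable());
        System.out.println(emptyMaybe());
    }
}
```

For a blocking call that returns a single value, fromCallable() is often the shortest bridge, since no explicit try/catch around the call is needed.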
Luckily, for this example, we’ll need the fromIterable method we already know. To read our members.json file synchronously, we’ll use Apache commons-io FileUtils class.
Add the latest version of this dependency to the pom.xml file:
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.6</version>
</dependency>
Alright now, let’s code this:
Flowable<Member> getMemberListAsFlowable(String filename) {
    return Single.<String>create(emitter -> {
            try {
                ClassLoader classldr = getClass().getClassLoader();
                File file = new File(classldr
                    .getResource(filename)
                    .getFile());
                String memberList = FileUtils
                    .readFileToString(file, StandardCharsets.UTF_8);
                emitter.onSuccess(memberList);
            } catch (Exception e) {
                emitter.onError(e);
            }
        })
        .map(list -> new JsonArray(list))
        .flatMapPublisher(array -> Flowable.fromIterable(array))
        .cast(JsonObject.class)
        .map(json -> json.mapTo(Member.class));
}
We return a Single<String> populated by the synchronous loading of our members.json file. As in the previous example, we call onSuccess() passing the received memberList as a String. This Single<String> is then fed to a similar operator chain as previous examples that transforms the memberList String into a Flowable<Member> reactive API.
Also, be aware that the synchronous code should be encapsulated in the Single.<String>create(emitter -> {…}) block. Doing so defers the blocking call until someone subscribes and runs it on the subscribing thread. Otherwise, the blocking work runs as soon as the method is called, and the calling function bears the cost of the heavy synchronous work even if nobody ever subscribes. This often-seen anti-pattern looks like:
Flowable<Member> getMemberListAsFlowableWithAntiPattern(String filename) {
String memberList;
try {memberList = ... read file synchronously... } catch {...}
return Single.<String>just(memberList).map...
}
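The difference can be made visible by counting how often the blocking call runs. In the sketch below, loadMembers() is a made-up stand-in for the synchronous file read, and the class and method names are illustrative as well.

```java
import io.reactivex.Single;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: counts how many times the "blocking" load runs.
class EagerVsDeferredSketch {

    static final AtomicInteger loads = new AtomicInteger();

    // Stand-in for the synchronous file read.
    static String loadMembers() {
        loads.incrementAndGet();
        return "[]";
    }

    // Anti-pattern: the load runs while the Single is being assembled.
    static Single<String> eager() {
        return Single.just(loadMembers());
    }

    // The load runs inside create(), once per subscription.
    static Single<String> deferred() {
        return Single.create(emitter -> {
            try {
                emitter.onSuccess(loadMembers());
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
    }

    // Returns the load count at three points in time.
    static int[] demo() {
        loads.set(0);
        Single<String> eager = eager();
        int afterEagerAssembly = loads.get();     // already loaded, no subscriber yet
        Single<String> deferred = deferred();
        int afterDeferredAssembly = loads.get();  // unchanged, nothing ran
        deferred.blockingGet();                   // loads
        deferred.blockingGet();                   // loads again
        eager.blockingGet();                      // replays the cached value
        int afterSubscriptions = loads.get();
        return new int[] {afterEagerAssembly, afterDeferredAssembly, afterSubscriptions};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

Note that the eager variant also hands every subscriber the same snapshot of the file, whereas the deferred variant reads it afresh on each subscription.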
4. Conclusion
Even though RxJava2 is an extensive internal rewrite of RxJava, great efforts have been made to keep the original API intact as much as possible. The changes, however, are real improvements over the previous versions (see What’s new in RxJava2).
One of the most significant innovations is the removal of backpressure support from Observable and the advent of Flowable. This new kid on the block is an evolution of Observable that implements better support for backpressure. In RxJava 1.x, using Observable.create() forced backpressure handling to be implemented manually, leading to many inconsistencies and bugs. Moreover, it made bridging sync / async APIs much more complex than what we just did.
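For a multi-valued source, the RxJava2 counterpart looks roughly like the sketch below: Flowable.create() takes a BackpressureStrategy, so the library rather than the adapter decides what happens when the consumer lags behind. The class and method names are illustrative, and the loop merely stands in for a real push-style source.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.util.List;

// Illustrative sketch of a multi-valued bridge with Flowable.create().
class FlowableCreateSketch {

    static Flowable<Integer> numbers(int count) {
        return Flowable.create(emitter -> {
            // Stop pushing as soon as the downstream has cancelled.
            for (int i = 1; i <= count && !emitter.isCancelled(); i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER); // buffer items the consumer has not requested yet
    }

    // take(2) cancels the source after two items.
    static List<Integer> firstTwo() {
        return numbers(1000).take(2).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(firstTwo());
    }
}
```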
This post presented several approaches to bridging various API types to reactive using RxJava2. Upcoming posts will introduce the schedulers, testing helpers and backpressure, so stay tuned. In the meantime, you can check out the examples provided in this article on GitHub.