Reactive Programming — RxJava 101

What is Reactive Programming

Reactive Programming is a programming paradigm which involves in composing and consuming a stream of values.

Why we need Reactive Programming

Nowadays, modern web and mobile applications often require a responsive system which quickly reacts to its users in order to provide the most positive user experience. Think of Google Sheets Web/Mobile app as an example; in fact, each time a cell value get updated, all other cells related to it can be updated simultaneously as well.

Reactive Programing is based on the principle of pushing, rather than pulling the data; meanwhile, it can perform workloads in parallel, rather than serial. Reactive Programing also has a toolbox of features that can help developers to construct/transform and chain their stream of data in any possible way. This programming paradigm is indeed proven to be very efficient in speed and performance while providing flexibility to developers.

What is RxJava

RxJava is a Java implementation of Reactive Programming and follows the Observer pattern. It mainly consists of:

  • Observable object which will emit the data or notify certain events.
  • Observer or Subscriber object which subscribes and then listens to the data stream or event notification from the Observable object.

In order to understand the fundamental of RxJava, let walk through a simple example of it. First, we need to create an Observable object by using the “create” operator. The Observable object will emit a stream of String values via onNext() method, then notify the subscriber when it’s done with onCompleted().

Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("I'm Batman");
subscriber.onNext("Superman is coming");
subscriber.onCompleted(); //Notify subscriber this is end of the stream
}
});

Next, let’s have a Subscriber to listen to all these events

Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
//This get called when the observable object get done with its streaming
}

@Override
public void onError(Throwable e) {
//This get called when an error occurs and the whole stream will stop.
}

@Override
public void onNext(String s) {
System.out.println(s);
}
};
stringObservable.subscribe(subscriber);

And that’s it, it will print “I’m Batman”, then “Superman is coming” right when the Observable calls subscribe(subscriber). In fact, we often don’t care about the event of stream completion or error; thus, we can simplify this a bit by not having a Subscriber, but rather using an Action1 object.

stringObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});

RetroLambda

Do we need RetroLambda for RxJava? No, but it will be very nice to have it since it helps to remove a lot of unnecessary code, and makes our code look better/cleaner/nicer. With the simple example above, we can rewrite it with RetroLambda

stringObservable.subscribe(s -> {System.out.println(s);});

Or perhaps, we can also take the advantage of Method Reference:

stringObservable.subscribe(System.out::println);

You can find more information about RetroLambda at https://github.com/orfjackal/retrolambda

Operators

Up till now, we have been discussing about emitting and receiving data, but what about modifying or transforming these data values before they ultimately got delivered to the Subscriber. This is where RxJava operators fit perfectly in the whole big picture. In fact, RxJava offers a wide range of operators, but we would rather focus on these two: map() and flatmap().

map(): is used to transform one value to another value regardless of type. For example, the Observable object below will emit a stream of values, which are “Batman”, “Superman”, “Wonder Woman” (Type String) and the map operator will map each value to its character length (Type Integer); then, these integer values will be pushed to the Subscriber to consume.

Observable.just("Batman", "Superman", "Wonder Woman")
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.length();
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("Number of characters = " + integer);
}
});

Again, we can just use RetroLambda to simplify all these

Observable.just("Batman", "Superman", "Wonder Woman")
.map(s -> s.length())
.subscribe(integer -> {
System.out.println("Number of characters = " + integer);
});

flatmap(): the map() operator is indeed powerful in helping us to manipulate stream of data, but it limits in just transforming from one value to another value of type T. Meanwhile, the flatmap() can take on one value of the stream and return an Observable<T>, meaning each values from the new Observable can be separately pushed further down the pipeline. Therefore, flatmap() is perfectly suitable to make asynchronous call inside the method.

Let take a dive into flatmap() with the below example, in which, a list of url strings are given to us, and we have to find out what server each url belongs to. The example takes 5 simple steps to do; thus, demonstrate how powerful and flexible RxJava can be:

- First, we just wants a list of url strings for the input:
Observable.fromCallable(() -> Arrays.asList(“https://www.google.com/", “https://www.yahoo.com/", “https://www.microsoft.com/en-us/"))
- Second, transform the list of url strings to an Observable object so that it can emit the values one by one:
flatMap(urls -> Observable.from(urls))
- Third, for each url, we use the method HttpUrl.getServerName(String url) to retrieve the server name, and construct the output string:
flatMap(url -> Observable.just(url + " is hosted at: " + HttpUrl.getServerName(url)))
- Fourth, set up the call to run on IO thread:
subscribeOn(Schedulers.io())
- Finally, create a subscriber to listen to the event and output the results:
subscribe(s -> {System.out.println(s);})

At last, we can chain all these method calls all in one to achieve our mission.

Observable.fromCallable(() -> Arrays.asList("https://www.google.com/", "https://www.yahoo.com/", "https://www.microsoft.com/en-us/"))
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> Observable.just(url + " is hosted at: " + HttpUrl.getServerName(url)))
.subscribeOn(Schedulers.io())
.subscribe(s -> {System.out.println(s);});

Schedulers

Multi-threading development has never been an easy task; especially with modern web or mobile application. In fact, RxJava can make developing multi-threads application so simple by instructing its Observable objects and operators to run on particular Schedulers (or in other words, Threads).

By default, the Observable object and its operators get executed on the same thread as the subscribe() method get called. However, we can use the subscribeOn() operator to tell the Observable object where to execute its operations. For example, let’s say an Android app needs to do an intensive data computation, it can use subscribeOn(Schedulers.computation()) so that all of the heavy computational work will be done on a dedicated thread; then, all of the results will be notified to its observers on the android main UI thread via observeOn(AndroidSchedulers.mainThread()). Or with the above flatmap() example, we indeed used Schedulers.io() to schedule those operations to be done on an IO-bound work thread.

Subscription

When a Subscriber subscribes to an Observable object, it will return a Subscription.

Subscription subscription = Observable.just("Batman", "Superman", "Wonder Woman")
.map(s -> s.length())
.subscribe(integer -> {
System.out.println("Number of characters = " + integer);
});

This Subscription can be used to unsubscribe or unlink between the Subscriber and the Observable. Furthermore, when the unsubscribe() method get called from a Subscription, it will immediately terminate the chain of operations; thus, making the app to be efficient and not wasteful on its power resource. Specially with Android app, subscription.unsubscribe() can be easily called in onStop() so that we don’t have to worry about any memory-leaks or crashes.

Few Extra Words

RxJava is indeed proven to be extremely powerful while giving developers a great flexibility to get their development done right. Some other well-known libraries such as: Retrofit, RxAndroid and RxRelay can integrate very well and make RxJava to become super useful in developing Android application. However, RxJava does require a steep learning curve and often needs developers to stop thinking in their imperative programming world, but instead immerse themselves in the world of Reactive Programing to make their imagination perhaps runs wild and comes true.

One clap, two clap, three clap, forty?

By clapping more or less, you can signal to us which stories really stand out.