Howdy RxJava.

While researching reactive programming, I found that almost every article begins with the notion that it is difficult to learn, and articles aimed at people with little or no prior knowledge were hard to find. This article attempts to demystify the fundamentals of reactive programming for newcomers, using RxJava on Android.

What is Reactive Programming?

Reactive programming is programming with asynchronous data streams.

Wait. I can easily do that using callbacks. So what’s different in Reactive programming?

Yes, this concept is not new. It can be done imperatively — and often is.

Let’s think not only about callbacks, but also about the supporting machinery required to get them up and running. In the imperative approach, this usually involves managing state and considering the side effects of changing it. These considerations have been the cause of countless bugs. Reactive programming takes a functional approach: it avoids global state and side effects and works with streams instead.

What is a Stream?

“Everything flows, nothing stands still” — Heraclitus

A stream represents a sequence of data. Consider our transportation network: the vehicles on a particular highway form a stream of objects, endlessly flowing with the occasional bottleneck. In reactive programming, we receive a continuous flow of data (a stream) and provide the operations to apply to it. The source of the data doesn’t (and shouldn’t) really matter.

Streams are ubiquitous; anything can be a stream: variables, user inputs, properties, caches, data structures, etc.

What is declarative and imperative programming?

  • Imperative guides how to do it
  • Declarative tells what to do

Before diving into code, let’s go back to our transportation network. Let’s assume the mayor wants to pause the flow of vehicles temporarily on a given highway at particular intervals, using stop signs. The mayor says, “Split the highway into even segments and place a stop sign at each boundary point.” The contractor says, “Wait, before splitting anything, I need to determine the length of each segment; and to do that, I need to know the length of the highway, how many stop signs we have, and the average vehicle length.” In this situation, the mayor has enough functional units (including the DOT) at her disposal that she only has to focus on declaring her intentions and not worry about the details of getting the job done. This is the declarative approach. The contractor, on the other hand, is tasked with ensuring every detail of the procedure is accounted for and done correctly. This is the imperative approach. What if you could build software the way the mayor builds her city? Let’s look at an example:

E.g. Imperative approach of filtering out even numbers.

Integer[] numbers = {1, 2, 3, 4, 5};
List<Integer> lists = Arrays.asList(numbers);
List<Integer> results = new ArrayList<>();

for (Integer num : numbers) {
    if (num % 2 != 0) {
        results.add(num);
    }
}

Declarative approach

List<Integer> results = lists.stream()
    .filter(s -> s % 2 != 0)
    .collect(Collectors.toList());
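
To try both styles side by side, the two snippets can be wrapped in a small runnable class. This is only a sketch: the class name OddNumbers and its method names are made up for illustration, and it assumes a JVM where the Java 8 Stream API is available.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class for illustration; not part of any library.
class OddNumbers {

    // Imperative: we spell out how to walk the list and collect the matches.
    static List<Integer> imperative(List<Integer> numbers) {
        List<Integer> results = new ArrayList<>();
        for (Integer num : numbers) {
            if (num % 2 != 0) {
                results.add(num);
            }
        }
        return results;
    }

    // Declarative: we state what we want and let the Stream API do the walking.
    static List<Integer> declarative(List<Integer> numbers) {
        return numbers.stream()
                .filter(num -> num % 2 != 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(imperative(numbers));   // prints [1, 3, 5]
        System.out.println(declarative(numbers));  // prints [1, 3, 5]
    }
}
```

Both methods return the same list; the difference is only in who owns the loop.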

Cool, I like the declarative approach, but how does a computer know how to do it when we only tell it what to do?

In today’s world, everything ends up being imperative by the time it hits the operating system and hardware. Reactive programming, with its functional style, is an abstraction over that imperative layer, similar to how our high-level imperative programming languages are an abstraction over the underlying binary and assembly instructions. (The mayor needs her DOT contractors.)

So, how do we adjust to the declarative style of programming in Java?

Java 8 has an awesome Stream API. But if you are an Android developer like me, you cannot use it, because Android does not yet support all the features of Java 8. Nevertheless, you can use RxJava, a Java port of Reactive Extensions (Rx) created by developers at Netflix.

How does RxJava work?

The basic building blocks of reactive code are Observable and Observer.

  • An Observable can be observed (similar to the Subject in the Observer pattern), and
  • An Observer listens to an Observable.

An Observable is a class that emits a stream of data or events, and an Observer is a class that acts upon the emitted items. An Observable can have multiple Observers, and each item emitted by the Observable is passed to each Observer’s onNext() method to be handled. Once an Observable has finished emitting items, it calls Observer.onCompleted(). If an error occurs, it calls Observer.onError() instead.

Note: Some Observables never terminate (e.g. the output of a temperature sensor).

The channel that connects an Observer with an Observable is called a Subscription, and it can later be used to unsubscribe from the Observable.
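
The contract described above (items through onNext(), then exactly one terminal signal) can be sketched in plain Java without any library. The types below (SimpleObserver, ListSource, RecordingObserver) are hand-rolled for illustration and are not RxJava’s classes; a real Observable does far more, including subscriptions and scheduling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hand-rolled illustration of the contract; these are NOT RxJava's classes.
interface SimpleObserver<T> {
    void onNext(T item);
    void onCompleted();
    void onError(Throwable e);
}

// A source that pushes the items of a list to whoever subscribes.
class ListSource<T> {
    private final List<T> items;

    ListSource(List<T> items) {
        this.items = items;
    }

    // Nothing is emitted until someone subscribes.
    void subscribe(SimpleObserver<T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);   // terminal signal: report the failure and stop
            return;
        }
        observer.onCompleted();    // terminal signal: no more items will follow
    }
}

// Observer that records every signal it receives so we can inspect them.
class RecordingObserver implements SimpleObserver<Integer> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(Integer item) { events.add("onNext:" + item); }
    @Override public void onCompleted() { events.add("Complete!"); }
    @Override public void onError(Throwable e) { events.add("onError:" + e.getMessage()); }
}

class ContractDemo {
    public static void main(String[] args) {
        RecordingObserver observer = new RecordingObserver();
        new ListSource<Integer>(Arrays.asList(1, 2, 3)).subscribe(observer);
        System.out.println(observer.events);  // prints [onNext:1, onNext:2, onNext:3, Complete!]
    }
}
```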

It sounds similar to Observer Pattern, what’s the difference between Observer Pattern and RxJava’s architecture?

RxJava’s Observable adds two missing semantics to the Observer pattern.

  • The ability for the producer to signal to the consumer that there is no more data available (i.e. onCompleted()).
  • The ability for the producer to signal to the consumer that an error has occurred. (i.e. onError())

Furthermore, the power of RxJava is that it enables these Observable streams to be transformed, aggregated and filtered with only a few lines of code while also minimizing the need for storing state variables.

Show me some Code:

Creating Observable:

Integer[] numbers = {1, 2, 3, 4, 5, 6, 7};
List<Integer> lists = Arrays.asList(numbers);
Observable<Integer> integerObservable = Observable.from(lists);

integerObservable emits the integers 1, 2, 3, 4, 5, 6 and 7 and then completes.

Note: There are many ways to compose observables. Check out the official documentation for more details.

Subscriber

A Subscriber is a special type of Observer that can also unsubscribe from the Observable.

Subscriber<Integer> mySubscriber = new Subscriber<Integer>() {
    @Override
    public void onNext(Integer data) {
        Log.d("Rx", "onNext:" + data);
    }

    @Override
    public void onCompleted() {
        Log.d("Rx", "Complete!");
    }

    @Override
    public void onError(Throwable e) {
        // handle your error here
    }
};

Connecting Subscriber to Observable:

Observables are lazy: they don’t do anything until someone subscribes to them.

integerObservable.subscribe(mySubscriber);
// Outputs:
// onNext:1
// onNext:2
// onNext:3
// onNext:4
// onNext:5
// onNext:6
// onNext:7
// Complete!

Transforming Streams:

RxJava has numerous operators for transforming streams. The following are among the most commonly used.

  • Filter: The Filter operator filters an Observable by only allowing items through that pass a test that you specify in the form of a predicate function.
integerObservable.filter(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer o) {
        return o % 2 == 0;
    }
}).subscribe(mySubscriber);
// Outputs:
// onNext:2
// onNext:4
// onNext:6
// Complete!

Here I am filtering out all the odd numbers, so only the even ones pass through.

---1---2---3---4----5----6----7---|-->
         filter(x % 2 == 0)
-------2-------4---------6--------|-->
Note: Func1<T, R> represents a function with one argument. T is the argument type and R is the result type.
  • Map: The Map operator applies a function of your choosing to each item emitted by the source Observable, and returns an Observable that emits the results of these function applications.
integerObservable.map(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer value) {
        return value * value;
    }
}).subscribe(mySubscriber);
// Outputs:
// onNext:1
// onNext:4
// onNext:9
// onNext:16
// onNext:25
// onNext:36
// onNext:49
// Complete!

Here I am using the map operator to transform each emitted item into another: every item emitted by integerObservable is squared.


---1---2---3---4----5----6----7---|-->
        map(x -> x * x)
---1---4---9---16---25---36---49---|-->

There are tons of operators in RxJava.
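
One way to build intuition for operators is to notice that each one takes a stream and returns a new stream, which is why they chain. The sketch below is a deliberately tiny, hand-rolled model in plain Java; MiniStream and MiniStreamDemo are made-up names, it ignores completion, errors and unsubscription, and it is not how RxJava is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hand-rolled miniature, NOT RxJava: a stream is "something that pushes items to a consumer".
class MiniStream<T> {
    private final Consumer<Consumer<T>> source;

    private MiniStream(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    static <T> MiniStream<T> from(List<T> items) {
        return new MiniStream<T>(downstream -> items.forEach(downstream));
    }

    // filter returns a NEW stream that forwards only the items passing the test.
    MiniStream<T> filter(Predicate<T> test) {
        return new MiniStream<T>(downstream ->
                source.accept(item -> {
                    if (test.test(item)) {
                        downstream.accept(item);
                    }
                }));
    }

    // map returns a NEW stream that forwards the transformed items.
    <R> MiniStream<R> map(Function<T, R> transform) {
        return new MiniStream<R>(downstream ->
                source.accept(item -> downstream.accept(transform.apply(item))));
    }

    // Nothing runs until a consumer subscribes.
    void subscribe(Consumer<T> consumer) {
        source.accept(consumer);
    }
}

class MiniStreamDemo {
    // Chains both operators: keep the even numbers, then square them.
    static List<Integer> squaresOfEvens(List<Integer> numbers) {
        List<Integer> out = new ArrayList<>();
        MiniStream.from(numbers)
                .filter(x -> x % 2 == 0)
                .map(x -> x * x)
                .subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(squaresOfEvens(Arrays.asList(1, 2, 3, 4, 5, 6, 7)));  // prints [4, 16, 36]
    }
}
```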

Ok, but you said Reactive programming is asynchronous?

RxJava is synchronous until told otherwise.

But is this (synchronous) the desired behavior of the reactive system?

Determining whether to use an asynchronous or synchronous observable requires analysis of the specific problem. For example, it may be appropriate to fetch data synchronously from an in-memory cache and return it immediately. On the other hand, if the observable makes network calls or does time-consuming data processing, it should be asynchronous. As a rule of thumb, when developing for a graphical system, use an asynchronous observable when the work is initiated on the UI thread and is blocking and/or computation-heavy.

RxJava is agnostic about where the asynchrony comes from.

Gotcha, now tell me how to compose observable asynchronously if needed?

First, let’s look at the old way of offloading lengthy, I/O-intensive operations to a different (non-UI) thread.

Old way:

private class FetchUsersTask
        extends AsyncTask<String, Void, User> {

    @Override
    protected User doInBackground(String... params) {
        String userId = params[0];
        return UsersService.getUser(userId);
    }

    @Override
    protected void onPostExecute(User user) {
        // handle the result and update the view
    }
}

FetchUsersTask calls UsersService.getUser() in the background and passes the resulting User to onPostExecute(). It looks simple enough, but there are some problems with this code.

  • Error Handling: Errors and exceptions might occur inside doInBackground(), so in order to recover from them we add a try-catch block. Generally, when we catch exceptions, we log them and notify the user of the failure, which usually involves the UI thread. With AsyncTask, this requires much more code: for example, we could use Object as the return type of doInBackground() and check the type in onPostExecute() using instanceof.
  • Memory Leak: Even if the Activity/Fragment that started the AsyncTask is destroyed, the AsyncTask keeps running doInBackground() until it is done. Since the AsyncTask needs to report back to the view after the background work is complete, it has to keep a reference to the Activity/Fragment while it’s running. This may result in a memory leak and/or crashes if the Activity is destroyed before the background work completes and the programmer did not use appropriate techniques such as weak references. Another technique is to call cancel(boolean), which attempts to cancel the execution of the task; onCancelled() is then called instead of onPostExecute().
  • Chaining Multiple Network Calls: The only approach to orchestrating multiple AsyncTasks is to use nesting, which leads to unwieldy code.

RxJava Way:

Now, below is how we load data asynchronously using RxJava.

Observable.fromCallable(new Callable<User>() {
    @Override public User call() throws Exception {
        return UsersService.getUser(userId);
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<User>() {
    @Override public void onCompleted() {
        Log.d("Rx", "Completed");
    }

    @Override public void onError(Throwable e) {
        // Pass the Throwable itself; e.getMessage() may be null
        Log.e("Rx", "Failed to load user", e);
    }

    @Override public void onNext(User user) {
        Log.d("Rx", user.getName());
    }
});

Here, subscribeOn(Schedulers.io()) makes the observable do its work on a background I/O thread, and observeOn(AndroidSchedulers.mainThread()) makes the subscriber receive the result on the main UI thread.
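
If the thread hop is hard to picture, a rough plain-Java analogy may help: do the work on one executor, then hand the result to another. The sketch below uses CompletableFuture from the JDK; the class name ThreadHopDemo, the thread names "io" and "ui", and the value "user-42" are all invented for the example, and this is only an analogy, not how RxJava’s schedulers work internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy only; RxJava's schedulers are not implemented this way.
class ThreadHopDemo {

    // Runs the "slow" work on ioPool, handles the result on uiThread,
    // and returns the names of the two threads that were used.
    static String[] run(ExecutorService ioPool, ExecutorService uiThread) {
        String[] threads = new String[2];
        CompletableFuture
                .supplyAsync(() -> {                 // roughly: subscribeOn(io)
                    threads[0] = Thread.currentThread().getName();
                    return "user-42";                // stand-in for a network result
                }, ioPool)
                .thenAcceptAsync(user -> {           // roughly: observeOn(mainThread)
                    threads[1] = Thread.currentThread().getName();
                }, uiThread)
                .join();                             // block only so the demo can report
        return threads;
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService uiThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        try {
            String[] threads = run(ioPool, uiThread);
            System.out.println("work ran on: " + threads[0]);        // prints work ran on: io
            System.out.println("result handled on: " + threads[1]);  // prints result handled on: ui
        } finally {
            ioPool.shutdown();
            uiThread.shutdown();
        }
    }
}
```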

It’s similar to AsyncTask, but simpler and cleaner. RxJava also addresses the problems I mentioned earlier.

  • Error Handling: Error handling is much easier with RxJava because errors and exceptions raised in the chain are delivered to the onError() method. Since we are observing on AndroidSchedulers.mainThread(), we can easily interact with the UI and notify the user about the error.
  • Memory Leaks: RxJava will not magically alleviate memory leak woes, but preventing them is fairly simple. RxJava has a cleaner way of unsubscribing from ongoing asynchronous calls: calling unsubscribe() on the subscriber or subscription stops the Activity/Fragment from receiving notifications. If you have multiple subscriptions, you can hold them all in a CompositeSubscription and unsubscribe from all of them at once in onDestroy() or onDestroyView().
private CompositeSubscription allSubscriptions =
        new CompositeSubscription();

// add all the subscriptions to allSubscriptions
allSubscriptions.add(subscription1);
allSubscriptions.add(subscription2);
allSubscriptions.add(subscription3);

// clear all subscriptions in onDestroy
@Override
public void onDestroy() {
    super.onDestroy();
    allSubscriptions.clear();
}
  • Chaining Multiple Network Calls: RxJava has many operators that help us chain and transform observables. Once understood, they make it very easy to chain multiple network calls. Let’s consider a scenario where we get a list of user IDs from one call and must then call getUser() for each user ID to fetch the user information.
Observable.fromCallable(new Callable<List<String>>() {
    @Override public List<String> call() throws Exception {
        return UsersService.getUserIds();
    }
}).flatMap(new Func1<List<String>, Observable<String>>() {
    @Override public Observable<String> call(List<String> userIds) {
        return Observable.from(userIds);
    }
}).flatMap(new Func1<String, Observable<User>>() {
    @Override public Observable<User> call(String userId) {
        return Observable.just(UsersService.getUser(userId));
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<User>() {
    @Override public void onCompleted() {
        Log.d("Rx", "Completed!");
    }

    @Override public void onError(Throwable e) {
        Log.e("Rx", "Error", e);
    }

    @Override public void onNext(User user) {
        Log.d("Rx", user.getName());
    }
});

Same Code after using Lambdas:

Observable.fromCallable(() -> UsersService.getUserIds())
    .flatMap(userIds -> Observable.from(userIds))
    .flatMap(userId -> Observable.just(UsersService.getUser(userId)))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<User>() {
        @Override
        public void onCompleted() {
            Log.d("Rx", "Completed!");
        }

        @Override
        public void onError(Throwable e) {
            Log.e("Rx", "Error", e);
        }

        @Override
        public void onNext(User user) {
            Log.d("Rx", user.getName());
        }
    });

The following diagram depicts the transformation of a stream containing a single List<String> into a stream of multiple Users.

-------{~~~~~~~~~~~~list of user ids [1,2,3,4,5]~~~~~~~~~}---|-->

flatMap(userIds -> Observable.from(userIds))
-------1------------2----------3------------4------------5---|--->
          flatMap(userId -> UsersService.getUser(userId))
----user1--------user2------user3--------user4-------user5---|--->
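
The same flattening shape can be tried on a plain JVM with the Java 8 Stream API, which also has a flatMap. This is an analogy rather than RxJava code: java.util.stream is synchronous and pull-based, and FlattenDemo and findUser are hypothetical stand-ins (findUser plays the role of the getUser() call).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy using java.util.stream; not RxJava.
class FlattenDemo {

    // Hypothetical stand-in for the getUser(userId) network call.
    static String findUser(String userId) {
        return "user" + userId;
    }

    static List<String> usersFor(List<String> userIds) {
        return Stream.of(userIds)              // a stream holding ONE item: the whole list
                .flatMap(ids -> ids.stream())  // flattened into one item per user id
                .map(FlattenDemo::findUser)    // each id turned into a user
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [user1, user2, user3, user4, user5]
        System.out.println(usersFor(Arrays.asList("1", "2", "3", "4", "5")));
    }
}
```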
