Reactive Programming in Android

In most Android applications, you are reacting to user interactions (clicks, swipes and etc.) while doing something else in the background for eg. networking.

One hard thing is that we should orchestrate all these and the application should react accordingly.

Consider that in an application we send a request to a database over the network to fetch the details of a user and after that completes based on some key we need to fetch the preferences of the user at the same time, and after all of that is we should populate the UI combining both the responses.
Normally in android, we will possibly use:

  1. 3–4 different AsyncTasks.
  2. Some locking mechanism.
  3. Object-level fields to store the results.
  4. Some functionality to combine the results.
  5. Some code to manipulate the incoming data if needed.

Here comes the advantage of Reactive programming with RxJava. All of this can be written in a single flow in one place based on a functional paradigm.

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

It is a style of programming where you define a source of data and a consumer of that data. Once you connect the consumer to the source, the framework takes care of pushing the data, generated by the source, to the consumer.

The above definition talks about three important things. I will be explaining each of these in detail.

  • Source of data
  • Consumer of data
  • Connecting Consumer to Source

The major building blocks of RxJava are the Observer and the Observable.
Observer — any object that wishes to be notified when the state of another object changes(consumer of data). The corresponding functions are onNext(), onError(), and onCompleted() from the Observer interface.

Observable — any object whose state may be of interest, and in whom another object may register an interest(source of data).

Subscription instance- is responsible for connecting the consumer to the source.

A Subscription instance represents the connection between an observer and an observable. We can call unsubscribe() on the particular instance whenever we want to remove the connection between the observer and the observable.When an observable object is newly created, its set of observers is empty until we connect an Observer with it using the Subscription interface.

For using RxJava in Android we need to add the following libraries in your build.gradle file.

compile 'io.reactivex:rxjava:1.1.0'
compile 'io.reactivex:rxandroid:1.1.0' // for android platform

Please note: recommended to use the latest versions.

Let us go through a simple example:

Observable<String> observable = Observable.just("Piano", "Violin");

This is an Observable which will emit the strings “Piano” and “Violin” and it can be observed by an Observer.

Now Let us create an Observer:

Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
//do something when the emission completes
}

@Override
public void onError(Throwable e) {
//do something when error happens
}

@Override
public void onNext(String response) {
System.out.println("instruments : " + response);
}
};

Finally, we have to connect both Observable and Observer with a subscription.

observable.subscribe(observer);

This will produce the following output:

instruments : Piano
instruments : Violin

The Observable emits the two objects (strings in this case) which are observed by the Observer. At this point, you can use a variety of operators to filter or transform the data according to your business needs. See the official Rx operator documentation or this page for a list of operators.

The onCompleted() is invoked once the observable has no other data to emit, terminating the emission.

The onError() function will be invoked if there are any errors during processing an object. You can use this callback to handle error propagation.

sources :

http://reactivex.io/documentation/observable.html
https://github.com/ReactiveX/RxJava/wiki
https://dzone.com/articles/rxjava-part-1-a-quick-introduction
http://blog.feedpresso.com/2016/01/25/why-you-should-use-rxjava-in-android-a-short-introduction-to-rxjava.html

Show your support

Clapping shows how much you appreciated Vinay John’s story.