Intro to RxJava & It’s use in Android

Piyushkhapekar
Intro to RxJava and its use in Android
6 min readJan 6, 2021

RxJava is the open-source implementation of ReactiveX. It is for enabling Reactive Programming in Android development. The two main classes of RxJava are Observable and Subscriber. In RxJava, Observable is a class that emits a stream of data or events. It is similar to an Iterator in that, given a sequence, it iterates through and produces those items in an orderly fashion, and a Subscriber is a class that acts upon the emitted items. The standard flow of an Observable is to emit one or more items and then complete successfully or with an error. An Observable can have multiple Subscribers, and for each item emitted by the Observable, the item will be sent to the Subscriber.onNext() method to be handled. Once an Observable has finished emitting items, it will call the Subscriber.onCompleted() method, or if there is an error the Observable will call the Subscriber.onError() method. Now let’s look at creating and subscribing to Observable.

Observable intObservable = Observable.create(new Observable.OnSubscribe() {   @Override
public void call (Subscriber subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
});

This observable emits integer 1, 2, 3 and then completes. Now let’s create Subscriber so that we can act upon the stream of emission.

Subscriber intSubscriber = new Subscriber () {
@Override
public void onCompleted () {
System.out.println("onCompleted: ");
}

@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer value) {
System.out.println("onNext: " + value);
}
};

Above Subscriber will print emitted items and notify upon completion. Once you have an Observable and Subscriber you can connect them with Observable.Subscribe() method.

intObservable.subscribe(intSubscriber);// Outputs will be:
onNext : 1
onNext : 2
onNext : 3
onCompleted :

The above code can be optimized using Observable.just() method to create an Observable to emits only the values defined.

Observable.just(1, 2 , 3 ).subscribe(new Subscriber() {
@Override
public void onCompleted() {
System.out.println("onCompleted: " );
}
@Override
public void onError(Throwable e){}
@Override
public void onNext(Integer value) {
System.out.println( "onNext: " + value );
}
});

Observable, Observer, and Operator — The 3 O’s of RxJava Core:

Observable:

As discussed in above Observable is a class that emits a stream of data or events. It is similar to an Iterator in that, given a sequence, it iterates through and produces those items in an orderly fashion. In addition to that Observables allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you can use for collections of data items like arrays. It frees you from complicated callbacks and makes your code more readable and less prone to bugs

Observer:

The next component of an Observable stream is Observer subscribed to it. It notified whenever interesting or we can say something expected happens in the stream. Observers are notified using following events.

  1. onNext(T): Invoked when items are emitted from the stream.

2. onError(Throwable): Invoked when an error has occurred within the Stream.

3. onCompleted(): Invoked when the stream is finishing emitting items.

To subscribe to a stream, simply call Observable<T>#subscribe(…) and pass Observer instance as below.

Observable<Integer> intObservable = Observable.just(1, 2, 3);
intObservable.subscribe(new Observer<Integer>() {
@Override
public void onCompleted () {
System.out.println(“onCompleted: “);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println(“onNext: “ + value );
}
});

For stop observing items, simply call unsubscribe() of the returned subscription. Say for above example intObservable.unscubscribe() in right place.

Operators:

Creating and subscribing to an Observable is simple enough, and may not seem overly useful, but that is just the beginning of what is possible with RxJava. Item emitted by an Observable can be transformed, modified and filtered through Operators before notifying to the subscribed object using functional programming like map, filter, reduce etc.

For example: continuing with our previous Observable example, if we only want to emit odd numbers that we receive we can use filter() operator.

Observable.just(1, 2, 3, 4, 5, 6) //you can add more numbers
.filter(new Func1() {
@Override
public Boolean call(Integer value) {
return value % 2 == 1 ;
}
}).subscribe(new Subscriber() {
@Override
public void onCompleted() {
System.out.println( “onCompleted: “ );
}
@Override
public void onError (Throwable e) {}
@Override
public void onNext(Integer value) {
System.out.println(“onNext: “ + value );
}
});
// Outputs:
// onNext: 1
// onNext: 3
// onNext: 5
// onCompleted:

filter() operator defines a function that will take in our emitted Integer values, and return true for all odd numbers, and false for all even numbers. The value that returns false from filter() operator are not emitted to Subscriber hence not seen in produced output. filter() operator returns an observable that we can then subscribe.

Now consider a case where I want to find out the square root of the emitted odd numbers. One way of doing this is to calculate in onNext() of Subscriber but by doing this it would not be possible to further transform data stream, and another way of doing this is we can chain the map()operator with filter() operator.

Observable.just (1, 2, 3, 4, 5, 6 ) //you can add more numbers
.filter ( new Func1 () {

@Override
public Boolean call ( Integer value ) {
return value % 2 == 1 ;
}
}.map(new Func1 () {
@Override
public Double call( Integer value ) {
return Math.sqrt( value );
}
}).subscribe( new Subscriber () { // notice Subscriber type changed
to@Override
public void onCompleted() {
System.out.println("onCompleted: " );
}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Double value) {
System.out.println("onNext: " + value );
}
});
// Outputs:
// onNext: 1.0
// onNext: 1.7320508075688772
// onNext: 2.23606797749979
// onCompleted:

RxJava in Android:

A common task in Android is to load some amount of work in the background thread and once finished, post result on the main thread. In android we can do this using AsyncTasks, Loaders, Services etc. but this solution cannot be considered as best. AsyncTask can lead to memory leaks, CursorLoaders with ContentProvider needs a large amount of configurations and boilerplate code to setup, and services are used for long running operations. So let’s take an example of AsyncTask for the mentioned scenario. For this firstly we create inner AsyncTask in our Activity / Fragments, It will perform the network operation in background and take the result of that operation and update the UI in the main thread.

public class DownloadRequestTask extends AsyncTask< Void, Void, Data> {
private final String url ;
public DownloadRequestTask(String url) {
this.url = url ;
}
@Override
protected Data doInBackground(Void ... params) {
return networkObservableService.getData(url);
}
@Override
protected void onPostExecute(Data data) {
descTextView.setText(data.getDescription());
}
}
btn.setOnClickListener(new OnClickListener(){
public void onClick(View v) {
// TODO Auto-generated method stub
new DownloadRequestTask(url.execute ()
}
});

This approach has some issues and limitations namely Memory Leak are easily created since this AsyncTask is the inner class that holds an implicit reference to the outer class. Also if we think of another network operation after this Async call, we would have to nest two Async calls which reduce readability. Let’s see how RxJava can help us from this problem. To get used of RxJava in Android add the following dependencies to build.gradle file:

Gradle dependency to use:

compile 'io.reactivex:rxandroid:1.0.1'private Subscription subscription ;
btn.setOnClickListener( new OnClickListener (){
subscription = networkObservableService.getObservableData(url)
.subscribeOn(Schedulers.io()) // subscribe on I/O thread
.observeOn(AndroidSchedulers.mainThread()) // Observe the result on UI thread
.subscribe(new Action1<Data>() {
@Override
public void call ( Data data ) {
descTextView.setText(data.getDesc ());
}
});
});
@Override
protected void onDestroy() {
if (subscription != null && ! subscription.isUnsubscribed ()) {
subscription.unsubscribe();
}
super.onDestroy ();
}

This Observable will use Schedulers.io() to subscribe on the I/O thread and it will observe the result on UI thread using AndroidSchedulers.mainThread(). With this, we can solve the problem of memory leaks caused by running a thread holding reference to outer context by keeping the reference to the returned Subscription. This Subscription object is then tied to the Activity object’s onDestroy() method to guarantee that the Action1#call operation does not execute when the activity / Fragment needs to be destroyed.

--

--