Rx-Java in Android App Development — Part-II!
In the previous article, I explained the usage and purpose of the most commonly used reactive operators (Rx operators) in Android app development. In this part, I will continue with a few more operators and concepts that are essential in Android app development.
Topics Covered:
- How to apply Filters
- How to avoid Memory Leaks
- How to get the emissions in Order
ConcatMap
FlatMap has one drawback: it doesn’t preserve the order of emissions. To receive the items of the inner Observables in the original order, we need to use the ConcatMap operator.
Like FlatMap, it maps each item to an Observable and emits the items of those inner Observables.
It waits for each inner Observable to complete before moving on to the next one, which is how it maintains the order of emission. As a result, it can take more time than FlatMap.
Consider the following before deciding which of these operators to use:
If Order is important, then use ConcatMap.
If Speed is important, then use FlatMap.
Buffer
Periodically gathers the items emitted by an Observable into bundles and emits these bundles rather than emitting the items one at a time.
Observable<Integer> myObservable = Observable.range(1, 16);

myObservable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .buffer(4)
    .subscribe(new Observer<List<Integer>>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(List<Integer> integers) {
            // The buffer size is 4, so each call to onNext() receives a
            // List<Integer> of 4 items: 1 to 4, then 5 to 8, and so on.
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
            Log.d("Example", "Emission completed!");
        }
    });
This operator also has a special purpose, which we will discuss when that scenario comes up.
Filter
Emits only those items from an Observable that pass the Predicate test.
Observable<Integer> myObservable = Observable.range(1, 16);

myObservable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer value) {
            return value % 3 == 0;
        }
    })
    .subscribe(myObserver); // myObserver is an Observer<Integer> defined elsewhere
It emits only the multiples of 3, that is 3, 6, 9, 12 and 15.
Distinct
Suppresses duplicate emissions, so each distinct item is emitted only once.
Observable<Integer> myObservable = Observable.just(3, 3, 6, 9, 6, 16, 12);

myObservable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer value) {
            return value % 3 == 0;
        }
    })
    .distinct()
    .subscribe(myObserver); // myObserver is an Observer<Integer> defined elsewhere
Here, we apply filter first to get the multiples of 3, which gives 3, 3, 6, 9, 6, 12.
Then, applying distinct() gives 3, 6, 9, 12.
Skip(n)
The first “n” items are skipped, and emission starts from the (n+1)th item.
Observable<Integer> myObservable = Observable.just(3, 3, 6, 9, 6, 16, 12);

myObservable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .skip(2)
    .subscribe(myObserver); // myObserver is an Observer<Integer> defined elsewhere
Returns 6,9,6,16,12. (Skips 3,3)
SkipLast(n)
Skips the last n items.
Observable<Integer> myObservable = Observable.just(3, 3, 6, 9, 6, 16, 12);

myObservable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .skipLast(2)
    .subscribe(myObserver); // myObserver is an Observer<Integer> defined elsewhere
Returns 3,3,6,9,6.
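The results quoted for filter, distinct, skip and skipLast can be checked off-device. The following is only a sketch: it assumes RxJava 2 (the `io.reactivex` package), uses a hypothetical class name, and collects the emissions with toList().blockingGet() instead of the schedulers and Observer used in the Android snippets.

```java
import io.reactivex.Observable;
import java.util.List;

public class FilteringOperatorsDemo {

    // Same sample data as in the examples above.
    static final Observable<Integer> SOURCE = Observable.just(3, 3, 6, 9, 6, 16, 12);

    // Keep multiples of 3, then drop the duplicates.
    static List<Integer> filterThenDistinct() {
        return SOURCE.filter(value -> value % 3 == 0)
                .distinct()
                .toList()
                .blockingGet();
    }

    // Drop the first two items.
    static List<Integer> skipFirstTwo() {
        return SOURCE.skip(2).toList().blockingGet();
    }

    // Drop the last two items.
    static List<Integer> skipLastTwo() {
        return SOURCE.skipLast(2).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(filterThenDistinct()); // [3, 6, 9, 12]
        System.out.println(skipFirstTwo());       // [6, 9, 6, 16, 12]
        System.out.println(skipLastTwo());        // [3, 3, 6, 9, 6]
    }
}
```

Blocking calls such as blockingGet() are convenient for quick checks and unit tests, but they should not be used on the Android main thread.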
Disposable
The main purpose of a Disposable is to avoid memory leaks. Suppose a network operation is started on a background thread, and the component that started it is destroyed (for example, an Activity whose onDestroy() method runs) while the operation is still in progress. If we don’t stop the operation when the Activity is destroyed, the running subscription can keep a reference to the Activity and cause a memory leak.
In the Observer examples above, there is another callback method named onSubscribe() that receives a Disposable object.
Calling dispose() on that object cancels that particular subscription. We should do this when the parent component that started the Observable gets destroyed.
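So, in the above example, keep the Disposable in a field of the Activity and call dispose() on it in onDestroy():
myObservable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            disposable = d; // a field of the Activity; call disposable.dispose() in onDestroy()
        }
        // onNext(), onError() and onComplete() omitted for brevity
    });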
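To see what dispose() does without an Activity, here is a minimal sketch that runs on a plain JVM. It assumes RxJava 2 (the `io.reactivex` package); the class name, the `pause` helper and the timings are illustrative choices, and Observable.interval stands in for long-running background work.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DisposeDemo {

    // Sleeps without forcing callers to handle the checked exception.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns how many items arrived after dispose() was called.
    static int itemsAfterDispose() {
        AtomicInteger received = new AtomicInteger();

        // Stand-in for background work: one tick every 50 ms.
        Disposable disposable = Observable.interval(50, TimeUnit.MILLISECONDS)
                .subscribe(tick -> received.incrementAndGet());

        pause(225);              // let a few ticks through
        disposable.dispose();    // what onDestroy() would do in an Activity
        int countAtDispose = received.get();

        pause(200);              // any further ticks would be counted here
        return received.get() - countAtDispose;
    }

    public static void main(String[] args) {
        System.out.println("Items received after dispose(): " + itemsAfterDispose());
    }
}
```

Once dispose() has been called, the ticks stop arriving, so the count is expected to stay where it was. Without that call, the interval would keep running and keep its subscriber reachable.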
How can we combine both an Observer and a Disposable in a single component?
DisposableObserver
With this, we don’t have to implement the onSubscribe() method that receives the Disposable object (as we do with a plain Observer implementation).
Example:
Disposable disposable = Observable.range(1, 5)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new DisposableObserver<Integer>() {
        @Override
        public void onStart() {
        }

        @Override
        public void onNext(Integer t) {
            if (t == 3) {
                dispose(); // stop receiving items from inside the observer
            }
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    });
So, we can call the dispose() method directly on this Disposable object, for example disposable.dispose() while destroying the Activity.
In corporate projects, we may need to make many REST/SOAP service calls on a single screen. It is then easy to forget to call dispose() on each Disposable separately, which eventually leads to memory leaks.
How to handle this situation?
CompositeDisposable
When we have more than one observer, it is better to use CompositeDisposable to handle the above situation, so that we can dispose of a single object rather than many.
// subscribe() returns the Disposable; an Observable by itself is not a Disposable
Disposable d1 = Observable.just(1, 2).subscribe();
Disposable d2 = Observable.just(1, 2).subscribe();
Disposable d3 = Observable.just(1, 2).subscribe();
Disposable d4 = Observable.just(1, 2).subscribe();

CompositeDisposable allAtOnce = new CompositeDisposable();
allAtOnce.add(d1);
allAtOnce.add(d2);
allAtOnce.add(d3);
allAtOnce.add(d4);

allAtOnce.dispose();
// allAtOnce.clear();   // clear() disposes everything, but the container can still accept new disposables
// allAtOnce.dispose(); // dispose() disposes everything and sets isDisposed = true, so it will not accept any new disposable
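The difference between clear() and dispose() is easy to get wrong, so here is a small sketch of it. It assumes RxJava 2 (the `io.reactivex` package) and relies on CompositeDisposable.add() returning a boolean that reports whether the disposable was accepted; the class and method names are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;

public class ClearVsDisposeDemo {

    // A subscription that never completes on its own, so it stays active until disposed.
    static Disposable neverEnding() {
        return Observable.never().subscribe();
    }

    // clear() disposes the current contents but keeps the container usable.
    static boolean acceptsAfterClear() {
        CompositeDisposable bag = new CompositeDisposable();
        Disposable first = neverEnding();
        bag.add(first);
        bag.clear();
        return first.isDisposed() && bag.add(neverEnding());
    }

    // dispose() disposes the contents and marks the container itself as disposed.
    static boolean acceptsAfterDispose() {
        CompositeDisposable bag = new CompositeDisposable();
        bag.add(neverEnding());
        bag.dispose();
        return bag.add(neverEnding());
    }

    // A disposable added to an already disposed container is disposed immediately.
    static boolean lateAdditionIsDisposed() {
        CompositeDisposable bag = new CompositeDisposable();
        bag.dispose();
        Disposable late = neverEnding();
        bag.add(late);
        return late.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("accepts after clear():   " + acceptsAfterClear());      // true
        System.out.println("accepts after dispose(): " + acceptsAfterDispose());    // false
        System.out.println("late addition disposed:  " + lateAdditionIsDisposed()); // true
    }
}
```

In practice this suggests clear() when the same container will be reused (for example, a screen that can be shown again) and dispose() when the owner is gone for good.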
Scenario: when an Observable emits items faster than the Observer can process them, the situation is called “backpressure”.
How to deal with that issue, and the purpose of Rx Subjects, will be covered in the next part (Part 3).
Thanks for going through this article. If you have any questions, you can ask them in the comments.