RxJava2: An Early Preview

Mike Nakhimovich
5 min readSep 17, 2016

--

As an avid RxJava user I’ve been following the development of RxJava2 which just hit its first Release Candidate. Here are some of the most interesting updates and additions to the library and what they mean for the developer community.

New Dependency

Before going into the API differences, I want to make a note about RxJava2 having a dependency on ReactiveStreams which is a standard for asynchronous stream processing with non-blocking backpressure. Think of ReactiveStreams as a spec for Reactive libraries to build on top of. Adhering to this standard adds the ability for interpolation with other Reactive Libraries.

Imports

RxJava2 is available in a different package than RxJava1:

RxJava1:

compile ‘io.reactivex:rxjava:1.0.y-SNAPSHOT’

RxJava2:

compile ‘io.reactivex.rxjava2:rxjava:x.y.z’

This means that you can start using RxJava2 while still using RxJava1 rather than having to fix your code to work with the breaking changes between versions. This style of packaging is similar to the versioning strategy used by Square Libraries (Retrofit2, OKHTTP3, etc.) and is great if you’re trying to run both versions. That said, if you’re instead migrating at once, this means that you will now need to change all your imports to the new package. Android Studio find/replace should mitigate most of the pain.

Null Emissions No Longer Permitted

RxJava2 no longer permits emitting a null value from a producer. Emitting a null value from an observable, flowable, or subject will throw a NullPointerException:

Observable.just(null); //don’t do thissubject.onNext(null); //don’t do this either

I’m personally a big fan of this change and believe that null should not exist in an API. I’ve always appreciated that Dagger throws exceptions if returning a null from a provides and am glad that RxJava is now enforcing clean code practices. There are plenty of alternatives to passing null through a stream such as: Enums, Constants, and Guava/Java8 Optional types (my favorite).

Under (Back) Pressure

RxJava 2 introduces a new base type called Flowable which is an Observable with backpressure support. Backpressure is when an Observable emits values faster than an Observer is able to handle. With RxJava2, Observables do not support backpressure while Flowables will support backpressure (with different strategies).

Flowables seem to have the same operators as Observables plus hooks for defining/handling backpressure. The good news is that you’ll still have all your favorite operators like map/filter, etc. The bad news is that organizing imports might now grab the wrong one. Here’s an example of creating a Flowable from the wiki:

Flowable.create((FlowableEmitter<Integer> emitter) -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}, BackpressureStrategy.BUFFER);

This looks very similar to the current Observable.create but with the added param of how to handle Backpressure. Hopefully, making Flowable its own type will help developers consider Backpressure when designing reactive flows rather than learning about it for the first time they see it in crash logs.

The Reactive-Streams spec also mandates that all operators support backpressure (within Flowable in our case). What this means is that Operators can’t emit faster than a downstream Consumer can consume. It’s important to note that this does not mean that operators can’t have backpressure problems — it just means that they will now throw the MissingBackpressureException exception themselves rather than pushing too much through onNext. The exception will be thrown from the Operator rather than the Consumer/Subscriber.

Single Old and New

Single has been reengineered from the ground up and is now built on the ReactiveStreams spec. Subscribing to a Single is now done with a SingleObserver<T> which is an interface defining:

void onSubscribe(Disposable d);void onSuccess(T value);void onError(Throwable error);Subscribing to a single is done in the same way:single.subscribe(onSuccess,onError);

Hit Me Maybe One More Type

RxJava 2 introduces a new base reactive type called Maybe. Maybe is essentially a hybrid between Single and Completable. You can use Maybe when you have a provider that can emit 0 or 1 items. You don’t need to worry about handling backpressure because Maybe emits at most 1 item.

Maybe will offer only a subset of operators available in Flowable/Observable.

Example:

Maybe.just(1)
.map(v -> v + 1)
.filter(v -> v == 1)
.defaultIfEmpty(2)
.test()
.assertResult(2);

New BackPressured Subject: Processor

RxJava 2 introduces a new type, Processor, which is a Subject with backpressure support. AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject, UnicastSubject maintain their existing functionality from RxJava 1 and do not support backpressure. Their Processor counterparts do: AsyncProcessor, BehaviorProcessor, PublishProcessor, ReplayProcessor, and UnicastProcessor.

New Names for Function and Action

RxJava 2 brings new names for function interfaces: Func1 is now Function, Func2 is BiFunction and all other function interfaces in favor of Function<Object[],R>. Similar to Guava there will be a Predicate<T> for the common use case of Func1<T, Boolean>. Actions follow suit: Action0 becomes Consumer, Action1 is BiConsumer and ActionN becomes Consumer<Object[]>. (These two changes will lead to the largest amount of refactoring/migration in my apps.)

Subscriber is Now Disposable

Due to naming conflicts with Reactive-Streams, Subscriber has been renamed to Disposable. Disposables have a .dispose() method that’s similar to the .unsubscribe() method of Subscription:

ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {@Override
public void onStart() {
request(Long.MAX_VALUE);
}
@Overridepublic void onNext(Integer t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println(“Done”);
}
};
Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);subscriber.dispose();

In the example above we see one more renaming change: onCompleted will now be onComplete. Yay for find/replace.

Composite Subscriptions Changes

Composite Subscriptions have been replaced with CompositeDisposable and a new subscribeWith method.

RxJava1:

CompositeSubscription composite = new CompositeSubscription();composite.add(Observable.range(1, 5).subscribe(new TestSubscriber<Integer>()));

RxJava2:

CompositeDisposable composite = new CompositeDisposable();composite.add(Flowable.range(1, 5).subscribeWith(subscriber));

The API change is due to Reactive-Streams .subscribe now being a void method.

Blocking Calls

RxJava2 provides several new operators to help with transforming asynchronous Observables into BlockingObservables. Particularly observable.blockingFirst() as a convenience observable.toBlocking.first(). Yay for less code and better performance.

Better Hooks for Plugins

One of my favorite changes is a rewrite of the plugin system. Now you’ll be able to override the values returned by the built in schedulers. This could be useful for overriding Schedulers.io() to return synchronous values during testing or even for making debug Schedulers that wrap the current ones inside of more logging. Way to go guys! I know this was a much clamored after and difficult-to-implement feature.

Summary

Overall I’m excited about the direction that RxJava is going. My favorite feature of the release is the new not null checks, which will surely lead to fewer elusive bugs. I’m also excited about the standardization with Reactive-Streams and the new Maybe and Flowable types.

With the stable release targeted for October 29, I hope everyone’s upgrades will be quick and painless. It seems like Retrofit already has support for RxJava2 for those that want to be on the bleeding edge. There’s also a nice library that helps convert from RxJava1 to RxJava2 types.

A big kudos goes to David Karnok, the lead contributor of RxJava, for his work on this release. While he might not be a name that many Android devs are familiar with, he’s been a leader in pushing reactive patterns into the community.

Sources:

http://reactivex.io/RxJava/2.x/javadoc/

https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0

http://stackoverflow.com/questions/38423079/differences-between-rxjava1-and-rxjava2

--

--