RxJava Tidbits #2: Parallelism with Observable.zip()

Jens Driller
Published in RxJava Tidbits, Sep 10, 2016

The zip operator is really handy. I use it a lot.
In the RxJava documentation it is described as follows:

The Zip method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable.

ReactiveX — Zip operator
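
One way to read the quoted definition is that zip pairs items by position: the first item of each source is combined, then the second, and so on. The following is a minimal plain-Java sketch of that pairing rule using lists instead of Observables. The class `ZipPairingSketch` and its `zip` helper are hypothetical names chosen for illustration, and this is a model of the behaviour, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of zip's pairing rule; not RxJava's implementation.
class ZipPairingSketch {

    // Combines the n-th item of `left` with the n-th item of `right`.
    // Stops as soon as the shorter input runs out.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        int size = Math.min(left.size(), right.size());
        List<R> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            result.add(combiner.apply(left.get(i), right.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c");
        List<Integer> numbers = Arrays.asList(1, 2);
        System.out.println(zip(letters, numbers, (l, n) -> l + n)); // prints [a1, b2]
    }
}
```

The unmatched "c" is dropped because there is no third number to pair it with.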

Note the two words “in sequence”. What do they mean?

Let’s assume we are dealing with the following Observable which simulates an intense calculation that takes X ms to complete and simply prints out its duration:

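import java.util.concurrent.Callable;
import rx.Observable;

// Simulates an expensive calculation: blocks the subscribing thread for `duration` ms.
private static Observable<Object> sleepyObservable(final long duration) {
    return Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("Sleeping " + duration + "ms");
            Thread.sleep(duration);
            return null; // the emitted value is irrelevant here
        }
    });
}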

If we are zipping three of the above Observables together and want to measure the total execution time, we will end up with something like this:

// Declared as a field: an anonymous class cannot assign to a local variable.
private static long startTime;

Observable.zip(
    sleepyObservable(300),
    sleepyObservable(200),
    sleepyObservable(100),
    new Func3<Object, Object, Object, Object>() {
        @Override
        public Object call(Object o1, Object o2, Object o3) {
            return null; // combine objects...
        }
    }
).doOnSubscribe(new Action0() {
    @Override
    public void call() {
        startTime = System.currentTimeMillis();
    }
}).doOnTerminate(new Action0() {
    @Override
    public void call() {
        long totalDuration = System.currentTimeMillis() - startTime;
        System.out.println("Total execution time: " + totalDuration + "ms");
    }
}).subscribe();

Can you guess the output? Here it is:

Sleeping 300ms
Sleeping 200ms
Sleeping 100ms
Total execution time: 641ms

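So the three Observables take 600ms in total to complete, plus a few milliseconds of overhead. The durations add up because fromCallable() does its work synchronously on the subscribing thread, and zip() subscribes to its sources one after the other on that same thread.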

Now how can we run those three Observables in parallel to receive the final result faster? We simply subscribe to each stream on a separate thread, like so:

// Declared as a field: an anonymous class cannot assign to a local variable.
private static long startTime;

Observable.zip(
    sleepyObservable(300).subscribeOn(Schedulers.newThread()),
    sleepyObservable(200).subscribeOn(Schedulers.newThread()),
    sleepyObservable(100).subscribeOn(Schedulers.newThread()),
    new Func3<Object, Object, Object, Object>() {
        @Override
        public Object call(Object o1, Object o2, Object o3) {
            return null; // combine objects...
        }
    }
).doOnSubscribe(new Action0() {
    @Override
    public void call() {
        startTime = System.currentTimeMillis();
    }
}).doOnTerminate(new Action0() {
    @Override
    public void call() {
        long totalDuration = System.currentTimeMillis() - startTime;
        System.out.println("Total execution time: " + totalDuration + "ms");
    }
}).subscribe();

Adding subscribeOn(Schedulers.newThread()) makes each sleepyObservable() do its work on its own newly created background thread. The Observables therefore no longer run one after the other, and the order of the "Sleeping" lines can vary from run to run. The combining function still receives the results in argument order (o1, o2, o3).

Here is the new output:

Sleeping 200ms
Sleeping 100ms
Sleeping 300ms
Total execution time: 306ms
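
The total is now roughly the duration of the slowest Observable rather than the sum of all three. That effect is not specific to RxJava, and the sketch below reproduces it in plain Java. It swaps the Observables for `CompletableFuture` tasks on an `ExecutorService`, so it is an analogy under that assumption rather than what zip() does internally. The class `SleepyTimingSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy for the two zip variants; all names here are hypothetical.
class SleepyTimingSketch {

    // Stand-in for sleepyObservable(): blocks the calling thread, then returns its duration.
    static long sleepy(long durationMs) {
        try {
            Thread.sleep(durationMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return durationMs;
    }

    // Runs every task on the calling thread, one after the other.
    static long sequentialMillis(long... durations) {
        long start = System.nanoTime();
        for (long d : durations) {
            sleepy(d);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Runs every task on its own pool thread and waits for all of them to finish.
    static long parallelMillis(long... durations) {
        ExecutorService pool = Executors.newFixedThreadPool(durations.length);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<Long>> futures = new ArrayList<>();
            for (long d : durations) {
                futures.add(CompletableFuture.supplyAsync(() -> sleepy(d), pool));
            }
            for (CompletableFuture<Long> future : futures) {
                future.join(); // block until this task is done
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sequential: " + sequentialMillis(300, 200, 100) + "ms");
        System.out.println("Parallel:   " + parallelMillis(300, 200, 100) + "ms");
    }
}
```

On a typical machine the sequential run should land a little above 600ms and the parallel run a little above 300ms, though the exact figures depend on thread start-up and scheduling.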
