Concurrent tasks with zip operator

Giuseppe Villani
Jul 3, 2017

One of the most compelling reasons for using RxJava is how easily it lets us combine asynchronous Observable sequences. This also means that we can execute tasks in parallel (or at least concurrently), exploiting Java’s multithreading capabilities.

The well-known zip() operator is a useful tool for executing multiple streams and combining their emissions into a single stream of data.

Let’s suppose that we want to combine the following Observables, running them concurrently.

public Observable<List<Order>> getOrders();
public Observable<List<Customer>> getCustomers();

With the zip operator, we can easily obtain the desired result:

Observable.zip(getOrders(), getCustomers(),
        (order, customer) -> {/* … */})
    .subscribe();

The zip operator will do its job, and at the end of the execution the two streams will be combined and transformed into a single stream. The only problem is that the snippet above doesn’t execute the two observables concurrently.
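A quick way to check this is to log where and when each source runs. The sketch below is only an illustration: it assumes RxJava 3 on the classpath (package io.reactivex.rxjava3; with RxJava 2 the import would be io.reactivex.Observable), and source() is a hypothetical stand-in for getOrders() and getCustomers() that sleeps to simulate slow work.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ZipOnCallingThread {

    // Hypothetical stand-in for getOrders() / getCustomers(): logs where and when it runs.
    static Observable<String> source(String name, List<String> log) {
        return Observable.fromCallable(() -> {
            log.add(name + " start on " + Thread.currentThread().getName());
            Thread.sleep(200); // simulate slow work
            log.add(name + " end");
            return name;
        });
    }

    // Zips the two sources without any subscribeOn and returns the recorded log.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        Observable.zip(source("orders", log), source("customers", log),
                        (orders, customers) -> orders + " + " + customers)
                .blockingSubscribe(); // subscribes on the calling thread and waits for completion
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the resulting log, "orders end" appears before "customers start", and both sources report the name of the thread that called run(): the second source is not even started until the first one has finished.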

This is because, unless we tell an Observable to do its work on a particular thread by calling its subscribeOn operator, all the work is done on the main thread (or, more generally, on the thread from which we subscribe). To make the two Observables run concurrently, the first thing most developers think of is applying subscribeOn() to the Observable returned by the zip operator:

Observable.zip(getOrders(), getCustomers(),
        (order, customer) -> {/* ... */})
    .subscribeOn(Schedulers.newThread())
    .subscribe();

Unfortunately, this executes ALL the work on a thread other than the main one, but still sequentially. The two operations (retrieving the orders and retrieving the customers) are still executed one after the other.

To fix it, we may be tempted to use a different Scheduler, like Schedulers.io(), which makes use of a thread pool, or Schedulers.computation(), which uses a thread pool sized to the number of available processors (parallelism):

Observable.zip(getOrders(), getCustomers(),
        (order, customer) -> {/* … */})
    .subscribeOn(Schedulers.io())
    .subscribe();

But this results in the same behaviour, except that the work now runs on a different thread. Our tasks are still not executed at the same time. The reason is that, regardless of which Scheduler we use for “executing” the zip operator, subscribeOn() applied to the zipped Observable picks a single thread, and zip subscribes to each inner Observable (the parameters of the zip operator) on that same thread, so they are executed sequentially.
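To see this for ourselves, we can record the thread each source runs on and time the whole zip. This is a rough sketch assuming RxJava 3 (io.reactivex.rxjava3); source() and the Result holder are hypothetical helpers made up for the illustration, and the 300 ms sleep stands in for real work.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ZipSubscribeOnOuter {

    // Hypothetical holder for what we measured.
    static class Result {
        List<String> threads; // thread name seen by each source
        long elapsedMs;       // wall-clock time of the whole zip
    }

    // Hypothetical stand-in for getOrders() / getCustomers(): records its thread, then sleeps.
    static Observable<String> source(String name, List<String> threads) {
        return Observable.fromCallable(() -> {
            threads.add(Thread.currentThread().getName());
            Thread.sleep(300); // simulate slow work
            return name;
        });
    }

    static Result run() {
        Result result = new Result();
        result.threads = new CopyOnWriteArrayList<>();
        long start = System.nanoTime();
        Observable.zip(source("orders", result.threads), source("customers", result.threads),
                        (orders, customers) -> orders + " + " + customers)
                .subscribeOn(Schedulers.io()) // applied to the zipped Observable only
                .blockingSubscribe();
        result.elapsedMs = (System.nanoTime() - start) / 1_000_000;
        return result;
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println("threads: " + r.threads + ", elapsed ms: " + r.elapsedMs);
    }
}
```

Both entries in threads name the same io() thread, and the elapsed time is roughly the sum of the two sleeps rather than the longer of the two.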

The solution is to instruct each Observable to do its work on its own thread:

Observable.zip(getOrders().subscribeOn(Schedulers.newThread()),
        getCustomers().subscribeOn(Schedulers.newThread()),
        (order, customer) -> {/* … */})
    .subscribe();

In this way the two tasks are executed at the same time, and (unless we switch thread using observeOn) we observe the result on the thread of whichever task finishes last.
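The same kind of measurement can be used to confirm the concurrent behaviour. Again, this is a sketch under assumptions: RxJava 3 is assumed, source() and Result are hypothetical helpers, and each source sleeps 500 ms in place of real work. The zipper function also records its own thread, to show where the combined result is produced.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

class ZipSubscribeOnEachSource {

    // Hypothetical holder for what we measured.
    static class Result {
        List<String> sourceThreads; // thread name seen by each source
        String zipThread;           // thread that ran the zipper function
        long elapsedMs;             // wall-clock time of the whole zip
    }

    // Hypothetical stand-in for getOrders() / getCustomers(): records its thread, then sleeps.
    static Observable<String> source(String name, List<String> threads) {
        return Observable.fromCallable(() -> {
            threads.add(Thread.currentThread().getName());
            Thread.sleep(500); // simulate slow work
            return name;
        });
    }

    static Result run() {
        Result result = new Result();
        result.sourceThreads = new CopyOnWriteArrayList<>();
        AtomicReference<String> zipThread = new AtomicReference<>();
        long start = System.nanoTime();
        Observable.zip(
                        // each source gets its own subscribeOn, hence its own thread
                        source("orders", result.sourceThreads).subscribeOn(Schedulers.newThread()),
                        source("customers", result.sourceThreads).subscribeOn(Schedulers.newThread()),
                        (orders, customers) -> {
                            zipThread.set(Thread.currentThread().getName());
                            return orders + " + " + customers;
                        })
                .blockingSubscribe();
        result.elapsedMs = (System.nanoTime() - start) / 1_000_000;
        result.zipThread = zipThread.get();
        return result;
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println("sources on " + r.sourceThreads + ", zipped on " + r.zipThread
                + ", elapsed ms: " + r.elapsedMs);
    }
}
```

This time the two sources report different thread names, the elapsed time stays well below the one-second sum of the two sleeps, and the zipper runs on one of the two source threads rather than on the caller.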

In short, to run the work of each Observable passed to the zip operator concurrently, we have to assign each of them its own thread using subscribeOn.

We can also use Schedulers.computation() (keep in mind that this Scheduler shouldn’t be used for IO-bound work). In this way our tasks are parallelized based on the number of CPUs available. Still, if we have more tasks than processors, not all of them will start at the same time: a pending task has to wait until one of the scheduler’s threads becomes free.
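As a rough illustration of that limit, the sketch below zips twice as many tasks as there are processors and counts how many distinct threads ran them. It assumes RxJava 3 with the computation scheduler left at its default size; the tasks are hypothetical and use a short sleep as a stand-in for CPU-bound work.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ZipOnComputation {

    // Zips taskCount hypothetical tasks, each subscribed on Schedulers.computation(),
    // and returns the names of the distinct threads that ran them.
    static Set<String> run(int taskCount) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        List<Observable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            tasks.add(Observable.fromCallable(() -> {
                threads.add(Thread.currentThread().getName());
                Thread.sleep(100); // stand-in for CPU-bound work
                return id;
            }).subscribeOn(Schedulers.computation()));
        }
        // zip(Iterable, zipper) hands the zipper one Object[] holding a value from each source.
        Observable.zip(tasks, results -> results.length).blockingSubscribe();
        return threads;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        Set<String> threads = run(cores * 2);
        System.out.println(cores * 2 + " tasks ran on " + threads.size() + " threads");
    }
}
```

The number of distinct threads does not exceed the number of processors, so some of the tasks necessarily waited for a thread that was already busy.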

I’d love to hear your feedback in the comments.

Giuseppe Villani

Android developer — Passionate about mobile development, clean code and reactive programming