RxJava Parallelization Concurrency : Zip() Operator

Rohit Singh
AndroidPub
Published in
3 min readJun 25, 2018

Often zip() operator is misunderstood when it comes to parallelism. Think of a situation when you have multiple http request and combine the result in one, it actually happens Sequentially instead of Parallel.

For folks who don’t know about zip operator

Zip operator is used to combine emission from multiple observable into a single observable.

Marble Diagram for Zip Operator

Zip Operator Works in Sequence. That means each observable will be executed sequentially.

Lets look at the example

val source1 = Flowable.fromCallable {
for (i in 1..3) {
Thread.sleep(1000)
println("Source 1 emitted : " + i + " " + Thread.currentThread())
}
"source 1 Completed"
}

val source2 = Flowable.fromCallable {
for (i in 1..3) {
Thread.sleep(1000)
println("Source 2 emitted : " + i + " " + Thread.currentThread())
}
"source 2 completed"
}

Flowable.zip(source1, source2, BiFunction<String, String, Pair<String, String>>({ t1, t2 -> Pair(t1, t2) }))
.subscribeOn(Schedulers.io())
.subscribe({
println("Combined Result " + Thread.currentThread())
})

Output for the above code snippet

So as we see the output source2 only emits item after source1 has emitted all items. That implies that zip operator works on a single thread and execute sequentially.

So guys you would be thinking, how to achieve parallelization?

You can achieve parallelization by using subscribeOn(Schedulers.newThread()) on each observable that is used in zip operator. Scheduler.newThread() creates a new thread for each observable.

Editing our earlier code with subscribeOn to each observable

val source1 = Flowable.fromCallable {
for (i in 1..3) {
Thread.sleep(1000)
println("Source 1 emitted : " + i + " " + Thread.currentThread())
}
"source 1 Completed"
}.subscribeOn(Schedulers.newThread())

val source2 = Flowable.fromCallable {
for (i in 1..3) {
Thread.sleep(1000)
println("Source 2 emitted : " + i + " " + Thread.currentThread())
}
"source 2 completed"
}.subscribeOn(Schedulers.newThread())

Flowable.zip(source1, source2, BiFunction<String, String, Pair<String, String>>({ t1, t2 -> Pair(t1, t2) }))
.subscribeOn(Schedulers.io())
.subscribe({
println("Combined Result " + Thread.currentThread())
})

Output of the above code snippet

As we see both observable source1 and source2 emits item in parallel.

P.S. Thread creation in mobile world might be costly when you have too many observable so use it wisely.

Thanks for reading and if you like the article, remember to clap. Note that if you hold the clap button, you can leave more claps!

--

--