Photo by J.H.YE

Side effect of messing up with Rx operators: concatMap() vs flatMap() in RxJava. Part I

WeiJung Yang
Published in SWAG
3 min read · Mar 11, 2019


When the question "What is the difference between concatMap() and flatMap() in RxJava?" comes up, people will tell you:

flatMap() doesn’t care about order, while concatMap() does.

And I bet a lot of people read it from

So did I. The two operators have the same signature, which is why people are often confused about when to use which.

At first I was like, “I don’t really care. Sometimes I use flatMap(), sometimes I use concatMap().” I had never run into a problem where the order mattered.

And as the author himself said, he didn’t look at the Rx documentation before using it. Neither did I. I didn’t even understand what his code was supposed to do. Then one day a strange bug came my way, and I finally realized it had something to do with concatMap() and flatMap().

Here is what I found.

TL;DR

Make sure every Observable completes if you use concatMap(). It really matters.

Say I have three Observables: a source, and two functions that each return an Observable, as below.

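import io.reactivex.Observable // assuming RxJava 2

// Emits 1 and 2, completes, then tries to emit 3 after completion.
val source = Observable.create<Int> {
    it.onNext(1)
    it.onNext(2)
    it.onComplete()
    it.onNext(3)
}

// Emits the given number, then completes.
fun operationCompletes(number: Int): Observable<Int> {
    return Observable.create<Int> {
        it.onNext(number)
        it.onComplete()
    }
}

// Emits the given number but never completes.
fun operationNoCompletes(number: Int): Observable<Int> {
    return Observable.create {
        it.onNext(number)
    }
}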

The source emits onNext(1), onNext(2) and onComplete(), and then tries to fire onNext(3). That last call is dropped, because nothing can be emitted after the stream has completed. To keep things simple, the two functions do nothing but pass the value downstream. One calls onComplete() and the other does not.

Subscribing to the source directly gives the expected result:

I/System.out: [Original]OnNext: 1
I/System.out: [Original]OnNext: 2
I/System.out: [Original]Completed
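
The subscribing code is not shown in the post. A minimal sketch of what it might look like, assuming RxJava 2 (io.reactivex), with println standing in for the Android log and a hypothetical main() wrapper:

```kotlin
import io.reactivex.Observable

// Same source as above: two values, a completion, then a late onNext(3).
val source: Observable<Int> = Observable.create<Int> { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onComplete()
    emitter.onNext(3) // dropped: the stream has already terminated
}

fun main() {
    // The three lambdas are onNext, onError and onComplete, in that order.
    source.subscribe(
        { println("[Original]OnNext: $it") },
        { error -> println("[Original]OnError: $error") },
        { println("[Original]Completed") }
    )
}
```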

Next, let’s chain the source with each of the two functions using flatMap().
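
The snippet that did the chaining is not reproduced here. A minimal sketch of what it might look like, assuming RxJava 2 (io.reactivex); the log() helper and main() are hypothetical and not from the original post:

```kotlin
import io.reactivex.Observable

val source: Observable<Int> = Observable.create<Int> { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onComplete()
    emitter.onNext(3) // dropped: the stream has already terminated
}

// Inner stream that emits one value and completes.
fun operationCompletes(number: Int): Observable<Int> =
    Observable.create<Int> { emitter ->
        emitter.onNext(number)
        emitter.onComplete()
    }

// Inner stream that emits one value and never completes.
fun operationNoCompletes(number: Int): Observable<Int> =
    Observable.create<Int> { emitter -> emitter.onNext(number) }

// Hypothetical helper: subscribes, prints every event with a tag,
// then prints a separator once the synchronous emissions are done.
fun log(tag: String, stream: Observable<Int>) {
    stream.subscribe(
        { println("[$tag]OnNext: $it") },
        { error -> println("[$tag]OnError: $error") },
        { println("[$tag]OnComplete") }
    )
    println("######")
}

fun main() {
    log("FlatMapC", source.flatMap { operationCompletes(it) })
    log("FlatMapNC", source.flatMap { operationNoCompletes(it) })
}
```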

And the result is

I/System.out: [FlatMapC]OnNext: 1
I/System.out: [FlatMapC]OnNext: 2
I/System.out: [FlatMapC]OnComplete
I/System.out: ######
I/System.out: [FlatMapNC]OnNext: 1
I/System.out: [FlatMapNC]OnNext: 2
I/System.out: ######

Hmm… with the non-completing Observable, onComplete() is gone. But so what? onNext() is enough most of the time, and the non-completing Observable itself chose not to fire onComplete(). That’s the expected behavior.

Okay, let’s change nothing but flatMap() to concatMap().
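
Again, the original snippet is not reproduced here. A minimal sketch under the same assumptions (RxJava 2, a hypothetical log() helper and main()), with concatMap() as the only operator in play:

```kotlin
import io.reactivex.Observable

val source: Observable<Int> = Observable.create<Int> { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onComplete()
    emitter.onNext(3) // dropped: the stream has already terminated
}

// Inner stream that emits one value and completes.
fun operationCompletes(number: Int): Observable<Int> =
    Observable.create<Int> { emitter ->
        emitter.onNext(number)
        emitter.onComplete()
    }

// Inner stream that emits one value and never completes.
fun operationNoCompletes(number: Int): Observable<Int> =
    Observable.create<Int> { emitter -> emitter.onNext(number) }

// Hypothetical helper: subscribes, prints every event with a tag,
// then prints a separator once the synchronous emissions are done.
fun log(tag: String, stream: Observable<Int>) {
    stream.subscribe(
        { println("[$tag]OnNext: $it") },
        { error -> println("[$tag]OnError: $error") },
        { println("[$tag]OnComplete") }
    )
    println("######")
}

fun main() {
    log("ConcatC", source.concatMap { operationCompletes(it) })
    log("ConcatNC", source.concatMap { operationNoCompletes(it) })
}
```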

The result is:

I/System.out: [ConcatC]OnNext: 1
I/System.out: [ConcatC]OnNext: 2
I/System.out: [ConcatC]OnComplete
I/System.out: ######
I/System.out: [ConcatNC]OnNext 1
I/System.out: ######

Wait… what? Only onNext(1) was emitted in the last case.

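This actually makes sense. concatMap() preserves order, so it has to wait for the current inner Observable to signal onComplete() before it can move on to the next one. The first inner Observable never completes, so the second one is never subscribed to and 2 is never emitted.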
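
One way to see this waiting is to log each inner subscription. A minimal sketch, assuming RxJava 2; noComplete() is a hypothetical stand-in for operationNoCompletes() with a doOnSubscribe() log added, and the source is simplified to Observable.just(1, 2):

```kotlin
import io.reactivex.Observable

// Inner stream that emits one value and never completes.
// doOnSubscribe logs the moment the operator subscribes to it.
fun noComplete(number: Int): Observable<Int> =
    Observable.create<Int> { emitter -> emitter.onNext(number) }
        .doOnSubscribe { println("subscribed to inner $number") }

fun main() {
    val source = Observable.just(1, 2)

    println("flatMap:")
    source.flatMap { noComplete(it) }
        .subscribe { println("OnNext: $it") }

    println("concatMap:")
    source.concatMap { noComplete(it) }
        .subscribe { println("OnNext: $it") }
}
```

With flatMap() both inner subscriptions should be logged. With concatMap() only the first one should be, because the second inner Observable is still waiting for an onComplete() that never arrives.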

I would like to go deeper into the source code and the Rx documentation, but it is more complicated than I thought. So Part I only shares this common error that Rx beginners run into. Part II will have to wait until I figure out how to tell the rest of the story.

Any feedback is welcome. I would be happy to hear how programmers keep this kind of thing from happening in a team project.

References

flatMap()

RxJava Observable tranformation: concatMap() vs flatMap()

Discussions on Stack Overflow
