# Understanding Marble Diagrams for Reactive Streams


Learning Reactive Programming can be a daunting task, so the Rx team came up with the concept of marble diagrams to visually explain ReactiveX operators. If you find them confusing rather than helpful, this article is for you. Once you understand the concepts, you should be well on your way to grasping the different Rx operators and Reactive Programming in general.

# The Basics

What is Reactive Programming? In its basic form, it deals with event streams (sequences of events that happen over time). Whenever an event occurs, we react to it by doing something. We could process events with `for` loops, but in the functional style used by Rx the transformations are done via `map()`, `filter()` and other Rx operators.

Diagram A is a boring one: it simply shows the timeline, with time flowing from left to right and no items being emitted. In other words, you read marble diagrams from left to right.

Diagram B is more interesting: 3 items are emitted and then the stream completes successfully.

How does an Observable emit items? It simply calls `onNext(item)` for each item.

The items were emitted in this order: circle, pentagon and triangle.

Diagram C shows that 3 items were emitted and then the stream terminated with an error.

Diagram D shows that 3 items were emitted and the stream never terminated.
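To make diagrams B, C and D concrete, here is a minimal plain-Java sketch of the callbacks behind them. The `Observer` interface and the `Recorder` class are hypothetical stand-ins written for this illustration (RxJava's real `Observer` also has an `onSubscribe` method); the `|` and `X` symbols follow the usual marble-diagram notation for completion and error.

```java
import java.util.ArrayList;
import java.util.List;

final class MarbleBasicsSketch {
    /** Simplified, hypothetical observer: the three callbacks a stream may invoke. */
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    /** Records each callback as text so the stream reads like a marble diagram. */
    static final class Recorder implements Observer<String> {
        final List<String> marbles = new ArrayList<>();
        public void onNext(String item) { marbles.add(item); }
        public void onError(Throwable error) { marbles.add("X"); }
        public void onComplete() { marbles.add("|"); }
    }

    private static void emitThreeShapes(Observer<String> o) {
        o.onNext("circle");
        o.onNext("pentagon");
        o.onNext("triangle");
    }

    /** Diagram B: three items, then successful completion. */
    static void diagramB(Observer<String> o) { emitThreeShapes(o); o.onComplete(); }

    /** Diagram C: three items, then termination with an error. */
    static void diagramC(Observer<String> o) { emitThreeShapes(o); o.onError(new RuntimeException("boom")); }

    /** Diagram D: three items and no terminal callback at all. */
    static void diagramD(Observer<String> o) { emitThreeShapes(o); }

    public static void main(String[] args) {
        Recorder b = new Recorder();
        diagramB(b);
        System.out.println(b.marbles); // [circle, pentagon, triangle, |]
    }
}
```

A stream calls `onNext` zero or more times and then at most one of `onComplete` or `onError`; diagram D is simply the case where neither terminal callback ever arrives.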

# Common Operators

Let’s analyze some common ReactiveX operators. We will use RxJava for code samples, but the same concepts apply to other ReactiveX implementations.

# filter()

Emits only those items from an Observable that pass a predicate test (i.e. meet a certain condition). This is one of the most commonly used operators.

```java
Observable.just(2, 30, 22, 5, 60, 1)
    .filter(x -> x > 10)
    .subscribe(x -> Timber.d("item: " + x));
```

Prints:

```
item: 30
item: 22
item: 60
```

For each input item x, if x > 10, emit it in the resulting stream. 3 out of the 6 source values in the diagram are greater than 10 so the resulting stream should emit those 3 items.

1. Source stream emits item with value 2. It’s not greater than 10 so it does not get emitted by the resulting stream.
2. Source stream emits item with value 30. It’s greater than 10 so the resulting stream emits it.
3. Source stream emits item with value 22. It’s greater than 10 so the resulting stream emits it.
4. Source stream emits item with value 5. It’s not greater than 10 so it does not get emitted by the resulting stream.
5. Source stream emits item with value 60. It’s greater than 10 so the resulting stream emits it.
6. Source stream emits item with value 1. It’s not greater than 10 so it does not get emitted by the resulting stream.
7. Source terminates, which causes the resulting stream to terminate.

# map()

Transforms the items emitted by an Observable by applying a function to each item (or converting from one item type into another). This is one of the most commonly used operators.

```java
Observable.just(1, 2, 3)
    .map(x -> 10 * x)
    .subscribe(x -> Timber.d("item: " + x));
```

Prints:

```
item: 10
item: 20
item: 30
```

The source Observable stream contains 3 items with numeric values (1, 2 and 3 in that order). We apply the `map()` operator, which takes the numeric value of each item (`x`) and multiplies it by 10. The resulting Observable contains items with numeric values 10 (10 * 1), 20 (10 * 2) and 30 (10 * 3).

# flatMap()

Transforms the items in Observable stream into separate Observables, then flattens them into a single Observable stream. This is one of the most commonly used operators.

Here we have a stream of 3 circles, each being transformed into 2 diamonds. Then the resulting 6 diamonds are flattened into a new Observable stream (that’s why the operator is called `flatMap`).

1. 1 red circle is emitted resulting in 2 red diamonds being emitted.
2. 1 green circle is emitted resulting in 2 green diamonds being emitted.
3. 1 blue circle is emitted resulting in 2 blue diamonds being emitted.
4. The source stream successfully terminates and so does the resulting stream.

## map() vs flatMap()

So what is different between `map()` and `flatMap()`? The `map()` operator transforms each value in an Observable stream into a single value. The `flatMap()` operator transforms each value in an Observable stream into an arbitrary number (zero or more) of values. Given an input of type `T` and an output of type `R`, this can be represented as:

```
map():
    input: Observable<T>
    transformation: (T -> R)
    output: Observable<R>

flatMap():
    input: Observable<T>
    transformation: (T -> Observable<R>)
    output: Observable<R>
```

For instance, let’s look up employees in an Engineering department by transforming a department stream into a stream of employees:

```kotlin
departmentRepo.getDepartmentByType(Engineering)
    .flatMap { dept ->
        employeeRepo.getEmployeesByDepartment(dept.id)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(/* handle success */, /* handle error */)
```

Unlike the `map()` operation, `flatMap()` is often used when each item emitted by the source stream needs to have its own threading operators applied to it. For instance, a stream of user ids is flatMapped into `Observable<User>` objects, where each `User` object is looked up from the database on the IO thread using the `subscribeOn(Schedulers.io())` operator.

```kotlin
userRepo.getActiveUserIds()
    .flatMap { userId ->
        userRepo.getUserById(userId)
            .subscribeOn(Schedulers.io())
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(/* handle success */, /* handle error */)
```

Note that `flatMap()` does not guarantee the order of the items in the resulting stream. That’s why the green and blue diamonds in the diagram above are interleaved. If the order of the items in the resulting stream is important, use `concatMap()`.

# concatMap()

Similar to `flatMap()`, but it transforms only a single source item at a time. Therefore, it guarantees that the items emitted in the resulting stream maintain the same order and will not be interleaved.

For each circle emission, the resulting stream produces 2 diamond emissions of the same color while maintaining the original sequence. 1 red circle is transformed into 2 red diamonds. Then 1 green circle is transformed into 2 green diamonds. The transformation of the blue circle into 2 blue diamonds is delayed until transforming the green shapes is done. Therefore the order of the resulting events is the same as the original: red, followed by green, followed by blue.
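The ordering difference between `flatMap()` and `concatMap()` can be sketched in plain Java without RxJava. This is only a model of the emission order, not of how either operator is implemented: the `Marble` class, the tick values and the `toDiamonds` helper are all made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class FlatMapVsConcatMapSketch {
    /** A marble: its color and the (made-up) tick at which it is emitted. */
    static final class Marble {
        final String color;
        final int tick;
        Marble(String color, int tick) { this.color = color; this.tick = tick; }
    }

    /** Hypothetical inner stream: a circle yields two diamonds, 1 and 5 ticks after it arrives. */
    static List<Marble> toDiamonds(Marble circle) {
        return List.of(new Marble(circle.color, circle.tick + 1),
                       new Marble(circle.color, circle.tick + 5));
    }

    /** flatMap model: all inner streams run at once, so diamonds come out in tick order. */
    static List<String> flatMapOrder(List<Marble> circles) {
        List<Marble> merged = new ArrayList<>();
        for (Marble c : circles) merged.addAll(toDiamonds(c));
        merged.sort(Comparator.comparingInt((Marble m) -> m.tick));
        List<String> out = new ArrayList<>();
        for (Marble m : merged) out.add(m.color);
        return out;
    }

    /** concatMap model: one inner stream at a time, strictly in source order. */
    static List<String> concatMapOrder(List<Marble> circles) {
        List<String> out = new ArrayList<>();
        for (Marble c : circles)
            for (Marble d : toDiamonds(c)) out.add(d.color);
        return out;
    }

    /** Assumed timing: the blue circle arrives before the green diamonds are finished. */
    static List<Marble> circles() {
        return List.of(new Marble("red", 0), new Marble("green", 10), new Marble("blue", 12));
    }

    public static void main(String[] args) {
        System.out.println(flatMapOrder(circles()));   // [red, red, green, blue, green, blue]
        System.out.println(concatMapOrder(circles())); // [red, red, green, green, blue, blue]
    }
}
```

With these assumed timings the green and blue diamonds interleave under the `flatMap` model but stay grouped under the `concatMap` model.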

# concat()

Emit the items from two or more Observables without interleaving them.

```java
Observable<Integer> obs1 = Observable.just(1, 1, 1);
Observable<Integer> obs2 = Observable.just(2, 2);

Observable.concat(obs1, obs2)
    .subscribe(x -> Timber.d("item: " + x));
```

Prints:

```
item: 1
item: 1
item: 1
item: 2
item: 2
```

Note that emissions from the first source Observable are mirrored in the resulting stream (items 1, 1, 1 are emitted first). Only when that source stream terminates are the items from the second stream (items 2 and 2) emitted in the resulting stream. Since there are no more Observables to process (both source streams terminated), the resulting stream terminates.

# combineLatest()

Here we have 2 Observable streams (let’s call them S1 and S2): S1 emits circles with values 1, 2, 3, 4, 5 and S2 emits circles with values A, B, C, D. Our `combineLatest()` operator concatenates the latest values of both streams and emits the result in the resulting Observable (e.g. when item “1” is combined with item “A”, the result is a “1A” emission). Whenever either stream emits an item (as long as the other source stream has already emitted at least 1 item), a new combined item is emitted in the resulting stream.

There are 8 items emitted in the resulting (combined) Observable. Let’s trace the sequence:

1. Item “1” is emitted in stream S1 (top stream). No item is emitted in the resulting stream since there were no previously emitted items in stream S2.
2. Item “A” is emitted in stream S2. The resulting stream emits the newest product of `combineLatest()` operator: “1A”.
3. Stream S1 emits item “2”. The resulting stream emits “2A” (a combination of the new item “2” and previously emitted item “A”).
4. Stream S2 emits item “B”. The resulting stream emits “2B” (a combination of the previously emitted item “2” and the new item “B”).
5. Stream S2 emits item “C”. The resulting stream emits “2C” (a combination of the previously emitted item “2” and the new item “C”).
6. Stream S2 emits item “D”. The resulting stream emits “2D” (a combination of the previously emitted item “2” and the new item “D”).
7. Stream S1 emits item “3”. The resulting stream emits “3D” (a combination of the new item “3” and the previously emitted item “D”).
8. Stream S1 emits item “4”. The resulting stream emits “4D” (a combination of the new item “4” and the previously emitted item “D”).
9. Stream S1 emits item “5”. The resulting stream emits “5D” (a combination of the new item “5” and the previously emitted item “D”).
10. Both source streams successfully terminate and so does the resulting stream.

`combineLatest()` is similar to the `zip()` operator, except that the latter emits a resulting item only when each source Observable has emitted a new (previously unzipped) item. The `combineLatest()` operator, on the other hand, emits whenever at least one source Observable emits a new item, reusing the latest item from the other source if it has nothing new (see step 3 above).
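The trace above can be reproduced with a small plain-Java model. It is a sketch of the combining rule only, not of RxJava's implementation. As a modeling assumption, both streams are fed in as one list in arrival order, with digits standing for S1 items and letters for S2 items.

```java
import java.util.ArrayList;
import java.util.List;

final class CombineLatestSketch {
    /**
     * Modeling assumption: events arrive as a single interleaved list,
     * where digits come from S1 and letters come from S2.
     */
    static List<String> combineLatest(List<String> events) {
        String latest1 = null; // most recent item seen on S1
        String latest2 = null; // most recent item seen on S2
        List<String> out = new ArrayList<>();
        for (String e : events) {
            if (Character.isDigit(e.charAt(0))) latest1 = e; else latest2 = e;
            // Emit on every new item once both streams have emitted at least once.
            if (latest1 != null && latest2 != null) out.add(latest1 + latest2);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> events = List.of("1", "A", "2", "B", "C", "D", "3", "4", "5");
        System.out.println(combineLatest(events)); // [1A, 2A, 2B, 2C, 2D, 3D, 4D, 5D]
    }
}
```

The output has 8 items, one for each source emission except the very first, which had nothing to combine with.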

# zip()

Combines the emissions of multiple Observables together via a specified function.

The `zip()` operator emits a resulting item only when each source Observable has emitted a new (previously unzipped) item. If you’d like to “re-use” a previously emitted item, check out the `combineLatest()` operator above.

Let’s observe the sequence of events above, where items in the source Observables are zipped together (“1” is concatenated with “A” to make “1A”).

1. Item “1” is emitted in stream S1 (top stream). No item is emitted in the resulting stream since there were no new items emitted in stream S2.
2. Stream S2 emits item “A”. The resulting stream emits “1A” (a combination of the new item “A” and the previously emitted but not yet zipped item “1”).
3. Item “2” is emitted in stream S1. No item is emitted in the resulting stream since there were no new items emitted in stream S2.
4. Stream S2 emits item “B”. The resulting stream emits “2B” (a combination of the new item “B” and the previously emitted but not yet zipped item “2”).
5. Item “C” is emitted in stream S2. No item is emitted in the resulting stream since there were no new items emitted in stream S1.
6. Item “D” is emitted in stream S2. No item is emitted in the resulting stream since there were no new items emitted in stream S1.
7. Stream S1 emits item “3”. The resulting stream emits “3C” (a combination of the new item “3” and the previously emitted but not yet zipped item “C”).
8. Stream S1 emits item “4”. The resulting stream emits “4D” (a combination of the new item “4” and the previously emitted but not yet zipped item “D”).
9. Item “5” is emitted in stream S1. No item is emitted in the resulting stream since there were no new items emitted in stream S2.
10. Both source streams successfully terminate and so does the resulting stream.
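The pairing rule in the steps above can be modeled in plain Java with one queue per stream. This is a sketch of the behavior rather than RxJava's actual implementation. As a modeling assumption, both streams are fed in as one list in arrival order, with digits standing for S1 items and letters for S2 items.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ZipSketch {
    /**
     * Modeling assumption: events arrive as a single interleaved list,
     * where digits come from S1 and letters come from S2.
     */
    static List<String> zip(List<String> events) {
        Deque<String> pending1 = new ArrayDeque<>(); // S1 items not yet zipped
        Deque<String> pending2 = new ArrayDeque<>(); // S2 items not yet zipped
        List<String> out = new ArrayList<>();
        for (String e : events) {
            if (Character.isDigit(e.charAt(0))) pending1.add(e); else pending2.add(e);
            // Emit only when each stream has an unused item; each item is used once.
            if (!pending1.isEmpty() && !pending2.isEmpty()) {
                out.add(pending1.poll() + pending2.poll());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> events = List.of("1", "A", "2", "B", "C", "D", "3", "4", "5");
        System.out.println(zip(events)); // [1A, 2B, 3C, 4D]
    }
}
```

Item “5” stays in the S1 queue and is never emitted, because S2 has no unused item left to pair it with.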

# scan()

Applies a function to each item emitted by an Observable, sequentially, and emits each successive result.

In the diagram above, our source Observable stream emits items with numeric values (1, 2, 3, 4, 5). In this case, the scan operator applies a function (the sum of the current item and the previously emitted result) to each item sequentially.

Here is the sequence of events:

1. Item with value 1 is emitted. Since there is no previous result, `scan()` emits the first item as-is, with value 1 (for a sum, this is equivalent to 0 + 1).
2. Item with value 2 is emitted. The scan operator adds previous value to the new one (1 + 2) and emits the resulting item with value 3.
3. Item with value 3 is emitted. The scan operator adds previous value to the new one (3 + 3) and emits the resulting item with value 6.
4. Item with value 4 is emitted. The scan operator adds previous value to the new one (6 + 4) and emits the resulting item with value 10.
5. Item with value 5 is emitted. The scan operator adds previous value to the new one (10 + 5) and emits the resulting item with value 15.
6. The source stream successfully terminates and so does the resulting stream.
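The running-sum trace above can be reproduced with a short plain-Java model of `scan()`. It is a sketch over a `List` rather than a real Observable, and the `ScanSketch` class is made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

final class ScanSketch {
    /** Emits every intermediate result of folding the accumulator over the source. */
    static List<Integer> scan(List<Integer> source, BinaryOperator<Integer> accumulator) {
        List<Integer> out = new ArrayList<>();
        Integer previous = null; // no result yet
        for (Integer item : source) {
            // The first item passes through unchanged; later items are combined
            // with the previously emitted result.
            previous = (previous == null) ? item : accumulator.apply(previous, item);
            out.add(previous);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(scan(List.of(1, 2, 3, 4, 5), Integer::sum)); // [1, 3, 6, 10, 15]
    }
}
```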

# reduce()

Applies a function to each item emitted by an Observable, sequentially, and emits the final value.

Our `reduce()` operator adds the current and previous values and keeps track of the sum. When a new item is emitted, its value is added to the previously calculated sum. When the source stream terminates, the resulting stream emits the sum of all items and immediately terminates.

1. Source stream emits an item with value 1. The sum becomes 1.
2. Source stream emits an item with value 2. The sum becomes 1 + 2 = 3.
3. Source stream emits an item with value 3. The sum becomes 3 + 3 = 6.
4. Source stream emits an item with value 4. The sum becomes 6 + 4 = 10.
5. Source stream emits an item with value 5. The sum becomes 10 + 5 = 15.
6. Source stream terminates successfully, causing the resulting stream to emit the sum value of 15 and immediately terminate as well.
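As a rough plain-Java model of the steps above, `reduce()` keeps the same accumulated value as a running sum but emits only the final one. The `ReduceSketch` class is hypothetical; it returns an `Optional` because a source that terminates without emitting anything has no final value to emit.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

final class ReduceSketch {
    /** Folds the accumulator over the source and returns only the last result. */
    static Optional<Integer> reduce(List<Integer> source, BinaryOperator<Integer> accumulator) {
        Integer result = null; // nothing accumulated yet
        for (Integer item : source) {
            result = (result == null) ? item : accumulator.apply(result, item);
        }
        // Nothing is emitted until the source has terminated.
        return Optional.ofNullable(result);
    }

    public static void main(String[] args) {
        System.out.println(reduce(List.of(1, 2, 3, 4, 5), Integer::sum)); // Optional[15]
    }
}
```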

# debounce()

Only emits an item from an Observable if a particular timespan has passed without it emitting another item.

Let’s trace the sequence of events:

1. Source stream emits item 1 and only after a certain timespan T has passed, the resulting stream emits item 1.
2. Source stream emits item 2 quickly followed by 3, 4, 5 and stops emitting for a while. The `debounce()` operator determines that no new items were emitted from the source Observable within a certain time period T and proceeds to emit the latest item emitted by the source Observable (item 5).
3. Source stream emits item 6 and stops emitting for a while. The `debounce()` operator determines that no new items were emitted from the source Observable within a certain time period T and proceeds to emit the latest item emitted by the source Observable (item 6).
4. Source stream terminates and so does the resulting stream.
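The trace above can be modeled in plain Java by attaching a timestamp to every item. This is a simplified sketch, not RxJava's timer-based implementation: the `{value, timeMs}` pairs and the specific timings are made up, and it assumes that a gap of at least the timespan counts as quiet and that the last pending item is emitted when the source terminates.

```java
import java.util.ArrayList;
import java.util.List;

final class DebounceSketch {
    /**
     * Each entry of timedItems is {value, emissionTimeMs}, in ascending time order.
     * An item survives only if the source stays quiet for timespanMs after it.
     */
    static List<Integer> debounce(int[][] timedItems, int timespanMs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < timedItems.length; i++) {
            boolean isLast = (i == timedItems.length - 1);
            // Assumption: the last item is flushed when the source terminates.
            if (isLast || timedItems[i + 1][1] - timedItems[i][1] >= timespanMs) {
                out.add(timedItems[i][0]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up timings: item 1 alone, items 2..5 in a quick burst, then item 6.
        int[][] source = {{1, 0}, {2, 500}, {3, 550}, {4, 600}, {5, 650}, {6, 1200}};
        System.out.println(debounce(source, 300)); // [1, 5, 6]
    }
}
```

Items 2, 3 and 4 are dropped because each was followed by another item within 300 ms.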

`debounce()` is useful for processing button clicks, observing text changes, etc. For instance, the following code will only emit a button click after no new clicks were observed for 400 milliseconds:

```kotlin
myButton.clicks()
    .debounce(400, TimeUnit.MILLISECONDS)
    .subscribe()
```

# distinct()

Suppresses duplicate items emitted by an Observable.

```java
Observable.just(1, 2, 2, 1, 3)
    .distinct()
    .subscribe(x -> Timber.d("item: " + x));
```

Prints:

```
item: 1
item: 2
item: 3
```

1. Source stream emits item 1. Since item 1 was never emitted before, the resulting stream emits it as well.
2. Source stream emits item 2. Since item 2 was never emitted before, the resulting stream emits it as well.
3. Source stream emits item 2. Item 2 was emitted before. Therefore, the resulting stream does not emit it again.
4. Source stream emits item 1. Item 1 was emitted before. Therefore, the resulting stream does not emit it again.
5. Source stream emits item 3. Since item 3 was never emitted before, the resulting stream emits it as well.
6. Source stream terminates successfully, and so does the resulting stream.

# distinctUntilChanged()

A variation of `distinct()` that emits a value only if it is different from the previous value.

```java
Observable.just(1, 2, 2, 1, 3)
    .distinctUntilChanged()
    .subscribe(x -> Timber.d("item: " + x));
```

Prints:

```
item: 1
item: 2
item: 1
item: 3
```

1. Source stream emits item 1. There is no previous item, so the resulting stream emits it as well.
2. Source stream emits item 2. The previous item was 1, so the resulting stream emits it as well.
3. Source stream emits item 2. The previous item was also 2. Therefore, the resulting stream does not emit it again.
4. Source stream emits item 1. The previous item was 2. Therefore, the resulting stream emits it as well.
5. Source stream emits item 3. The previous item was 1, so the resulting stream emits it as well.
6. Source stream terminates successfully, and so does the resulting stream.

`distinctUntilChanged()` lets us emit a value only if it’s different from the previous value. For instance, we could use it to emit weather (temperature) values only when they change, or to emit a new soccer score only when the score has changed.

# takeUntil()

Mirrors items from the first source Observable until the second source Observable emits an item or terminates.

Here our resulting Observable mirrors the first source Observable (emitting items 1 through 5) and, as soon as the second source Observable emits its first item, our resulting Observable terminates.

Niklas Baudy has a great example of the `takeUntil()` operator:

```java
modelProvider.getItems()
    .flatMap(retroApiInterface::doBackendRequest)
    .takeUntil(response -> response.isSuccessful())
```

Here we automatically terminate (stop sending backend requests) once the network response is successful. Note that this example uses the variant of `takeUntil()` that takes a predicate rather than a second Observable: the stream completes after the first item for which the predicate returns true.
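The predicate form used in the example above can be modeled in plain Java as follows. This is a sketch over a `List`, with a hypothetical `TakeUntilSketch` class; the point it illustrates is that the item satisfying the predicate is still emitted before the stream stops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class TakeUntilSketch {
    /** Mirrors the source up to and including the first item matching the stop predicate. */
    static <T> List<T> takeUntil(List<T> source, Predicate<T> stop) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);              // the matching item itself is still emitted
            if (stop.test(item)) break; // ...and then the stream terminates
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up stand-in for backend responses: stop at the first "200".
        List<String> responses = List.of("500", "503", "200", "200");
        System.out.println(takeUntil(responses, "200"::equals)); // [500, 503, 200]
    }
}
```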

# defaultIfEmpty()

Emit items from the source Observable, or a default item if the source terminates without emitting any items.

Here we specify that we would like a red circle to be emitted if the source stream terminates without emitting any items. And that’s exactly what happens: the source stream terminates without emitting any items, so the resulting stream emits the red circle and then terminates.
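In plain Java, the same rule can be sketched in a single expression. The `DefaultIfEmptySketch` class is hypothetical and models the stream as a `List` that has already terminated.

```java
import java.util.List;

final class DefaultIfEmptySketch {
    /** Mirrors a non-empty source; an empty source is replaced by the single default item. */
    static <T> List<T> defaultIfEmpty(List<T> source, T defaultItem) {
        return source.isEmpty() ? List.of(defaultItem) : source;
    }

    public static void main(String[] args) {
        System.out.println(defaultIfEmpty(List.<String>of(), "red circle"));     // [red circle]
        System.out.println(defaultIfEmpty(List.of("blue circle"), "red circle")); // [blue circle]
    }
}
```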

Hope this was helpful! If something is not clear or there are other Rx operators that you’d like to see covered here, feel free to leave a comment.