RxJava Ninja: Observable Factories Part 2

Tompee Balauag
Published in Familiar Android
6 min read · Aug 2, 2018
Photo by Marius Masalar on Unsplash

This article is a part of the RxJava Ninja series. The complete list of the articles for this series can be found at the end of this article (and will be updated as more articles become available).

Today we will continue our discussion of observable factories. The first family that we will discuss is special because its members are “time-based”.

Observable.timer

This operator lets you specify a time delay before emitting an event. It emits a single Long with the value 0L and then completes.

Image from rx docs

Let’s see some code to demonstrate.
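The original listing is not reproduced here, so the following is only a minimal sketch of what such a demonstration might look like. It assumes RxJava 2 (the `io.reactivex` package); the class and method names are hypothetical.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class TimerNoWait {
    // Subscribes to a 1-second timer and returns immediately,
    // reporting whatever has been received by that point.
    static List<Long> subscribeWithoutWaiting() {
        List<Long> received = new CopyOnWriteArrayList<>();
        Observable.timer(1, TimeUnit.SECONDS)
                .subscribe(value -> {
                    System.out.println(value);
                    received.add(value);
                });
        // No waiting here: the timer has not fired yet.
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        subscribeWithoutWaiting();
    }
}
```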

What do you think the above code prints when run as is? Most likely nothing. Why is that? Let’s look at the code.

Calling the timer method instantiates an ObservableTimer using a scheduler. Wait, what? A scheduler? Yes, you read that right. The scheduler is used to schedule the task, and it runs the task in a separate execution context, in this case a computation thread. There are many types of schedulers and they will be discussed soon, so don’t worry if some of the terms sound weird to you.

For now, you can think of using schedulers as executing in a background thread. This explains why there is no log in the above example. The task was scheduled to fire after 1 second, but the main thread finished right after subscribing. We can prove this by adding a thread sleep of 5 seconds after subscribe to prolong the life of the main thread. Doing so gives you the desired output.

0
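The sleep-based fix can be sketched as follows. This is an illustration under assumptions, not the original listing: it assumes RxJava 2, and the class name and the `run` helper with its two parameters are hypothetical.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class TimerWithWait {
    // Subscribes to a timer, then keeps the calling thread alive for waitMillis
    // so the background emission has time to arrive.
    static List<Long> run(long delayMillis, long waitMillis) {
        List<Long> received = new CopyOnWriteArrayList<>();
        Observable.timer(delayMillis, TimeUnit.MILLISECONDS)
                .subscribe(value -> {
                    System.out.println(value);
                    received.add(value);
                });
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        run(1000, 5000); // 1-second timer, 5-second sleep as in the text
    }
}
```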

Now, let us check whether the thread that delivers the events really differs from the thread used at assembly and subscription time. Let’s run this code.
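A check like this might look like the sketch below. It assumes RxJava 2; the class name and the `run` helper are hypothetical, and `doOnSubscribe` is used here as one possible way to observe the subscription thread.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class TimerThreads {
    private static String thread() {
        return Thread.currentThread().getName();
    }

    // Records which thread runs each stage of the timer's lifecycle.
    static List<String> run(long delayMillis, long waitMillis) {
        List<String> log = new CopyOnWriteArrayList<>();
        log.add("Assembly thread: " + thread());
        Observable.timer(delayMillis, TimeUnit.MILLISECONDS)
                .doOnSubscribe(d -> log.add("Subscription thread: " + thread()))
                .subscribe(
                        value -> log.add("Runtime onNext thread: " + thread()),
                        Throwable::printStackTrace,
                        () -> log.add("Runtime completed thread: " + thread()));
        try {
            Thread.sleep(waitMillis); // keep the caller alive until the timer fires
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run(1000, 5000).forEach(System.out::println);
    }
}
```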

The output should not surprise you.

Assembly thread: main
Subscription thread: main
Runtime onNext thread: RxComputationThreadPool-1
Runtime completed thread: RxComputationThreadPool-1

This means that the subscription still happens on the main thread, while onNext and onComplete are triggered on a computation thread. Now what if you want to wait for the result on the main thread? You can use blockingSubscribe instead. Doing so gives you this log:

Assembly thread: main
Subscription thread: main
Runtime onNext thread: main
Runtime completed thread: main
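The blockingSubscribe variant can be sketched like this, again assuming RxJava 2 and hypothetical class and method names. Because the call blocks until the timer completes, no sleep is needed and the callbacks run on the calling thread.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TimerBlocking {
    private static String thread() {
        return Thread.currentThread().getName();
    }

    // blockingSubscribe waits for the timer and invokes the callbacks
    // on the thread that called it.
    static List<String> run(long delayMillis) {
        List<String> log = new ArrayList<>();
        log.add("Assembly thread: " + thread());
        Observable.timer(delayMillis, TimeUnit.MILLISECONDS)
                .doOnSubscribe(d -> log.add("Subscription thread: " + thread()))
                .blockingSubscribe(
                        value -> log.add("Runtime onNext thread: " + thread()),
                        Throwable::printStackTrace,
                        () -> log.add("Runtime completed thread: " + thread()));
        return log;
    }

    public static void main(String[] args) {
        run(1000).forEach(System.out::println);
    }
}
```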

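Observable.interval

This operator is similar to timer, but it emits periodically: a Long counter starting at 0L, once per period, until the subscription is disposed. By default the first item arrives after one period; there is also a variant that takes a separate initial delay.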

Image from rx docs

Let’s use the same code from the timer example, replacing timer with interval.
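As a rough sketch of that swap (RxJava 2 assumed; the class name and the `run` helper are hypothetical), the version below subscribes, lets the caller sleep for a while, and then disposes the subscription, since interval never completes on its own.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class IntervalSketch {
    // Collects the values emitted by interval while the caller sleeps.
    static List<Long> run(long periodMillis, long waitMillis) {
        List<Long> received = new CopyOnWriteArrayList<>();
        System.out.println("Assembly thread: " + Thread.currentThread().getName());
        Disposable subscription = Observable.interval(periodMillis, TimeUnit.MILLISECONDS)
                .subscribe(value -> {
                    System.out.println("Runtime onNext thread: "
                            + Thread.currentThread().getName());
                    received.add(value);
                });
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        subscription.dispose(); // stop the otherwise endless emissions
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        run(1000, 5500);
    }
}
```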

The output is

Assembly thread: main
Subscription thread: main
Runtime onNext thread: RxComputationThreadPool-1
Runtime onNext thread: RxComputationThreadPool-1
Runtime onNext thread: RxComputationThreadPool-1
Runtime onNext thread: RxComputationThreadPool-1
Runtime onNext thread: RxComputationThreadPool-1

Observable.intervalRange

This operator belongs to the range family but adds interval capability. You can use it to generate incremental values within a range, one per interval.

Image from rx wiki
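Here is a minimal sketch of intervalRange, assuming RxJava 2 and a hypothetical class name. It starts at 1, emits 5 values with no initial delay, and, unlike a sleep-based version, blocks with `toList().blockingGet()` so that the caller is guaranteed to outlast the emissions.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class IntervalRangeSketch {
    // intervalRange(start, count, initialDelay, period, unit)
    static List<Long> run(long periodMillis) {
        return Observable.intervalRange(1, 5, 0, periodMillis, TimeUnit.MILLISECONDS)
                .doOnNext(value -> System.out.println("Runtime onNext thread: "
                        + Thread.currentThread().getName() + " value: " + value))
                .toList()       // collect everything until onComplete
                .blockingGet(); // wait on the calling thread
    }

    public static void main(String[] args) {
        System.out.println(run(1000));
    }
}
```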

This will output 5 items, as long as the main thread stays alive until the computation thread has emitted them all.

Assembly thread: main
Subscription thread: main
Runtime onNext thread: RxComputationThreadPool-1 value: 1
Runtime onNext thread: RxComputationThreadPool-1 value: 2
Runtime onNext thread: RxComputationThreadPool-1 value: 3
Runtime onNext thread: RxComputationThreadPool-1 value: 4
Runtime onNext thread: RxComputationThreadPool-1 value: 5
Runtime completed thread: RxComputationThreadPool-1

Now we will look at another family of factories that does not really create observables from non-observables but repeats an existing observable source.

Observable.repeat

This operator repeats the source sequence a specified number of times. If the number of times is not specified, it repeats up to Long.MAX_VALUE times, which is effectively forever.

Image from rx docs

Note that this operator repeats the entire sequence of its source, not just an item. Let’s see.
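A small sketch of this behavior, assuming RxJava 2 and a hypothetical class name. It uses the bounded `repeat(times)` overload so that the example terminates; calling `repeat()` with no argument would keep going.

```java
import io.reactivex.Observable;
import java.util.List;

public class RepeatSketch {
    // Repeats the whole two-item sequence, not each item individually.
    static List<String> repeated(long times) {
        return Observable.just("hello", "hi")
                .repeat(times)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        repeated(2).forEach(System.out::println);
    }
}
```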

This will output

hello
hi
hello
hi
....

indefinitely. Note that this does not run on a scheduler. A good way to check whether an operator runs on a scheduler is to look at the operator’s definition and check the value of its @SchedulerSupport annotation.

Another thing to note is that repetition is performed by resubscribing. This means that if your observable source creates its items on the fly, it will emit different instances of those items on every repeat. Let’s try this with a callable that returns the current time.
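One way to sketch this, assuming RxJava 2 and a hypothetical class name. `System.nanoTime()` stands in for "the current time" here; the callable runs again on every resubscription.

```java
import io.reactivex.Observable;
import java.util.List;

public class RepeatCallableSketch {
    // fromCallable invokes the callable once per subscription,
    // so each repeat produces a freshly read timestamp.
    static List<Long> timestamps(long times) {
        return Observable.<Long>fromCallable(System::nanoTime)
                .repeat(times)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        timestamps(3).forEach(System.out::println);
    }
}
```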

This will output different timestamps, depending on the resolution of the clock. Try it.

Observable.repeatUntil

This operator repeats the source indefinitely until a condition is satisfied. This is somewhat counter-intuitive, as you might expect it to repeat while the condition is true; instead, returning true stops the repetition.

Image from rx wiki
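To illustrate the stop semantics, here is a minimal sketch assuming RxJava 2; the class name and the round counter are hypothetical. The supplier is consulted each time the source completes, and returning true ends the sequence. With a supplier that never returns true, it would behave like an unbounded repeat().

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RepeatUntilSketch {
    // Stops repeating once the source has completed maxRounds times.
    static List<String> run(int maxRounds) {
        AtomicInteger rounds = new AtomicInteger();
        return Observable.just("hello", "hi")
                .repeatUntil(() -> rounds.incrementAndGet() >= maxRounds) // true means stop
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```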

The output of this code will be the same as the repeat code above.

Observable.repeatWhen

The concept of this operator is quite difficult to understand at first but bear with me. First, look at the marble diagram.

Image from rx wiki

Based on the diagram, the operator creates an intermediate stream of onComplete events. This means that repeatWhen only reacts when onComplete of the input is triggered. This stream of onComplete events, which we will call Observable<OnComplete> (in reality, it is an Observable<?> because repeatWhen does not care about the item type), is passed as input to a handler. You can use this Observable<OnComplete> to decide whether to repeat or not. The handler function should return an observable derived from this same Observable<OnComplete> for this to work: each item the returned observable emits triggers a resubscription to the source, and its termination ends the sequence. The power of repeatWhen lies in the range of operators you can apply to this Observable<OnComplete>. Let’s take an example.
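A minimal sketch of such a handler, assuming RxJava 2 and a hypothetical class name. The handler takes only the first 3 completion signals from its input and then completes.

```java
import io.reactivex.Observable;
import java.util.List;

public class RepeatWhenSketch {
    // The handler receives the stream of completion signals and
    // limits it to 3 items before completing.
    static List<String> run() {
        return Observable.just("hello", "hi")
                .repeatWhen(completions -> completions.take(3))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        System.out.println("onComplete");
    }
}
```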

Observe the code first before running. We can see that our handler function will only take 3 onComplete events from the Observable<OnComplete> before terminating. Will this be true? Let’s check the output below.

hello
hi
hello
hi
hello
hi
onComplete

Indeed it is. You can do tons of things to this input observable like adding a constant delay to implement a polling mechanism, or a combination of both delay and numbered repeats.
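As an illustration of the polling idea, the sketch below delays every completion signal by a constant amount before resubscribing. It assumes RxJava 2; the class name, the counter that stands in for a real data source, and the downstream `take` used to bound the example are all hypothetical choices.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingSketch {
    // Polls a counter repeatedly, waiting delayMillis between polls,
    // and stops after the requested number of results.
    static List<Integer> poll(int polls, long delayMillis) {
        AtomicInteger counter = new AtomicInteger(); // stand-in for a real source
        return Observable.<Integer>fromCallable(counter::incrementAndGet)
                .repeatWhen(completions ->
                        completions.delay(delayMillis, TimeUnit.MILLISECONDS))
                .take(polls) // bound the otherwise endless polling
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(poll(3, 500));
    }
}
```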

Observable.defer

This will be the last factory operator that we will be discussing in this series.

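Observable.defer serves a similar purpose to fromCallable, except that it is more verbose: instead of returning a single item, its callable returns a whole observable. Both run their callable on every subscription, so each subscriber gets fresh state. Check its marble diagram.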

Image from rx docs

Let’s try and convert our previous fromCallable example to a defer.

We can represent it via a combination of just + defer.
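The just + defer combination might be sketched as follows, assuming RxJava 2 and hypothetical names. For contrast, it also includes a plain `just`, which reads the time once at assembly and therefore replays the same value on every repeat.

```java
import io.reactivex.Observable;
import java.util.List;

public class DeferSketch {
    // just() captures the timestamp once, when the observable is assembled.
    static List<Long> withJust() {
        return Observable.just(System.nanoTime())
                .repeat(3)
                .toList()
                .blockingGet();
    }

    // defer() builds a new just() for every subscription,
    // so each repeat reads the time again.
    static List<Long> withDefer() {
        return Observable.<Long>defer(() -> Observable.just(System.nanoTime()))
                .repeat(3)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("just:  " + withJust());
        System.out.println("defer: " + withDefer());
    }
}
```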

That concludes observable factories. In the next tutorial, we will discuss different types (and kinds) of observables.

Check out the other articles in this series.

  1. Introduction to Reactive Programming
  2. Building your first Observables and Observers
  3. In Depth Observables and Observers
  4. Marble Diagrams and Operators
  5. Observable Factories Part 1
  6. Observable Factories Part 2
  7. Single, Maybe and Completable
  8. Hot and Cold Observables
  9. Filtering Operators Part 1
  10. Filtering Operators Part 2
