RxSubjects, Cold and Hot Observables, ConnectableObservable

We know about observables that produces the events and subscribers that subscribes to the observables to consume those events. What if we have something that can work like both as an observable as well as an subscriber. That something is RxSubjects.

If we see at its implementation, its extending the Observable class and implementing the Observer interface. That’s why it can act like both.

public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
protected Subject(OnSubscribe<R> onSubscribe) {
super(onSubscribe);
}

public abstract boolean hasObservers();

public final SerializedSubject<T, R> toSerialized() {
return this.getClass() == SerializedSubject.class?(SerializedSubject)this:new SerializedSubject(this);
}
}

RxSubjects are by default HOT observable. Before moving to subjects, Let’s first talk about HOT and COLD observables.

We have two kinds of Observables (HOT and COLD) in RxJava.

Cold Observable

  1. Observable that doesn’t emit items until a subscriber subscribes.
  2. If we have more than one subscriber, then observable will emit sequence of items to all subscribers one by one.

Let’s have an example where we have an observable which is cold and two subscribers that are subscribed to that observable.

public static void main(String[] args) {

Observable<Integer> cold = Observable.create(subscriber -> {
for (int i = 0; i <= 1; i++) {
System.out.println("Source Emit " + i);
subscriber.onNext(i);
}
});

cold.subscribe(subscriber1);
cold.subscribe(subscriber2);
}
private static Action1<Integer> subscriber1 = new Action1<Integer>()
{
@Override
public void call(Integer integer)
{
System.out.println("Subscriber 1 :" + integer);
}
};

private static Action1<Integer> subscriber2 = new Action1<Integer>()
{
@Override
public void call(Integer integer)
{
System.out.println("Subscriber 2 :" + integer);
}
};

If we look at the output, we can see that the values 0,1 and 2 are emitted again for second subscriber.

Source emit : 0
Subscriber 1 : 0
Source emit : 1
Subscriber 1 : 1
Source emit : 0
Subscriber 2 : 0
Source emit : 1
Subscriber 2 : 1

If we don’t want to produce the sequence of data again, we have another choice i.e Hot Observables.

Hot Observables

  1. Observables that don’t wait for any subscription. They start emitting items when created.
  2. They don’t emit the sequence of items again for a new subscriber.
  3. When an item is emitted by hot observable, all the subscribers that are subscribed will get the emitted item at once.
ConnectableObservable<Integer> connectble = cold.publish();
connectble.subscribe(subscriber1);
connectble.subscribe(subscriber2);
connectble.connect();

Here, we have used ConnectableObservable to convert a cold into hot observable. If we see the output, both the subscribers will get the item at once when item gets emitted.

Source Emit : 0
Subscriber 1 : 0
Subscriber 2 : 0
Source Emit : 1
Subscriber 1 : 1
Subscriber 2 : 1

And, if there is no subscriber still the observable will emit all the items.

ConnectableObservable<Integer> connectable = cold.publish();
connectableObservable.connect();
Hot Observable Emit 0
Hot Observable Emit 1
Hot Observable Emit 2

So, now we can list some of the use cases where HOT observables are best suited.

  1. When we want to do some job without having subscription.
  2. When we want to broadcast to all subscribers at once.
  3. When we don’t want to trigger the source of data again and again for new subscribers.

HOW TO CREATE HOT OBSERVABLES

We have two ways of creating HOT observables.

  1. Subject : Using subjects, we can not only convert the cold into hot observable but also can create the hot observable from scratch.
  2. ConnectableObservable : By using ConnectableObservables, we can only convert the cold observable into hot observable by using its publish and connect methods and various variants like refCount, autoConnect and replay etc.

Talking about Subjects, We have four varieties of subjects in RxJava that are designed for particular use cases.

AsyncSubject : AsyncSubject will emit only the last value to its subscribers when source observable completes. AsyncSubject will get all the items emitted by source observable but only emit the last item when source observable calls its onCompleted method.

Let’s explore this with an example. I have a cold observable and when AsyncSubject subscribes to this cold observable, it becomes hot means it started emitting items.

Observable<Integer> cold = Observable.create(subscriber -> {
for (int i = 0; i <= 2; i++) {
System.out.println("Source Emits : " + i);
subscriber.onNext(i);
}
subscriber.onCompleted();
});

AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
cold.subscribe(asyncSubject);
asyncSubject.subscribe(subscriber1);
asyncSubject.subscribe(subscriber2);

If we look at the output, we can see the behaviour of AsyncSubject. Our source is emitting all the values but subscriber is getting only last value.

Source Emits : 0
Source Emits : 1
Source Emits : 2
Subscriber 1 : 2
Subscriber 2 : 2

We usually use this particular kind of Subject whenever we don’t care about the intermediate results, but we just want the final computation result when the full chain is over.

BehaviourSubject : BehaviourSubject emits the most recently item at the time of subscription or a default item if none has been emitted and then continues the sequence until complete.

Quite Confusing, Let’s see an example to clear the confusion. Here, interval operator of RxJava is used to emit sequence of integers spaced by a given timestamp.

Note, I used addSomeDelay() method as I want to block the main thread for some time so that source observable is able to emit some items before a subscriber subscribes to BehaviourSubject to understand the behaviour.

public static void main(String[] args) {
Observable<Long> cold = Observable.interval(1000, TimeUnit.MILLISECONDS);

BehaviorSubject<Long> behaviorSubject = BehaviorSubject.create(-1L);
cold.subscribe(behaviorSubject);

addSomeDelay();

behaviorSubject.subscribe(subscriber1);
behaviorSubject.subscribe(subscriber2);

addSomeDelay();
}

private static void addSomeDelay() {
try {
System.out.println("Wait for some seconds");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

When Subscribers gets subscribed to BehaviourSubject after a delay of 3 seconds, the source observable has already emitted 0,1 and 2 as per the interval operator does. So, subscribers will get the recently emitted item i.e. 2 and then continues until the main function completes.

Wait for some seconds
Subscriber 1 : 2
Subscriber 2 : 2
Wait for some seconds
Subscriber 1 : 3
Subscriber 2 : 3
Subscriber 1 : 4
Subscriber 2 : 4

Here, you can see we are not getting any default value i.e. -1. Why we are not getting this. Because our subscribers are subscribing to BehaviourSubject after some delay and source observable has started emitting items during that delay. If the subscribers subscribed before the source observable started emiting items, then subscribers will get the emitted items followed by the default value. We can see this in the given example.

public static void main(String[] args) {

Observable<Long> cold = Observable.interval(1000, TimeUnit.MILLISECONDS);

BehaviorSubject<Long> behaviorSubject = BehaviorSubject.create(-1L);

behaviorSubject.subscribe(subscriber1);
behaviorSubject.subscribe(subscriber2);

cold.subscribe(behaviorSubject);

addSomeDelay();
}

Now, we get the default value first as subscribers are subscribing to BehaviourSubject before BehaviourSubject is going to subscribe to ColdObservable to make it hot.

Subscriber 1 : -1
Subscriber 2 : -1
Wait for some seconds
Subscriber 1 : 0
Subscriber 2 : 0
Subscriber 1 : 1
Subscriber 2 : 1

PublishSubject : PublishSubject is much similar to BehaviourSubject except that it emits only those items which are emitted after the subscription. Also, It doesn’t give any default value.

Let’s clarify this using a java example.

public static void main(String[] args) {

Observable<Long> cold = Observable.interval(1000, TimeUnit.MILLISECONDS);

PublishSubject<Long> publishSubject = PublishSubject.create();
cold.subscribe(publishSubject);

addSomeDelay();

publishSubject.subscribe(subscriber1);
publishSubject.subscribe(subscriber2);

addSomeDelay();
}

And output shows the real difference. When both subscriber subscribes, the source observable has already emitted 0,1 and 2 as we give a delay of 3 seconds before the subscription. So, both subscribes will get the items starting from 3.

Subscriber 1 : 3
Subscriber 2 : 3
Subscriber 1 : 4
Subscriber 2 : 4
Subscriber 1 : 5
Subscriber 2 : 5

ReplaySubject : It emits all the emitted items to the subscribers regardless of when the subscribers subscribes and then continues the sequence. There are also versions of ReplySubject that will throw away the items if the buffer size gets filled with items or specified timespan gets passed.

Let’s have an example to explain it further.

public static void main(String[] args) {

Observable<Long> cold = Observable.interval(1000, TimeUnit.MILLISECONDS);

ReplaySubject<Long> publishSubject = ReplaySubject.create();
cold.subscribe(publishSubject);

addSomeDelay();

publishSubject.subscribe(subscriber1);
publishSubject.subscribe(subscriber2);

addSomeDelay();
}

When both subscribers subscribes after a delay, source observable has already emitted 0,1 and 2. So, ReplaySubject will give all the emitted items to its subscribers first and then continues until the main function exit.

Subscriber 1 : 0
Subscriber 1 : 1
Subscriber 1 : 2
Subscriber 2 : 0
Subscriber 2 : 1
Subscriber 2 : 2
Subscriber 1 : 3
Subscriber 2 : 3
Subscriber 1 : 4
Subscriber 2 : 4
Subscriber 1 : 5
Subscriber 2 : 5

Now, Let’s figure out some variants of ConnectableObservable as well. We have the following methods that we used to convert a cold observable to hot observable.

  1. Publish : It convert an ordinary Observable into a connectable Observable. A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.
  2. Connect : It instruct a connectable Observable to begin emitting items to its subscribers.
  3. Replay : It works exactly the same way as the ReplaySubject does. It ensures that all Subscribers see the same sequence of emitted items, even if they subscribe after the Observable begins emitting the items.
  4. AutoConnect : This method will allow to emit the sequence of data once the specified number of subscribers have subscribed. We give the number of subscribers in its constructor.
Observable<Long> cold = Observable.create(subscriber -> {
for (long i = 1; i <= 2; i++) {
System.out.println("Source Emits : " + i);
subscriber.onNext(i);
}
});
Observable<Long> observable = cold.publish().autoConnect(2);

observable.subscribe(subscriber1);
addSomeDelay();
observable.subscribe(subscriber2);

We passed 2 in autoConnect means it will not going to emit anything until 2 subscribers are there. First subscriber gets subscribed , then second subscriber will subscribed after some seconds. Then it starts emitting sequences starting from 0.

Wait for some seconds
Source Emits : 0
Subscriber 1 : 0
Subscriber 2 : 0
Source Emits : 1
Subscriber 1 : 1
Subscriber 2 : 1
Source Emits : 2
Subscriber 1 : 2
Subscriber 2 : 2

5. RefCount : This method makes the hot observable little cold. It converts the connectable observable into an ordinary observable until first subscriber subscribes. When RefCount get its first subscriber, then it makes the source observable hot.

public static void main(String[] args) {

Observable<Long> coldObservable = Observable.interval(1000,TimeUnit.MILLISECONDS);
Observable<Long> observable = coldObservable.publish().refCount();

addSomeDelay();
observable.subscribe(subscriber1);
addSomeDelay();
observable.subscribe(subscriber2);
addSomeDelay();
}

First it doesn’t emit anyting even after waiting for some seconds. When it gets its first subscriber, emission gets started from 0. And when second subscriber comes again after some delay, it also getting the ongoing sequence of data.

Wait for some seconds 
Wait for some seconds
Subscriber 1 :0
Subscriber 1 :1
Subscriber 1 :2
Wait for some seconds
Subscriber 1 :3
Subscriber 2 :3
Subscriber 1 :4
Subscriber 2 :4
Subscriber 1 :5
Subscriber 2 :5

Now, Let’s talk about some disadvantages of using Hot Observables.

  1. Source of data should becomes unpredictable to subscriber : If we are converting a cold observable into hot using subjects then source of data becomes unpredictable. But How, Let’s see an example.
public static void main(String[] args) {
Observable<String> coldObservable = Observable.create(subscriber -> {
String[] values = new String[]{"One","Two","Three"};
for(String s : values)
{
subscriber.onNext(s);
}});

PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.subscribe(subscriber1);
coldObservable.subscribe(publishSubject);

publishSubject.onNext("I can also emit.");

}

Our Subscriber is subscribing to publishSubject which is subscribed to coldObservable. It means Subscriber will assumed to get One,Two and Three Only. But As PublishSubject can also emit data, it also emit an event. Now, subscriber doesn’t know from where this data is coming. For subscriber, its becomes unpredictable.

One
Two
Three
I can also emit.

If we used connectable observable, then it only emits the sequence emitted by the source observable.

So, what experts says is If you want to convert already existing cold observable into hot , then connectable observable should be used. And, if you want to create a hot observable from scratch, then subject should be used.

2. Subscribers can have subscription effects based on their subscription time : If we are creating an hot observable, the subscribers will get the data based on their subscription time. If they subscribed early, they will get the emitted data. There are more chances that they can miss data as well. So, In both Subjects and ConnectableObservable, we should convert cold observable into hot when all subscribers get’s subscribed.

All the examples used here are hosted at github repository.

References :

  1. http://reactivex.io/documentation/subject.html
  2. https://medium.com/crunching-rxandroid/crunching-rxandroid-part-8-bf1808c08f95#.bqameargm
  3. http://tomstechnicalblog.blogspot.in/2016/03/rxjava-problem-with-subjects.html