Crunching RxAndroid — Part 8

Roberto Orgiu
Apr 5, 2016

In the previous part of this series, we dissected a bit of the core RxJava framework, with the idea of learning as much as we could about the mechanics of reactive programming on Android and how to create and use custom operators. This time, we will start talking about a very important (yet very hard to grasp) element of the famous library, trying to understand where we can use this new component and where we do not need it.

Subjects

We have already seen many times that a Subscriber can listen to an Observable in order to receive the items it emits and, as far as we know, an Observable is an Observable and a Subscriber is a Subscriber. But… what if we have something that can be both? That something is a Subject, and we can see it as a pipe that subscribes to an Observable and to which a Subscriber can subscribe. Moreover, a Subject can be used as a “dynamic” source of events: rather than describing every emission up front inside Observable.create() and similar, which by default runs synchronously when someone subscribes, we can push items into the Subject whenever they become available. What’s the deal with that object, though? Let’s imagine a real life scenario where we have an Observable representing the status of a very long operation lasting several seconds, with a result that we want to display in the UI.
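To make the "pipe" idea a bit more tangible, here is a minimal plain-Java sketch. It is not RxJava code and not how the library implements Subjects: PipeSketch and its methods are hypothetical names, and java.util.function.Consumer stands in for a Subscriber. It only shows one object playing both roles, plus items being pushed into it by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java stand-in for the idea of a Subject; not RxJava's implementation. */
final class PipeSketch<T> implements Consumer<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    /** "Observable" side: downstream listeners register here. */
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    /** "Subscriber" side: an upstream source (or any caller) pushes items here. */
    @Override
    public void accept(T item) {
        for (Consumer<T> s : subscribers) {
            s.accept(item);
        }
    }

    /** Runs a tiny scenario and returns what the downstream listener saw. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        PipeSketch<String> pipe = new PipeSketch<>();
        pipe.subscribe(seen::add);                 // someone listens to the pipe

        Consumer<String> upstreamListener = pipe;  // the pipe is itself a listener
        upstreamListener.accept("from upstream");  // an upstream source feeds it
        pipe.accept("pushed manually");            // and we can push items by hand
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [from upstream, pushed manually]
    }
}
```

In RxJava 1.x the same two roles are played by the Subject's onNext() on one side and subscribe() on the other.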

[Image: the user having fun destroying the beloved subscription]

We could create our Observable in the onCreate method of the Activity and subscribe to it immediately. At that point the user might rotate the device, triggering a configuration change: we would lose our subscription (even worse, we would risk leaking memory if we do not unsubscribe from it) and, with it, all the work we had already done before the user came in like a wrecking ball (pun intended).

What could we do to avoid that? One solution would be to use a Subject to subscribe to our Observable in our onCreate method, store the Subject safely so that it can survive a rotation, and then subscribe to it in the onResume method with our proper Subscriber, so that we get the results back.

To do so, we could proceed like this. As an example, we create a headless Fragment (any POJO stored in a static field or outside the Activity would work as well; the Fragment just keeps the demonstration as simple as possible) that contains our Observable and, in its onCreate method, does what it needs to do, such as emitting an item every second. We subscribe to the Observable using a Subject and then, in the onResume method of the same Fragment, we hand the Subject back to the Activity as if it were a plain Observable.

In this way, every time our Activity is restored, we don’t lose any of the work we previously did. We have to remember, though, to unsubscribe from the Subject, otherwise we leak precious resources. And we don’t want to do that, do we?
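The lifecycle dance described above can be sketched without Android or RxJava at all. The following is a simplified plain-Java model under a few loud assumptions: Holder plays the role of the retained headless Fragment, Screen plays the role of one Activity instance, and the holder replays past results to new listeners (the replaying behavior is chosen here for illustration only). All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RetainedWorkSketch {

    /** Hypothetical holder that outlives any single screen (the headless Fragment's role). */
    static final class Holder {
        private final List<Integer> history = new ArrayList<>();
        private final List<Consumer<Integer>> listeners = new ArrayList<>();

        /** The long-running work pushes its results here. */
        void emit(int item) {
            history.add(item);
            for (Consumer<Integer> l : new ArrayList<>(listeners)) {
                l.accept(item);
            }
        }

        /** Replays past results, then registers; the returned Runnable unsubscribes. */
        Runnable subscribe(Consumer<Integer> listener) {
            for (Integer item : history) {
                listener.accept(item);
            }
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        int listenerCount() {
            return listeners.size();
        }
    }

    /** Hypothetical stand-in for one Activity instance. */
    static final class Screen {
        final List<Integer> shown = new ArrayList<>();
        private Runnable unsubscribe = () -> { };

        void onResume(Holder holder) {
            unsubscribe = holder.subscribe(shown::add);
        }

        void onDestroy() {
            unsubscribe.run(); // forgetting this would keep the old screen reachable
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();   // created once, survives the "rotation"
        Screen first = new Screen();
        first.onResume(holder);
        holder.emit(1);
        holder.emit(2);
        first.onDestroy();              // rotation: the old screen goes away
        holder.emit(3);                 // the work continues in the meantime

        Screen second = new Screen();   // the recreated screen
        second.onResume(holder);
        holder.emit(4);
        System.out.println(first.shown + " " + second.shown); // [1, 2] [1, 2, 3, 4]
    }
}
```

The point of onDestroy here is the same as unsubscribing from the Subject: after it runs, the holder no longer references the dead screen.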

From a general point of view, we could be done with the main idea behind Subjects and that would be great… if only there weren’t four kinds! Don’t you worry, though, because they share the same properties and they differ mainly in the way they emit the items received.

AsyncSubject

The first type of Subject we are going to explore is the AsyncSubject: it emits the last value (and only the last value) emitted by the source Observable, and only after that source Observable completes. If the source Observable does not emit any values, the AsyncSubject also completes without emitting any. That means that no matter how many items the source emits, the Subject will receive them all, but it will emit only the last one, and only after it receives the onCompleted event from the Observable it’s subscribed to. We usually use this particular kind of Subject whenever we don’t care about the intermediate results and just want the final result once the full chain is over.
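As a rough illustration of that rule, here is a plain-Java model. It is a sketch, not RxJava's AsyncSubject: AsyncSubjectSketch is a hypothetical class, Consumer stands in for a Subscriber, and only item delivery is modeled (completion and error notifications to subscribers are left out).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of the AsyncSubject rule; not RxJava's implementation. */
final class AsyncSubjectSketch<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T last;
    private boolean hasValue;
    private boolean completed;

    void subscribe(Consumer<T> subscriber) {
        if (!completed) {
            subscribers.add(subscriber);   // wait for completion
        } else if (hasValue) {
            subscriber.accept(last);       // already done: hand over the final value
        }
    }

    void onNext(T item) {
        if (completed) {
            return;
        }
        last = item;                       // remember it, but do not forward it yet
        hasValue = true;
    }

    void onCompleted() {
        if (completed) {
            return;
        }
        completed = true;
        if (hasValue) {
            for (Consumer<T> s : subscribers) {
                s.accept(last);            // the single emission happens here
            }
        }
        subscribers.clear();
    }

    public static void main(String[] args) {
        AsyncSubjectSketch<Integer> subject = new AsyncSubjectSketch<>();
        List<Integer> received = new ArrayList<>();
        subject.subscribe(received::add);
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        System.out.println(received);      // [] because nothing is forwarded before completion
        subject.onCompleted();
        System.out.println(received);      // [3]
    }
}
```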

ReplaySubject

The ReplaySubject is by far the easiest of the group as it emits to any observer all of the items that were emitted by the source Observable, regardless of when the observer subscribes. That means that no matter when you subscribe, you will receive all the items previously emitted, so that you always have the full chain of events without the need to worry about timing issues or losing any data.
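The replay rule is simple enough to model in a few lines of plain Java. Again, this is only a sketch with hypothetical names (ReplaySubjectSketch, with Consumer as the Subscriber), not the library's implementation, and it ignores completion and errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of the replay rule; not RxJava's implementation. */
final class ReplaySubjectSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        for (T item : buffer) {
            subscriber.accept(item);   // catch up on everything emitted so far
        }
        subscribers.add(subscriber);   // then keep receiving live items
    }

    void onNext(T item) {
        buffer.add(item);              // every item is kept for future subscribers
        for (Consumer<T> s : subscribers) {
            s.accept(item);
        }
    }

    public static void main(String[] args) {
        ReplaySubjectSketch<Integer> subject = new ReplaySubjectSketch<>();
        subject.onNext(1);
        subject.onNext(2);

        List<Integer> late = new ArrayList<>();
        subject.subscribe(late::add);  // subscribes after two emissions
        subject.onNext(3);
        System.out.println(late);      // [1, 2, 3]
    }
}
```

Note that the buffer in this sketch grows without bound: never missing an item is paid for in memory.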

PublishSubject

This kind of Subject is very similar to the BehaviorSubject (covered in its own section below), except that a Subscriber receives only the items that are emitted after its subscription takes place. Let’s understand it better by taking the timeline used in the BehaviorSubject example and replacing the BehaviorSubject bs with a new PublishSubject ps.

source.subscribe(ps);
source emits 1
source emits 2
ps.subscribe(s1)
source emits 3 -> s1 starts receiving items, starting from 3
source emits 4
source emits 5
ps.subscribe(s2)
source emits 6 -> s2 starts receiving items, starting from 6
ps.subscribe(s3)
source emits onCompleted -> s3 receives onCompleted without receiving any item; s1 and s2 receive onCompleted as well

As we can see from the timeline above, the PublishSubject behaves similarly to the BehaviorSubject except for the initial value: with the latter, you will receive the last item emitted before your Subscriber subscribed to the Subject, which makes that kind of object a very flexible companion for most of your background work, while the PublishSubject cannot guarantee that you won’t lose something if, for instance, an item is emitted while the device is rotating.
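The PublishSubject timeline can be replayed with a small plain-Java model. This is a sketch under stated assumptions, not RxJava code: PublishSubjectSketch and Recorder are hypothetical names, items are plain ints, and the source is simulated by calling onNext() directly.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the PublishSubject rule; not RxJava's implementation. */
final class PublishSubjectSketch {

    /** Hypothetical subscriber that records what it is told. */
    static final class Recorder {
        final List<Integer> items = new ArrayList<>();
        boolean completed;
    }

    private final List<Recorder> subscribers = new ArrayList<>();
    private boolean done;

    void subscribe(Recorder r) {
        if (done) {
            r.completed = true;        // too late: only the terminal event is left
        } else {
            subscribers.add(r);        // no catching up, only future items
        }
    }

    void onNext(int item) {
        if (done) {
            return;                    // a terminated subject relays nothing
        }
        for (Recorder r : subscribers) {
            r.items.add(item);
        }
    }

    void onCompleted() {
        if (done) {
            return;
        }
        done = true;
        for (Recorder r : subscribers) {
            r.completed = true;
        }
        subscribers.clear();
    }

    public static void main(String[] args) {
        PublishSubjectSketch ps = new PublishSubjectSketch();
        Recorder s1 = new Recorder();
        Recorder s2 = new Recorder();
        Recorder s3 = new Recorder();

        ps.onNext(1);
        ps.onNext(2);
        ps.subscribe(s1);
        ps.onNext(3);
        ps.onNext(4);
        ps.onNext(5);
        ps.subscribe(s2);
        ps.onNext(6);
        ps.subscribe(s3);
        ps.onCompleted();

        System.out.println(s1.items + " " + s2.items + " " + s3.items);
        // [3, 4, 5, 6] [6] []
    }
}
```

Items 1 and 2 are never seen by anyone, which is exactly the kind of loss described for an emission that happens while nobody is subscribed.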

BehaviorSubject

Using the BehaviorSubject, each Subscriber will receive the items starting from the one most recently emitted by the source Observable (or an optional seed/default value if none has yet been emitted), followed by any other items emitted later by the source Observable(s). Let’s clarify the mechanics of this Subject with an example: let source be an Observable that emits a number every second (like the one we saw at the beginning of the article), let bs be a BehaviorSubject that subscribes to source, and let s1, s2 and s3 be Subscribers that subscribe to bs at different instants in the timeline.

source.subscribe(bs);
source emits 1
source emits 2
bs.subscribe(s1) -> s1 starts receiving items, starting from 2
source emits 3
source emits 4
source emits 5
bs.subscribe(s2) -> s2 starts receiving items, starting from 5
source emits 6
bs.subscribe(s3) -> s3 starts receiving items, starting from 6

As we can see, when a Subscriber subscribes to the BehaviorSubject, it will receive the items starting from the last one emitted before it subscribed.
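The same timeline can be checked against a small plain-Java model of the rule. As before, this is a sketch and not RxJava's BehaviorSubject: BehaviorSubjectSketch is a hypothetical class, Consumer stands in for a Subscriber, the source is simulated by calling onNext() directly, and completion is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of the BehaviorSubject rule; not RxJava's implementation. */
final class BehaviorSubjectSketch<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest;
    private boolean hasLatest;

    /** No seed: a subscriber gets nothing until the first item arrives. */
    BehaviorSubjectSketch() { }

    /** With a seed: delivered to subscribers if nothing has been emitted yet. */
    BehaviorSubjectSketch(T seed) {
        latest = seed;
        hasLatest = true;
    }

    void subscribe(Consumer<T> subscriber) {
        if (hasLatest) {
            subscriber.accept(latest);   // start from the most recent item (or the seed)
        }
        subscribers.add(subscriber);
    }

    void onNext(T item) {
        latest = item;                   // only the most recent item is kept
        hasLatest = true;
        for (Consumer<T> s : subscribers) {
            s.accept(item);
        }
    }

    public static void main(String[] args) {
        BehaviorSubjectSketch<Integer> bs = new BehaviorSubjectSketch<>();
        List<Integer> s1 = new ArrayList<>();
        List<Integer> s2 = new ArrayList<>();
        List<Integer> s3 = new ArrayList<>();

        bs.onNext(1);
        bs.onNext(2);
        bs.subscribe(s1::add);
        bs.onNext(3);
        bs.onNext(4);
        bs.onNext(5);
        bs.subscribe(s2::add);
        bs.onNext(6);
        bs.subscribe(s3::add);

        System.out.println(s1 + " " + s2 + " " + s3);
        // [2, 3, 4, 5, 6] [5, 6] [6]
    }
}
```

Compared with the replay approach, only one item is remembered, so a late Subscriber gets the current state rather than the whole history.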

Conclusion

Subjects are a very important part of the framework, and they are versatile and useful in a plethora of situations, but they have a catch: once their onCompleted or onError method has been called, they are terminated and can no longer move data, and this is usually not the desired behavior.
As usual, all the source code and the examples for this article are available on the GitHub repository.
