Basics Of RxJava (2) — Subjects

Akash Agrawal
Apr 19, 2020

If you haven't read the first post in this series, I suggest doing so before reading this one.

The first tutorial covered Observables and Observers. This one covers the third building block: Subjects.

In the first tutorial I described a Subject as something that is both an Observable and an Observer. But doesn't that look strange? 🤔

Why would we need something that is both an Observer and an Observable, and how can it both listen for and emit events?

Still, Subjects are something I use a lot, and they are very useful as the starting point of a stream of events. If you recall, I mentioned in my first post that onNext(), onError() and onComplete() are methods of the Observer interface, but they are called by the Observable. So how can we call them ourselves?

We need a class that implements Observer and also extends Observable, so that the same object can receive calls to onNext(), onError() and onComplete() and pass them on to its own subscribers. For this purpose, RxJava provides Subjects.
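
To make the idea concrete, here is a minimal plain-Java sketch of a class that sits on both sides. It is not RxJava's implementation: MiniObserver, MiniObservable and MiniSubject are hypothetical names made up for this illustration, and error and completion handling are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RxJava's Observer (error and completion callbacks omitted).
interface MiniObserver<T> {
    void onNext(T value);
}

// Hypothetical stand-in for RxJava's Observable: something observers can subscribe to.
abstract class MiniObservable<T> {
    abstract void subscribe(MiniObserver<T> observer);
}

// Both sides in one class: extends the observable, implements the observer.
class MiniSubject<T> extends MiniObservable<T> implements MiniObserver<T> {
    private final List<MiniObserver<T>> observers = new ArrayList<>();

    @Override
    void subscribe(MiniObserver<T> observer) {
        observers.add(observer);
    }

    // Observer side: whoever holds the subject can push a value in,
    // and the subject forwards it to everyone subscribed on the observable side.
    @Override
    public void onNext(T value) {
        for (MiniObserver<T> observer : observers) {
            observer.onNext(value);
        }
    }
}

class MiniSubjectDemo {
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        MiniSubject<Integer> subject = new MiniSubject<>();
        subject.onNext(0);                 // nobody is subscribed yet, so this value is dropped
        subject.subscribe(received::add);
        subject.onNext(1);
        subject.onNext(2);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());         // prints [1, 2]
    }
}
```

The value pushed before anyone subscribed is lost, which matches how a PublishSubject behaves: it only delivers items emitted after the subscription.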

Enough talking 😫, let's code 🚀

To demonstrate this, I created a simple project with activity_main.xml and MainActivity.

This is what activity_main.xml looks like:

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">

    <Button
        android:id="@+id/rx_button"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Hello World!"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintLeft_toLeftOf="parent"
        app:layout_constraintRight_toRightOf="parent"
        app:layout_constraintTop_toTopOf="parent" />

    <Button
        android:id="@+id/rx_button_2"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Hello World 2!"
        android:layout_marginTop="150dp"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintLeft_toLeftOf="parent"
        app:layout_constraintRight_toRightOf="parent"
        app:layout_constraintTop_toTopOf="parent" />

    <Button
        android:id="@+id/rx_button_3"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Hello World 3!"
        android:layout_marginTop="300dp"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintLeft_toLeftOf="parent"
        app:layout_constraintRight_toRightOf="parent"
        app:layout_constraintTop_toTopOf="parent" />

</androidx.constraintlayout.widget.ConstraintLayout>

And this is what MainActivity looks like:

import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import io.reactivex.subjects.PublishSubject
import kotlinx.android.synthetic.main.activity_main.*

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // The subject is the entry point of the stream: each click pushes a value into it.
        val publishSubject = PublishSubject.create<Int>()
        rx_button.setOnClickListener {
            publishSubject.onNext(1)
        }
        rx_button_2.setOnClickListener {
            publishSubject.onNext(2)
        }
        rx_button_3.setOnClickListener {
            publishSubject.onNext(3)
        }

        // hide() exposes the subject as a plain Observable; the lambda acts as the observer's onNext().
        // In a real app, keep the Disposable returned by subscribe() and dispose it in onDestroy().
        publishSubject
            .hide()
            .subscribe {
                Log.i("MainActivity", "ButtonClicked$it")
            }
    }
}

Here I created a PublishSubject using PublishSubject.create<Int>() and then call onNext() on it whenever any button is clicked.

So what does the onNext() function of PublishSubject, or for that matter any subject, do?

// onNext() method of PublishSubject
@Override
public void onNext(T t) {
    ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
    for (PublishDisposable<T> pd : subscribers.get()) {
        pd.onNext(t);
    }
}

// Now coming to the onNext() method of the PublishDisposable class
/**
 * Constructs a PublishSubscriber, wraps the actual subscriber and the state.
 * @param actual the actual subscriber
 * @param parent the parent PublishProcessor
 */
PublishDisposable(Observer<? super T> actual, PublishSubject<T> parent) {
    this.downstream = actual;
    this.parent = parent;
}

public void onNext(T t) {
    if (!get()) {
        downstream.onNext(t);
    }
}

In simple words, it calls the onNext() method of the actual observer, that is, the observer created by subscribe() in our MainActivity code, unless that observer has already been disposed. This is why the following line runs and we see a log entry whenever any button is clicked:

Log.i("MainActivity", "ButtonClicked$it" )
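
The forwarding and the disposed check can be reproduced in a few lines of plain Java. This is only a rough sketch under my own assumptions, not the library code: SketchDisposable and SketchPublishSubject are hypothetical names, java.util.function.Consumer stands in for the Observer, and unlike the real PublishSubject the sketch never removes a disposed wrapper from the list. In RxJava 2, PublishDisposable extends AtomicBoolean, which is where the get() call in the snippet above comes from, so the sketch does the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical wrapper around one subscriber. The boolean flag means "disposed".
class SketchDisposable<T> extends AtomicBoolean {
    private final Consumer<T> downstream;

    SketchDisposable(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        if (!get()) {                      // forward only while not disposed
            downstream.accept(value);
        }
    }

    void dispose() {
        set(true);
    }
}

// Hypothetical, simplified subject: onNext() loops over the current wrappers.
class SketchPublishSubject<T> {
    private final List<SketchDisposable<T>> subscribers = new CopyOnWriteArrayList<>();

    SketchDisposable<T> subscribe(Consumer<T> observer) {
        SketchDisposable<T> wrapper = new SketchDisposable<>(observer);
        subscribers.add(wrapper);
        return wrapper;
    }

    void onNext(T value) {
        for (SketchDisposable<T> wrapper : subscribers) {
            wrapper.onNext(value);
        }
    }
}

class FanOutDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchPublishSubject<Integer> subject = new SketchPublishSubject<>();
        SketchDisposable<Integer> first = subject.subscribe(v -> log.add("A" + v));
        subject.subscribe(v -> log.add("B" + v));

        subject.onNext(1);                 // both observers receive 1
        first.dispose();
        subject.onNext(2);                 // only observer B receives 2
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());         // prints [A1, B1, B2]
    }
}
```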

Then I used hide(). In this example hide() is not strictly needed and we could have called subscribe() directly, but I wanted to demonstrate its use case. So what does hide() do? 🤔

The hide() method wraps the subject in a plain Observable. As per its documentation:
Hides the identity of this Observable and its Disposable.
Allows hiding extra features such as Subject’s Observer methods or preventing certain identity-based optimisations

The Observable returned by hide() no longer exposes onNext(), and it cannot be cast back to a Subject, so callers can only use Observable methods on it. The subject itself still accepts onNext() calls from whoever holds a reference to it. This is very useful, because in most cases we want to keep the subject private inside a class, call onNext() only from there, and expose just the resulting Observable to the rest of the world via an internal or public function.
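
The pattern of keeping the emitting side private and handing out only a read-only view might be sketched like this in plain Java. All names here (ReadOnlyStream, TinySubject, ClickEvents) are hypothetical and only mimic the idea. With RxJava itself you would keep a private PublishSubject and return publishSubject.hide() from a public function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical read-only view: it can be subscribed to, nothing else.
interface ReadOnlyStream<T> {
    void subscribe(Consumer<T> observer);
}

// Hypothetical, simplified subject with a hide()-like method.
class TinySubject<T> implements ReadOnlyStream<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    @Override
    public void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    void onNext(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }

    // Returns a separate wrapper object, so a caller cannot cast the
    // result back to TinySubject and reach onNext().
    ReadOnlyStream<T> hide() {
        return this::subscribe;
    }
}

// The subject stays private: only this class can emit.
class ClickEvents {
    private final TinySubject<Integer> subject = new TinySubject<>();

    void buttonClicked(int buttonId) {
        subject.onNext(buttonId);
    }

    ReadOnlyStream<Integer> clicks() {
        return subject.hide();
    }
}

class HideDemo {
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        ClickEvents events = new ClickEvents();
        events.clicks().subscribe(received::add);
        events.buttonClicked(1);
        events.buttonClicked(3);
        return received;
    }

    static boolean viewIsSubject() {
        return new ClickEvents().clicks() instanceof TinySubject;
    }

    public static void main(String[] args) {
        System.out.println(run() + " " + viewIsSubject());   // prints [1, 3] false
    }
}
```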
