Pro RXJava — Don’t subscribe observable in the constructor

Harshit Bangar
Nov 6, 2017


import io.reactivex.Observable;
import io.reactivex.functions.Consumer;

// Don't use this.
public class RXJavaNoobTip {

    public RXJavaNoobTip(Observable<String> observable) {
        observable.subscribe(new Consumer<String>() {
            @Override public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }
}

This code has two issues:

  1. There is no deinit method, so nothing ever disposes the subscription. Technically, finalize plays the same role, but it is not guaranteed to be called. Check the article for the details.

import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Consumer;

public class RXJavaNoobTip {

    private final CompositeDisposable compositeDisposable = new CompositeDisposable();

    public RXJavaNoobTip(Observable<String> observable) {
        compositeDisposable.add(observable.subscribe(new Consumer<String>() {
            @Override public void accept(String s) throws Exception {
                System.out.println(s);
            }
        }));
    }

    // Not guaranteed to be called.
    @Override protected void finalize() throws Throwable {
        compositeDisposable.dispose();
        super.finalize();
    }
}

  2. Don’t let the “this” reference escape during object construction. Check the article for details.

One of the mistakes that can introduce a data race into your class is to expose the this reference to another thread before the constructor has completed. Sometimes the reference is explicit, such as directly storing this in a static field or collection, but other times it can be implicit, such as when you publish a reference to an instance of a non-static inner class in a constructor. Constructors are not ordinary methods -- they have special semantics for initialisation safety. An object is assumed to be in a predictable, consistent state after the constructor has completed, and publishing a reference to an incompletely constructed object is dangerous. — Safe construction techniques

Starting a new thread or subscribing an observer in a constructor can let the “this” reference escape: the anonymous Runnable or Consumer can capture an implicit reference to the enclosing object, so the other thread may see an incompletely constructed object. A similar example with threads:

// Don't use this.
public class ThreadNoobTip {
    public ThreadNoobTip() {
        Thread thread = new Thread(new Runnable() {
            @Override public void run() {
                // Sees an incompletely constructed object.
            }
        });
        thread.start(); // Wrong: the thread starts before the constructor has finished.
    }
}
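To make the escape concrete, here is a minimal single-threaded sketch in plain Java. ImmediateSource is a hypothetical stand-in (not an RxJava class) for a source that emits as soon as someone subscribes; EscapingGreeter and its fields are also invented for illustration. Because the callback runs before the constructor has assigned prefix, it observes the field's default value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a source that emits synchronously on subscribe.
class ImmediateSource {
    private final String value;

    ImmediateSource(String value) {
        this.value = value;
    }

    // Calls the consumer right away, on the caller's thread.
    void subscribe(Consumer<String> consumer) {
        consumer.accept(value);
    }
}

// Don't use this: it subscribes inside the constructor.
class EscapingGreeter {
    final List<String> seen = new ArrayList<>();
    private final String prefix;

    EscapingGreeter(ImmediateSource source) {
        // `this` escapes here: record() runs before prefix is assigned.
        source.subscribe(this::record);
        this.prefix = "Hello, ";
    }

    private void record(String s) {
        seen.add(prefix + s);
    }
}

class EscapingGreeterDemo {
    public static void main(String[] args) {
        EscapingGreeter greeter = new EscapingGreeter(new ImmediateSource("world"));
        System.out.println(greeter.seen.get(0)); // prints: nullworld
    }
}
```

Even a final field reads as null here, because the callback ran while the object was only half built. With a second thread involved the outcome is no longer deterministic, which makes the bug harder to spot.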

It is safe, however, to create a thread or store an observable in a constructor. Just start the thread or subscribe to the observable in a separate init (lifecycle) method.

A happier version of the ThreadNoobTip class, ThreadProTip:

public class ThreadProTip {
    private final Thread thread;

    public ThreadProTip() {
        thread = new Thread(new Runnable() {
            @Override public void run() {
                // :) Happy code.
            }
        });
    }

    public void init() {
        thread.start();
    }
}


class Caller {
    public static void main(String[] args) {
        ThreadProTip threadProTip = new ThreadProTip();
        threadProTip.init();
    }
}
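A common variation, for cases where callers might forget to call init, is a private constructor combined with a static factory method that starts the thread only after construction has finished. The sketch below assumes hypothetical names (ThreadFactoryTip, create, awaitResult) that do not come from the original classes.

```java
class ThreadFactoryTip {
    private final String message;
    private final Thread thread;
    private String result; // written by the worker, read only after join()

    // Private: callers must go through create().
    private ThreadFactoryTip(String message) {
        this.message = message;
        this.thread = new Thread(this::work); // created here, not started
    }

    private void work() {
        result = "worker saw: " + message;
    }

    // Starts the thread only once the constructor has returned.
    static ThreadFactoryTip create(String message) {
        ThreadFactoryTip tip = new ThreadFactoryTip(message);
        tip.thread.start();
        return tip;
    }

    // Waits for the worker to finish and returns what it wrote.
    String awaitResult() {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return result;
    }
}

class ThreadFactoryTipDemo {
    public static void main(String[] args) {
        ThreadFactoryTip tip = ThreadFactoryTip.create("ready");
        System.out.println(tip.awaitResult()); // prints: worker saw: ready
    }
}
```

The trade-off is that the factory fixes the start time at creation, whereas an explicit init method lets the caller decide when the work begins.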

A similar example for RxJava:

import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Consumer;

public class RXJavaProTip {

    private final CompositeDisposable compositeDisposable = new CompositeDisposable();
    private final Observable<String> observable;

    public RXJavaProTip(Observable<String> observable) {
        // Only store the observable; do not subscribe yet.
        this.observable = observable;
    }

    public void init() {
        compositeDisposable.add(observable.subscribe(new Consumer<String>() {
            @Override public void accept(String s) throws Exception {
                System.out.println(s);
            }
        }));
    }

    public void deinit() {
        compositeDisposable.dispose();
    }
}

class Caller {
    public static void main(String[] args) {
        // Any Observable<String> works here; just(...) is only an example source.
        Observable<String> observable = Observable.just("Hello", "RxJava");
        RXJavaProTip rxJavaProTip = new RXJavaProTip(observable);
        rxJavaProTip.init();
        // Do your stuff.
        rxJavaProTip.deinit();
    }
}
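Since finalize cannot be relied on, the caller has to remember to call deinit. One possible way to make that harder to forget in short-lived scopes is to implement AutoCloseable, so that try-with-resources performs the cleanup. The sketch below is plain Java: FakeSubscription is a hypothetical stand-in for an RxJava Disposable, and CloseableTip is an invented name, so treat it as an illustration of the idea and not as part of the classes above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a Disposable, so the sketch needs no library.
class FakeSubscription {
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    void dispose() {
        disposed.set(true);
    }

    boolean isDisposed() {
        return disposed.get();
    }
}

class CloseableTip implements AutoCloseable {
    final FakeSubscription subscription = new FakeSubscription();

    void init() {
        // Subscribe here, after construction has finished.
    }

    // Plays the role of deinit(); called automatically by try-with-resources.
    @Override public void close() {
        subscription.dispose();
    }
}

class CloseableTipDemo {
    public static void main(String[] args) {
        CloseableTip tip = new CloseableTip();
        try (CloseableTip scoped = tip) {
            scoped.init();
            // Do your stuff.
        }
        System.out.println(tip.subscription.isDisposed()); // prints: true
    }
}
```

This only fits objects whose lifetime matches a single block; for longer-lived objects an explicit deinit call from the owner's own lifecycle method remains the straightforward choice.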

Thanks to Sidharth Raja and Dharak Kharod for proof-reading.
