RX publish() examples

Probably the easiest way to understand RX’s ConnectableObservable is with examples — here are some.

Maor Korakin
3 min readMar 7, 2019

To share a stream among subscribers we use ConnectableObservable.
We have then to specify:
1. Should we replay the last events to new subscribers (.publish() / .replay())
2. When to connect upstream (.refCount() / .connect() / .autoConnect())

Below are logs of multiple consumers connecting to Observable.interval() in different ways.

Not connectable

Not Connectable
val stream = Observable.interval(1, TimeUnit.SECONDS)stream
.subscribe { Timber.i("Subscription A: $it")

stream
.delaySubscription(4, TimeUnit.SECONDS)
.subscribe { Timber.e("Subscription B: $it") }

Replaying to new subscribers

.publish()

.publish()
val stream = Observable
.interval(0, 2, TimeUnit.SECONDS)
.publish().refCount()
stream
.doOnSubscribe { Timber.i("A subscribed.") }
.subscribe { Timber.i("Subscription A: $it") }
stream
.delaySubscription(1, TimeUnit.SECONDS)
.doOnSubscribe { Timber.e("B subscribed.") }
.subscribe { Timber.e("Subscription B: $it") }

.replay()

.replay()
val stream = Observable
.interval(0, 2, TimeUnit.SECONDS)
.replay(1).refCount()
stream
.doOnSubscribe { Timber.i("A subscribed.") }
.subscribe { Timber.i("Subscription A: $it") }
stream
.delaySubscription(1, TimeUnit.SECONDS)
.doOnSubscribe { Timber.e("B subscribed.") }
.subscribe { Timber.e("Subscription B: $it") }

Connecting upstream

.refCount()

refCount()
val stream = Observable
.interval(1, TimeUnit.SECONDS)
.publish().refCount()
stream
.take(5)
.doOnTerminate { Timber.i("A disposed.") }
.subscribe { Timber.i("Subscription A: $it") }

stream
.delaySubscription(2, TimeUnit.SECONDS)
.take(2)
.doOnTerminate { Timber.e("B disposed.") }
.subscribe { Timber.e("Subscription B: $it") }

stream
.delaySubscription(10, TimeUnit.SECONDS)
.take(2)
.doOnTerminate { Timber.e("C disposed.") }
.subscribe { Timber.e("Subscription C: $it") }

.autoConnect()

autoConnect()
val stream = Observable
.interval(1, TimeUnit.SECONDS)
.publish().autoConnect(1) { disposables.add(it) }

stream
.take(5)
.doOnTerminate { Timber.i("A disposed.") }
.subscribe { Timber.i("Subscription A: $it") }

stream
.delaySubscription(2, TimeUnit.SECONDS)
.take(2)
.doOnTerminate { Timber.e("B disposed.") }
.subscribe { Timber.e("Subscription B: $it") }

stream
.delaySubscription(10, TimeUnit.SECONDS)
.take(2)
.doOnTerminate { Timber.e("C disposed.") }
.subscribe { Timber.e("Subscription C: $it") }

.publish { … }

.publish { … }
Observable
.interval(0, 1, TimeUnit.SECONDS)
.publish { published ->
Observable.merge(
published.map { "Stream A: $it" },
published.map { "Stream B: $it" }
)
}
.subscribe { Timber.i(it) }

--

--