RX publish() examples
Probably the easiest way to understand RX’s ConnectableObservable is with examples — here are some.
3 min readMar 7, 2019
To share a stream among subscribers we use ConnectableObservable.
We have then to specify:
1. Should we replay the last events to new subscribers (.publish() / .replay())
2. When to connect upstream (.refCount() / .connect() / .autoConnect())
Below are logs of multiple consumers connecting to Observable.interval()
in different ways.
Not connectable
val stream = Observable.interval(1, TimeUnit.SECONDS)stream
.subscribe { Timber.i("Subscription A: $it")
stream
.delaySubscription(4, TimeUnit.SECONDS)
.subscribe { Timber.e("Subscription B: $it") }
Replaying to new subscribers
.publish()
val stream = Observable
.interval(0, 2, TimeUnit.SECONDS)
.publish().refCount()stream
.doOnSubscribe { Timber.i("A subscribed.") }
.subscribe { Timber.i("Subscription A: $it") }stream
.delaySubscription(1, TimeUnit.SECONDS)
.doOnSubscribe { Timber.e("B subscribed.") }
.subscribe { Timber.e("Subscription B: $it") }
.replay()
val stream = Observable
.interval(0, 2, TimeUnit.SECONDS)
.replay(1).refCount()stream
.doOnSubscribe { Timber.i("A subscribed.") }
.subscribe { Timber.i("Subscription A: $it") }stream
.delaySubscription(1, TimeUnit.SECONDS)
.doOnSubscribe { Timber.e("B subscribed.") }
.subscribe { Timber.e("Subscription B: $it") }
Connecting upstream
.refCount()
val stream = Observable
.interval(1, TimeUnit.SECONDS)
.publish().refCount()stream
.take(5)
.doOnTerminate { Timber.i("A disposed.") }
.subscribe { Timber.i("Subscription A: $it") }
stream
.delaySubscription(2, TimeUnit.SECONDS)
.take(2)
.doOnTerminate { Timber.e("B disposed.") }
.subscribe { Timber.e("Subscription B: $it") }
stream
.delaySubscription(10, TimeUnit.SECONDS)
.take(2)
.doOnTerminate { Timber.e("C disposed.") }
.subscribe { Timber.e("Subscription C: $it") }
.autoConnect()
val stream = Observable
.interval(1, TimeUnit.SECONDS)
.publish().autoConnect(1) { disposables.add(it) }
stream
.take(5)
.doOnTerminate { Timber.i("A disposed.") }
.subscribe { Timber.i("Subscription A: $it") }
stream
.delaySubscription(2, TimeUnit.SECONDS)
.take(2)
.doOnTerminate { Timber.e("B disposed.") }
.subscribe { Timber.e("Subscription B: $it") }
stream
.delaySubscription(10, TimeUnit.SECONDS)
.take(2)
.doOnTerminate { Timber.e("C disposed.") }
.subscribe { Timber.e("Subscription C: $it") }
.publish { … }
Observable
.interval(0, 1, TimeUnit.SECONDS)
.publish { published ->
Observable.merge(
published.map { "Stream A: $it" },
published.map { "Stream B: $it" }
)
}
.subscribe { Timber.i(it) }