Hot Observable和Cold Observable

靛青K
靛青K
Oct 21, 2016 · 15 min read

Observable 数据流有两种类型:Hot 和 Cold 。

Cold Observable

当订阅者订阅该 Observable 时,Observable 才执行发射数据的代码。有一个需要特别留意的细节,Observable 会为每个订阅者单独执行一次发射数据的代码。换句话说,每个订阅者都会独立的收到订阅者发射的数据。

我们来用实际的代码理解一下上述文字的含义。

var createTimes = 0

let intSequence = Observable<Int>
.create { (observer) -> Disposable in
createTimes += 1
print("Create \(createTimes).")
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()

return Disposables.create()
}

代码中发射了三个数据,分别是 1、2、3,此外我们定义了一个 createTimes 用来标记 create 中代码执行的次数。

当没有订阅者时,create 中的代码不会执行,即没有任何输出。当添加第一个订阅者时。

intSequence
.subscribe(onNext: { value in
print("Subscribe 1 Next \(value)")
})

可以看到输出结果为:

Create 1.
Subscribe 1 Next 1
Subscribe 1 Next 2
Subscribe 1 Next 3

这说明了 当订阅者订阅该 Observable 时,Observable 才执行发射数据的代码。

当我们再增加一个订阅者时。

intSequence
.subscribe(onNext: { value in
print("Subscribe 2 Next \(value)")
})

此时输出结果可能和我们最初的预期不同,这里增加了个 Create 2 。

Create 1.
Subscribe 1 Next 1
Subscribe 1 Next 2
Subscribe 1 Next 3
Create 2.
Subscribe 2 Next 1
Subscribe 2 Next 2
Subscribe 2 Next 3

这说明 create 中的代码执行了两次,而订阅者刚好有两个,即 Observable 会为每个订阅者单独执行一次发射数据的代码。

interval 也是一个 Code Observable ,比如下面这段代码。

let intervalSequence = Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.take(3)
intervalSequence
.subscribe(onNext: { value in
print("Subscribe 1 Next \(value)")
})
sleep(1) // 暂停一秒,第二个订阅者比第一个订阅者晚一秒订阅
intervalSequence
.subscribe(onNext: { value in
print("Subscribe 2 Next \(value)")
})

输出结果为:

Subscribe 1 Next 0
Subscribe 1 Next 1
Subscribe 2 Next 0
Subscribe 1 Next 2
Subscribe 2 Next 1
Subscribe 2 Next 2

可以看到第二个订阅者收到的数据总是晚于第一个订阅者一个数据。

2.5.2Hot Observable

Hot Observable 从创建后就开始发射数据,不考虑是否有订阅者订阅。一个经典的例子是点击 Button 。不管有没有订阅者订阅点击事件,点击事件都会发生。当有订阅者订阅后,Observable 会将订阅后的点击事件发送给 Observer ,即订阅者接收不到之前的点击事件。需要注意的是,当 Observer 取消订阅时,点击事件仍然会继续下去。

比如下面这段订阅点击事件代码。

let tap = button.rx.tap
tap.subscribe(onNext: {
print("Tap 1")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
tap.subscribe(onNext: {
print("Tap 2")
})
}

3 秒前点击 Button 只能收到 Tap 1 的打印,比如 3 秒前,我们点击了三次 Button ,此后又点击了两次。输出结果如下。

Tap 1
Tap 1
Tap 1
Tap 1
Tap 2
Tap 1
Tap 2

3 秒前第二个订阅者并没有订阅 tap ,故不会有 Tap 2 的输出。3 秒后第二个订阅者订阅了 tap ,此时点击 Button ,可以看到打印结果多了 Tap 2。但与 Code Observable 不同的是,第二个订阅者不会收到之前三次的点击事件

比如将 button.rx.tap 替换成 Observable<Int>.interval(1, scheduler: MainScheduler.instance).take(5).map { _ in } ,可以看到打印结果有 5 个 Tap 2 。

Tap 1
Tap 1
Tap 1
Tap 1
Tap 2
Tap 1
Tap 2
Tap 2
Tap 2
Tap 2

Cold Observable 和 Hot Observable 之间的相互转换

使用 publish、replay、multicast 操作符可以将 Cold Observable 转换成 Hot Observable 。

extension ObservableType {
public func publish() -> RxSwift.ConnectableObservable<Self.E>
}
extension ObservableType {
public func replay(_ bufferSize: Int) -> RxSwift.ConnectableObservable<Self.E>
}

publish

publish 返回一个 ConnectableObservable ,这是 Observable 的子类,多了两个方法,分别是 connect 和 refCount 。

Connect

只有调用 ConnectableObservable 的 connect 方法时,才会触发数据流的执行。如果不调用 connect ,订阅者则永远收不到值。

let tap = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.take(5)
.map { _ in }
.publish()
tap.connect()
tap.subscribe(onNext: {
print("Tap 1")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
tap.subscribe(onNext: {
print("Tap 2")
})
}

打印结果会和和之前点击场景的一样。第二个订阅者无法收到前三个值。

Disconnect

调用 connect 后,返回一个 Disposable ,和調用 subscribe 一样,返回一个 Disposable 。在 connect 中,我们可以调用 dispose 进行 disconnect ,即取消连接。取消连接后,停止向 Observer 发送数据。需要注意的是, Observer 并没有取消订阅,只是接收不到数据。

再次调用 connect 会重新创建一次订阅, Observer 会重新接收 Observable 的数据。我们在 3 秒后 disconnect 在 connect 。

let tap = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.take(5)
.map { _ in }
.publish()
let s = tap.connect()
tap.subscribe(onNext: {
print("Tap 1")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
s.dispose() // Disconnect
tap.connect() // Connect
tap.subscribe(onNext: {
print("Tap 2")
})
}

打印结果如下:

Tap 1
Tap 1
Tap 1 // 此时 Disconnect 再 Connect
Tap 1
Tap 2
Tap 1
Tap 2
Tap 1
Tap 2
Tap 1
Tap 2
Tap 1
Tap 2

可以看到 Tap 1 一共输出了 8 次。在输出第 3 个 Tap 1 后,我们调用了 disconnect 和 connect ,订阅重新开始,此后第 1 个订阅者重新收到 5 个数据。

RefCount

调用 refCount 会返回一个 Observable ,将 ConnectableObservable 转换成 Observable 。在这一过程中,实现了自动管理 connect 和 disconnect 。即只有有订阅者 Observable 就保持 connect 状态,没有订阅者订阅时,改为 disconnect 。这里我们从源码解释会更好理解一些。

在有订阅者订阅时,会执行下面这段代码。

if _parent._count == 0 {
_parent._count = 1
_parent._connectableSubscription = _parent._source.connect()
}
else {
_parent._count = _parent._count + 1
}

Observer 数量从 0 到 1 时,会调用 connect 方法。

在 Observer 解除订阅时,会执行下面这段代码。

if self._parent._count == 1 {
self._parent._connectableSubscription!.dispose()
self._parent._count = 0
self._parent._connectableSubscription = nil
}
else if self._parent._count > 1 {
self._parent._count = self._parent._count - 1
}

如果 Observer 数量从 1 到 0 时,则调用 dispose 方法,即 disconnect 。

这就完成了只要有订阅者订阅,则为 connect 状态,不再有订阅者订阅时,为 disconnect 状态。

replay

继续之前的例子,我们将 publish 改为 replay(1) ,为了更好的描述 replay 的含义,我们去掉 map { _ in } 方法,将递增的值传递下去。

let tap = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.take(5)
.replay(1)
tap.connect()
tap.subscribe(onNext: { value in
print("1 Subscribe: \(value)")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
tap.subscribe(onNext: { value in
print("2 Subscribe: \(value)")
})
}

输出结果多了一个 2 Subscribe: 2 。

1 Subscribe: 0
1 Subscribe: 1
1 Subscribe: 2
2 Subscribe: 2 // 调用 publish 则不会打印该行
1 Subscribe: 3
2 Subscribe: 3
1 Subscribe: 4
2 Subscribe: 4

相比 publish 不同的是,我们对第 2 个订阅者多发射了一个 2 ,即重放( replay )了一个值。 replay 的个数是可以通过参数指定的,比如改为 replay(2) ,输出结果则会在 2 Subscribe: 2 前面多一个 2 Subscribe: 1 。 replay 会缓存指定个数的值,将这些值发送给订阅它的 Observer。

需要注意的是,当我们调用 replay(0) 时,效果等同于 publish 。

当我们需要缓存全部值时,我们可以调用 replayAll。

multicast

我们还可以以 Subject 为媒介,通过 multicast 方法将一个 Observable 转换成一个 ConnectableObservable 。

比如调用一个 multicast(PublishSubject()) ,就是以 PublishSubject 为媒介,转换成 ConnectableObservable 。

let tap = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.take(5)
.multicast(PublishSubject()) // 等价于 publish()
tap.connect()
tap.subscribe(onNext: { value in
print("1 Subscribe: \(value)")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
tap.subscribe(onNext: { value in
print("2 Subscribe: \(value)")
})
}

第 2 个订阅者不会收到 0、1、2 三个值,这与 publish() 效果是完全一样的。

需要指出的是, publish() 就是调用了 multicast(PublishSubject()) 。

extension ObservableType {
public func publish() -> ConnectableObservable<E> {
return self.multicast(PublishSubject())
}
}

而 replay 则是借助 ReplaySubject 完成的。

extension ObservableType {
public func replay(_ bufferSize: Int) -> ConnectableObservable<E> {
return self.multicast(ReplaySubject.create(bufferSize: bufferSize))
}
}

multicast 接收一个 Subject ,返回一个 ConnectableObservableAdapter ,在这个类中有下面这段代码。

let disposable = _source.subscribe(_subject.asObserver())

传入的 Subject 被做为 Observer ,订阅原 Observable 。可以理解为,我们将一个 Observable 转换成了 Observer ,对外暴露了一个 ConnectableObservable 。

multicast 还有一个方法,注意该方法返回的不再是 ConnectableObservable ,只是一个 Observable 。

extension ObservableType {
public func multicast<S : SubjectType, R where S.SubjectObserverType.E == E>(_ subjectSelector: @escaping () throws -> S, selector: @escaping (RxSwift.Observable<S.E>) throws -> RxSwift.Observable<R>) -> RxSwift.Observable<R>
}

multicast 允许我们手动传入一个返回 Subject 的闭包和一个 selector ,这个 selector 不同于 Cocoa 中的 Selector ,这里的只是提供了一个对 multicast 后的 Observable 进行一次处理的机会。

参考资源

  1. http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html
  2. http://reactivex.io/documentation/operators/refcount.html
  3. http://reactivex.io/documentation/operators/publish.html
  4. http://reactivex.io/documentation/operators/replay.html
  5. http://blog.chengyunfeng.com/?p=975

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade