Hot Observable和Cold Observable

靛青K
15 min readOct 21, 2016

--

在 RAC 中我们通常称之为热信号和冷信号。

Observable 数据流有两种类型:Hot 和 Cold 。

Cold Observable

当订阅者订阅该 Observable 时,Observable 才执行发射数据的代码。有一个需要特别留意的细节,Observable 会为每个订阅者单独执行一次发射数据的代码。换句话说,每个订阅者都会独立的收到订阅者发射的数据。

我们来用实际的代码理解一下上述文字的含义。

var createTimes = 0

let intSequence = Observable<Int>
.create { (observer) -> Disposable in
createTimes += 1
print("Create \(createTimes).")
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()

return Disposables.create()
}

代码中发射了三个数据,分别是 1、2、3,此外我们定义了一个 createTimes 用来标记 create 中代码执行的次数。

当没有订阅者时,create 中的代码不会执行,即没有任何输出。当添加第一个订阅者时。

intSequence
.subscribe(onNext: { value in
print("Subscribe 1 Next \(value)")
})

可以看到输出结果为:

Create 1.
Subscribe 1 Next 1
Subscribe 1 Next 2
Subscribe 1 Next 3

这说明了 当订阅者订阅该 Observable 时,Observable 才执行发射数据的代码。

当我们再增加一个订阅者时。

intSequence
.subscribe(onNext: { value in
print("Subscribe 2 Next \(value)")
})

此时输出结果可能和我们最初的预期不同,这里增加了个 Create 2 。

Create 1.
Subscribe 1 Next 1
Subscribe 1 Next 2
Subscribe 1 Next 3
Create 2.
Subscribe 2 Next 1
Subscribe 2 Next 2
Subscribe 2 Next 3

这说明 create 中的代码执行了两次,而订阅者刚好有两个,即 Observable 会为每个订阅者单独执行一次发射数据的代码。

interval 也是一个 Code Observable ,比如下面这段代码。

注:interval 是一个创建 Observable 的方法,使用 interval 可以创建一个按照指定间隔时间发射递增序列的 Observable ,比如Observable<Int>.interval(1, scheduler: MainScheduler.instance) 是每隔一秒发射一个整数,这个整数逐渐递增。即每隔一秒发射 1,2,3,4,5…

let intervalSequence = Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.take(3)
intervalSequence
.subscribe(onNext: { value in
print("Subscribe 1 Next \(value)")
})
sleep(1) // 暂停一秒,第二个订阅者比第一个订阅者晚一秒订阅
intervalSequence
.subscribe(onNext: { value in
print("Subscribe 2 Next \(value)")
})

输出结果为:

Subscribe 1 Next 0
Subscribe 1 Next 1
Subscribe 2 Next 0
Subscribe 1 Next 2
Subscribe 2 Next 1
Subscribe 2 Next 2

可以看到第二个订阅者收到的数据总是晚于第一个订阅者一个数据。

2.5.2Hot Observable

Hot Observable 从创建后就开始发射数据,不考虑是否有订阅者订阅。一个经典的例子是点击 Button 。不管有没有订阅者订阅点击事件,点击事件都会发生。当有订阅者订阅后,Observable 会将订阅后的点击事件发送给 Observer ,即订阅者接收不到之前的点击事件。需要注意的是,当 Observer 取消订阅时,点击事件仍然会继续下去。

比如下面这段订阅点击事件代码。

let tap = button.rx.tap
tap.subscribe(onNext: {
print("Tap 1")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
tap.subscribe(onNext: {
print("Tap 2")
})
}

3 秒前点击 Button 只能收到 Tap 1 的打印,比如 3 秒前,我们点击了三次 Button ,此后又点击了两次。输出结果如下。

Tap 1
Tap 1
Tap 1
Tap 1
Tap 2
Tap 1
Tap 2

3 秒前第二个订阅者并没有订阅 tap ,故不会有 Tap 2 的输出。3 秒后第二个订阅者订阅了 tap ,此时点击 Button ,可以看到打印结果多了 Tap 2。但与 Code Observable 不同的是,第二个订阅者不会收到之前三次的点击事件

比如将 button.rx.tap 替换成 Observable<Int>.interval(1, scheduler: MainScheduler.instance).take(5).map { _ in } ,可以看到打印结果有 5 个 Tap 2 。

Tap 1
Tap 1
Tap 1
Tap 1
Tap 2
Tap 1
Tap 2
Tap 2
Tap 2
Tap 2

Cold Observable 和 Hot Observable 之间的相互转换

使用 publish、replay、multicast 操作符可以将 Cold Observable 转换成 Hot Observable 。

extension ObservableType {
public func publish() -> RxSwift.ConnectableObservable<Self.E>
}
extension ObservableType {
public func replay(_ bufferSize: Int) -> RxSwift.ConnectableObservable<Self.E>
}

publish

publish 返回一个 ConnectableObservable ,这是 Observable 的子类,多了两个方法,分别是 connect 和 refCount 。

Connect

只有调用 ConnectableObservable 的 connect 方法时,才会触发数据流的执行。如果不调用 connect ,订阅者则永远收不到值。

let tap = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.take(5)
.map { _ in }
.publish()
tap.connect()
tap.subscribe(onNext: {
print("Tap 1")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
tap.subscribe(onNext: {
print("Tap 2")
})
}

打印结果会和和之前点击场景的一样。第二个订阅者无法收到前三个值。

Disconnect

调用 connect 后,返回一个 Disposable ,和調用 subscribe 一样,返回一个 Disposable 。在 connect 中,我们可以调用 dispose 进行 disconnect ,即取消连接。取消连接后,停止向 Observer 发送数据。需要注意的是, Observer 并没有取消订阅,只是接收不到数据。

再次调用 connect 会重新创建一次订阅, Observer 会重新接收 Observable 的数据。我们在 3 秒后 disconnect 在 connect 。

let tap = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.take(5)
.map { _ in }
.publish()
let s = tap.connect()
tap.subscribe(onNext: {
print("Tap 1")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
s.dispose() // Disconnect
tap.connect() // Connect
tap.subscribe(onNext: {
print("Tap 2")
})
}

打印结果如下:

Tap 1
Tap 1
Tap 1 // 此时 Disconnect 再 Connect
Tap 1
Tap 2
Tap 1
Tap 2
Tap 1
Tap 2
Tap 1
Tap 2
Tap 1
Tap 2

可以看到 Tap 1 一共输出了 8 次。在输出第 3 个 Tap 1 后,我们调用了 disconnect 和 connect ,订阅重新开始,此后第 1 个订阅者重新收到 5 个数据。

需要注意的是,因为此处源订阅者是个 Cold Observable ,所以数据流会重新执行一遍。

RefCount

调用 refCount 会返回一个 Observable ,将 ConnectableObservable 转换成 Observable 。在这一过程中,实现了自动管理 connect 和 disconnect 。即只有有订阅者 Observable 就保持 connect 状态,没有订阅者订阅时,改为 disconnect 。这里我们从源码解释会更好理解一些。

在有订阅者订阅时,会执行下面这段代码。

if _parent._count == 0 {
_parent._count = 1
_parent._connectableSubscription = _parent._source.connect()
}
else {
_parent._count = _parent._count + 1
}

Observer 数量从 0 到 1 时,会调用 connect 方法。

在 Observer 解除订阅时,会执行下面这段代码。

if self._parent._count == 1 {
self._parent._connectableSubscription!.dispose()
self._parent._count = 0
self._parent._connectableSubscription = nil
}
else if self._parent._count > 1 {
self._parent._count = self._parent._count - 1
}

如果 Observer 数量从 1 到 0 时,则调用 dispose 方法,即 disconnect 。

这就完成了只要有订阅者订阅,则为 connect 状态,不再有订阅者订阅时,为 disconnect 状态。

replay

继续之前的例子,我们将 publish 改为 replay(1) ,为了更好的描述 replay 的含义,我们去掉 map { _ in } 方法,将递增的值传递下去。

let tap = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.take(5)
.replay(1)
tap.connect()
tap.subscribe(onNext: { value in
print("1 Subscribe: \(value)")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
tap.subscribe(onNext: { value in
print("2 Subscribe: \(value)")
})
}

输出结果多了一个 2 Subscribe: 2 。

1 Subscribe: 0
1 Subscribe: 1
1 Subscribe: 2
2 Subscribe: 2 // 调用 publish 则不会打印该行
1 Subscribe: 3
2 Subscribe: 3
1 Subscribe: 4
2 Subscribe: 4

相比 publish 不同的是,我们对第 2 个订阅者多发射了一个 2 ,即重放( replay )了一个值。 replay 的个数是可以通过参数指定的,比如改为 replay(2) ,输出结果则会在 2 Subscribe: 2 前面多一个 2 Subscribe: 1 。 replay 会缓存指定个数的值,将这些值发送给订阅它的 Observer。

需要注意的是,当我们调用 replay(0) 时,效果等同于 publish 。

当我们需要缓存全部值时,我们可以调用 replayAll。

multicast

我们还可以以 Subject 为媒介,通过 multicast 方法将一个 Observable 转换成一个 ConnectableObservable 。

比如调用一个 multicast(PublishSubject()) ,就是以 PublishSubject 为媒介,转换成 ConnectableObservable 。

let tap = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.take(5)
.multicast(PublishSubject()) // 等价于 publish()
tap.connect()
tap.subscribe(onNext: { value in
print("1 Subscribe: \(value)")
})
DispatchQueue.main
.asyncAfter(deadline: DispatchTime.now() + 3) {
tap.subscribe(onNext: { value in
print("2 Subscribe: \(value)")
})
}

第 2 个订阅者不会收到 0、1、2 三个值,这与 publish() 效果是完全一样的。

需要指出的是, publish() 就是调用了 multicast(PublishSubject()) 。

extension ObservableType {
public func publish() -> ConnectableObservable<E> {
return self.multicast(PublishSubject())
}
}

而 replay 则是借助 ReplaySubject 完成的。

extension ObservableType {
public func replay(_ bufferSize: Int) -> ConnectableObservable<E> {
return self.multicast(ReplaySubject.create(bufferSize: bufferSize))
}
}

multicast 接收一个 Subject ,返回一个 ConnectableObservableAdapter ,在这个类中有下面这段代码。

let disposable = _source.subscribe(_subject.asObserver())

传入的 Subject 被做为 Observer ,订阅原 Observable 。可以理解为,我们将一个 Observable 转换成了 Observer ,对外暴露了一个 ConnectableObservable 。

multicast 还有一个方法,注意该方法返回的不再是 ConnectableObservable ,只是一个 Observable 。

extension ObservableType {
public func multicast<S : SubjectType, R where S.SubjectObserverType.E == E>(_ subjectSelector: @escaping () throws -> S, selector: @escaping (RxSwift.Observable<S.E>) throws -> RxSwift.Observable<R>) -> RxSwift.Observable<R>
}

multicast 允许我们手动传入一个返回 Subject 的闭包和一个 selector ,这个 selector 不同于 Cocoa 中的 Selector ,这里的只是提供了一个对 multicast 后的 Observable 进行一次处理的机会。

参考资源

  1. http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html
  2. http://reactivex.io/documentation/operators/refcount.html
  3. http://reactivex.io/documentation/operators/publish.html
  4. http://reactivex.io/documentation/operators/replay.html
  5. http://blog.chengyunfeng.com/?p=975

--

--