RxSwiftにおけるマルチスレッドの理解を深める — Schedulerについて

この記事はeureka Native Advent Calendar 2017 — Qiitaの9日目の記事です。


こんにちは。Pairs JP事業部でスクラムマスター & iOS / Webエンジニアをしているです。今回はRxSwiftのSchedulerについてお話しします。Techブログで初めてUIKit以外の記事を書いています。


ことの発端はあるObservableをsubscribeしたとき、subscribeのクロージャは[unowned self]と、[weak self]のどちらにすべきかで社内で議論が起こりました。

API.Me.Get
.subscribe(onNext: { [unowned self] _ in
self.someMethod()
})
.disposed(by: self.disposeBag)

[weak self]で書けば安全ですが、毎回selfのアンラップをするのはしんどいので、unownedで書けるものは書いた方が良いと個人的に思っています。


そこで問題になってくるのがSchedulerの考えになってきます。本記事ではSchedulerについてわかりやすく体系的にまとめることを目指します。そして最後にdisposeとsubscribe(onNext:の順序、unownedで書ける条件について話したいと思います。想定読者はRxSwiftをすでに使用しており、ある程度の知識があるものとしています。

Versions

RxSwift 4.0
Swift4.0

目次

  • Schedulerとは
  • observeOnとsubscribeOnとは?
  • Schedulerの種類
  • unowned self / weak self
  • さいごに

Schedulerとは

RxSwiftにおいて、Schedulerの役割はObservableやOperator、Subscribeなどの処理をどのスレッド、キューで実行するかを決めることです。

observeOnとsubscribeOnとは?

SchedulerはObservableTypeのobserveOnsubscribeOnというメソッドに渡して使います。ReactiveXのドキュメントがわかりやすいです。以下の図はReactiveXのドキュメントに掲載されている図です。ストリームの横線の色が実行されているSchedulerを示しています。

observeOnはどのSchedulerで次のObserverに通知していくかを決定します。上の図で言うオレンジとピンクの線になります。


observeOnについては、こちらのサンプルコードがわかりやすいと思います。observeOnを呼ぶたびに次のOperatorとSubscribeの処理のスレッドが切り替わります。

someObservable
.observeOn(MainScheduler.instance) // これはメインスレッドで動く特別なScheduler
.map { _ in
// メインスレッドで実行される
}
.observeOn(backgroundScheduler) // これはバックグラウンドで動くように自身で作成したScheduler
.map { _ in
// バックグラウンドスレッドで実行される
}
.subscribe(onNext: { _ in
// バックグラウンドスレッドで実行される
})

subscribeOnは、ストリームの元となるObservableがどのSchedulerで処理されるかを決定します。上の図で言うブルーの線になります。ColdなObservableに対して有効です(subscribeされるまでObservableは実行されないため)。


subscribeOnについては、こちらのサンプルコードをご覧ください。とはいえ、subscribeOnはほとんど使われないと思います。

let someObservable = Observable<Int>.create({ (observer) -> Disposable in
// 下のコードでsubscribeOn(backgroundScheduler)を呼んでいるので、バックグラウンドスレッドで実行される
observer.on(.next(1))
observer.on(.completed)
return Disposables.create()
})
// 1. ここがメインスレッド or バックグラウンドスレッド
someObservable
.subscribeOn(backgroundScheduler)
.map { _ in
// 上記の1と同じスレッドで実行される
}
.subscribe(onNext: { _ in
// 上記の1と同じスレッドで実行される
})
observeOnで実験をしてみる
// ここはメインスレッド
let someObservable = Observable<Int>.create({ (observer) -> Disposable in
observer.on(.next(1))
observer.on(.completed)
return Disposables.create()
})
DispatchQueue.global(qos: .background).async { // [1]
_ = someObservable
.map { n in
// バックグラウンドスレッド(特に指定していないので[1]の影響)
}
.observeOn(MainScheduler.instance) // [2]
.map { n in
// メインスレッド([2]の影響)
}
.observeOn(backgroundScheduler) // [3]
.map { n in
// バックグラウンドスレッド([3]の影響)
}
.do(onNext: { _ in
// バックグラウンドスレッド([3]の影響)
})
.observeOn(MainScheduler.instance) // [4]
.subscribe(onNext: { _ in
// メインスレッド([4]の影響)
})
}
Schedulerの種類
RxSwift 4.0では現在8つのSchedulerが存在します。
  • MainScheduler
  • ConcurrentMainScheduler
  • SerialDispatchQueueScheduler
  • ConcurrentDispatchQueueScheduler
  • OperationQueueScheduler
  • CurrentThreadScheduler
  • HistoricalSchedulerTimeConverter
  • VirtualTimeScheduler
HistoricalSchedulerTimeConverterとVirtualTimeSchedulerはRxSwiftのテストコードで使用するSchedulerになるので、今回の記事では省略したいと思います。上の6つのSchedulerについて詳細を説明していきます。
メインスレッド
メインスレッドで実行するSchedulerは2つ存在します。
  • MainScheduler
  • ConcurrentMainScheduler
使い分けとしては、MainSchedulerがobserveOnに最適化されており、ConcurrentMainSchedulerがsubscribeOnに最適化されています。


それぞれシングルトンのインスタンスを持っており、これらを使用します。
  • MainScheduler.instance
  • MainScheduler.asyncInstance(ただのSerialDispatchQueueScheduler)
  • ConcurrentMainScheduler.instance
MainScheduler.instanceMainScheduler.asyncInstanceの違いは、
- MainScheduler.instanceはキューイングされているものがない場合、即時実行される
- MainScheduler.asyncInstanceは必ずDispatchQueue.mainでディスパッチされる
です。


また、DriverはMainScheduler.instanceでdrive, subscribe, bindのメソッドが実行されます。ControlPropertyはConcurrentMainScheduler.instanceでsubscribeOnされています。
DispatchQueue
SerialDispatchQueueScheduler
MainSchedulerはSerialDispatchQueueSchedulerのサブクラスです。MainScheduler.asyncInstanceもこのSerialDispatchQueueSchedulerになります。


DispatchQoSやDispatchQueueを渡してinitします。渡されたDispatchQueueがConcurrentでも中でSerial Queueが生成されており、必ずSerialになります。
let serialScheduler1 = SerialDispatchQueueScheduler(qos: .background)
let serialScheduler2 = SerialDispatchQueueScheduler(queue: DispatchQueue.main, internalSerialQueueName: "jp.eure.pairs.main.serial")
ConcurrentDispatchQueueScheduler
バックグラウンドスレッドで動作させたい場合は通常こちらのSchedulerを使用します。
let backgroundScheduler = ConcurrentDispatchQueueScheduler(qos: .background)
OperationQueue
あるOperationQueueでまとまった処理をしたい場合は、こちらのOperationQueueSchedulerを使います。特にConcurrentに処理する数を制限したいときにmaxConcurrentOperationCountを設定します。
let operationQueue = OperationQueue()
operationQueue.maxConcurrentOperationCount = 3
let operationScheduler = OperationQueueScheduler(operationQueue: operationQueue)
特に指定しない場合
特に指定がなければ、CurrentThreadSchedulerで動きます。こちらはSubscribeが実行されるスレッドでOperatorやSubscribeの処理が実行されます。
DispatchQueue.main.async {
_ = someObservable1
.subscribe(onNext: { _ in
// メインスレッド
})
}
DispatchQueue.global(qos: .background).async {
_ = someObservable2
.subscribe(onNext: { _ in
// バックグラウンドスレッド
})
}
unowned self / weak self
最初の問題に戻りましょう。
API.Me.Get
.subscribe(onNext: { [unowned self] _ in
self.someMethod()
})
.disposed(by: self.disposeBag)
この問題はdiseposeされた後に、 subscribe(onNext: が呼ばれることがなければ [unowned self]で良いことになります。


disposeの後にonNextが呼ばれないことが保証される場合は、
  • SerialなScheduler(MainScheduler, SerialDispatchQueueScheduler)でobserveOnしている
  • 同じQueueでdisposeされる(disposeBagが同じQueueでdeinit または 明示的にdispose)
となります。それ以外の条件ではdispose後にonNextが呼ばれる可能性があるので、必ず[weak self]にすべきです。
以下の例では基本的にMainSchedulerで動いているので、[unowned self]で良いです。
class ViewController: UIViewController {
let disposeBag = DisposeBag() // メインスレッドでdeinitされる
override func viewDidLoad() {
super.viewDidLoad()
// メインスレッド
API.Me.Get
.subscribe(onNext: { [unowned self] _ in
     // メインスレッド(observeOnが指定されていないので、CurrentThreadScheduler)
self.someMethod()
})
.disposed(by: self.disposeBag)
}
}
さいごに
今回はRxSwiftのSchedulerについてまとめました。Schedulerを意識せずにRxSwiftを使用することは可能ですが、意図しない動作をする可能性があるので危険です。また、disposeとonNextのタイミングについても理解をしておくと良いと思います。