在实践中应用 RxSwift — 并发

最近项目中遇到了一些网络并发的需求。
下载几张图片,将结果一并展示。
验证多个 Code 。(此时后端仅提供了验证一个 Code 的 API)

本文便来尝试用 Rx 解决这个问题,其实只是几行代码的事情

贴出我们最终的需求:批量请求 API ,在全部请求完毕后,获取全部回调结果,此处调回结果顺序和请求 API 顺序相同。

并发请求

我们用 httpbin.org API 为例。

连续请求如下 API :

https://httpbin.org/get?foo=1
https://httpbin.org/get?foo=2
https://httpbin.org/get?foo=3
https://httpbin.org/get?foo=4
https://httpbin.org/get?foo=5
https://httpbin.org/get?foo=6
https://httpbin.org/get?foo=7

无序

无序的请求很简单。

[1, 2, 3, 4, 5, 6, 7].toObservable()
.map(convertToRequest)
.flatMap(NSURLSession.sharedSession().rx_data)
.map(prase)
.map { $0["args"]["foo"].stringValue }
.subscribeNext {
print($0)
}
.addDisposableTo(disposeBag)

核心就是通过 flatMap 即 flatMap(NSURLSession.sharedSession().rx_data) 完成异步请求,当然,上面的代码打印的结果基本上是乱序的,很明显这还不是我们需要的,但至少并发请求完成了。

有序

为了完成有序的请求,我们需要使用 concat

这里不打算详细解释 concat ,用一句话理解连接多个 Observable 的值/合并多个 Observable

这里我们的网络请求,每个 Observable 只发射一个值,自然就是将每一个网络回调按顺序连接起来。

[1, 2, 3, 4, 5, 6, 7].toObservable()
.map(convertToRequest)
.map(NSURLSession.sharedSession().rx_data)
.concat()
.map(prase)
.map { $0["args"]["foo"].stringValue }
.subscribeNext {
print($0)
}
.addDisposableTo(disposeBag)

需要注意的是我们将 .flatMap(NSURLSession.sharedSession().rx_data) 替换成了 .map(NSURLSession.sharedSession().rx_data) 。我们需要按顺序传递 Observable ,并 concat 起来,使用 flatMap 就还是传递 Observable 的值了。

此时打印的结果就是。

1
2
3
4
5
6
7

reduce 结果

接下来的需求就是将所有的网络请求收集起来,使用 reduce 即可。

[1, 2, 3, 4, 5, 6, 7].toObservable()
.map(convertToRequest)
.map(NSURLSession.sharedSession().rx_data)
.concat()
.map(prase)
.map { $0["args"]["foo"].stringValue }
.reduce(Array<String>()) { (acc, x) in
return acc + [x]
}
.subscribeNext {
print($0)
}
.addDisposableTo(disposeBag)

此时结果就变成了。

["1", "2", "3", "4", "5", "6", "7"]

yooooo 这正是我们想要的结果。

此处需要注意的是,有一个有趣的方法可以用在这里,让代码更加易读。toArray。

[1, 2, 3, 4, 5, 6, 7].toObservable()
.map(convertToRequest)
.map(NSURLSession.sharedSession().rx_data)
.concat()
.map(prase)
.map { $0["args"]["foo"].stringValue }
.toArray()
.reduce(Array<String>(), accumulator: +)
.subscribeNext {
print($0)
}
.addDisposableTo(disposeBag)

并发量

有时我们可能还需要考虑并发量,用 merge 可以完成这个需求,添加最大并发量参数即可,比如最大并发量是 3 。

[1, 2, 3, 4, 5, 6, 7].toObservable()
.map(convertToRequest)
.map(NSURLSession.sharedSession().rx_data)
.merge(maxConcurrent: 3)
.map(prase)
.map { $0["args"]["foo"].stringValue }
.subscribeNext {
print($0)
}
.addDisposableTo(disposeBag)

至此,并发的问题就到这里。

实际应用的坑

在看完上面的内容,真正应用到项目中可能并不如意,比如下面这段代码。

concurrencyButton
.rx_tap
.flatMap { [1, 2, 3, 4, 5, 6, 7].toObservable() }
.map(convertToRequest)
.map(NSURLSession.sharedSession().rx_data)
.concat()
.map(prase)
.map { $0["args"]["foo"].stringValue }
.toArray()
.reduce(Array<String>(), accumulator: +)
.subscribeNext {
print($0)
}
.addDisposableTo(disposeBag)

这段代码是永远收不到结果的,因为 rx_tap 是个无终止序列,即该 Observable 永远不会调用 onCompleted 。而 reduce 正是处理从该 Observable 变化的结果,除非中间变化中出现 error ,否则 reduce 永远不会传值。

reduce 谁?[1, 2, 3, 4, 5, 6, 7].toObservable() 。代码改成下面的样子即可。

concurrencyButton
.rx_tap
.flatMap {
[1, 2, 3, 4, 5, 6, 7]
.toObservable()
.map(convertToRequest)
.map(NSURLSession.sharedSession().rx_data)
.concat()
.map(prase)
.map { $0["args"]["foo"].stringValue }
.toArray()
.reduce(Array<String>(), accumulator: +)
}
.subscribeNext {
print($0)
}
.addDisposableTo(disposeBag)

总结

核心就是通过组合 merge reduce concat 等方法。