Single/Maybe are new types in RxJava 2, and I did some experimenting with them. Suppose I want to fetch a URL over HTTP and parse it with JSoup to get a Document:
private Single<Document> parse(String url) {
    HttpClient httpClient = HttpClientBuilder.create().build();
    HttpGet httpGet = new HttpGet(url);
    HttpResponse response;
    try {
        logger.info("retrieving {}", url);
        response = httpClient.execute(httpGet);
        return Single.just(Jsoup.parse(response.getEntity().getContent(), "utf-8", url));
    } catch (Throwable e) {
        return Single.error(e);
    }
}
The client code that actually drives the parsing looks like this:
@Test
public void testParse() {
    Document doc = Single.just("http://non_exist_domain.com")
            .flatMap((url) -> parse(url).retry(3))
            .blockingGet();
    logger.info("doc size = {}", doc.outerHtml().length());
}
Here I deliberately pass in a non-existent domain, non_exist_domain.com, expecting that on failure it will automatically retry 3 times (4 runs in total). The result:
retrieving http://non_exist_domain.com
java.lang.RuntimeException: java.net.UnknownHostException: non_exist_domain.com: nodename nor servname provided, or not known
What is going on here? Why do I see only one HTTP request being sent?
I thought of another approach:
@Test
public void testParse() {
    Document doc = Single.just("http://non_exist_domain.com")
            .flatMap((url) -> parse(url))
            .retry(3)
            .blockingGet();
    logger.info("doc size = {}", doc.outerHtml().length());
}
Running this does give the result I want:
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
java.lang.RuntimeException: java.net.UnknownHostException: non_exist_domain.com
But this is not really the right fix: the retry() is attached after flatMap(), not directly after parse(url). It only works because retry() re-subscribes the whole upstream chain, which re-runs the flatMap lambda and therefore invokes parse(url), HTTP request and all, again from scratch.
After digging around online some more: a Single/Maybe does nothing until it is subscribed, but Single.just() and Single.error() merely wrap a value or exception that has already been computed. In my parse(), the HTTP request runs eagerly in the method body, so retry() just re-subscribes to a Single that replays the same cached error. To have the internal work re-executed on every subscription, it should be written like this instead:
private Single<Document> parse2(String url) {
    return Single.fromCallable(() -> {
        HttpClient httpClient = HttpClientBuilder.create().build();
        HttpGet httpGet = new HttpGet(url);
        logger.info("retrieving {}", url);
        HttpResponse response = httpClient.execute(httpGet);
        return Jsoup.parse(response.getEntity().getContent(), "utf-8", url);
    });
}
With that in place, going back to the original client code:
@Test
public void testParse() {
    Document doc = Single.just("http://non_exist_domain.com")
            .flatMap((url) -> parse2(url).retry(3))
            .blockingGet();
    logger.info("doc size = {}", doc.outerHtml().length());
}
gives the result I want (the request actually runs four times):
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
java.lang.RuntimeException: java.net.UnknownHostException: non_exist_domain.com
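The difference between parse() and parse2() can be reproduced without RxJava at all: Single.just(x) wraps a value that has already been computed, while Single.fromCallable(c) stores the computation itself. A minimal plain-JDK sketch of the same distinction (the class and counter are mine, purely for illustration):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyDemo {
    static AtomicInteger calls = new AtomicInteger();

    // Stands in for the expensive work inside parse(): each call is one "HTTP request".
    static String fetch() {
        calls.incrementAndGet();
        return "document";
    }

    public static void main(String[] args) throws Exception {
        // Eager, like Single.just(fetch()): fetch() runs once, right here.
        String cached = fetch();
        for (int i = 0; i < 3; i++) {
            String replayed = cached;   // a "retry" only replays the cached value
        }
        System.out.println("eager calls = " + calls.get());

        calls.set(0);
        // Deferred, like Single.fromCallable(...): the work is stored, not run.
        Callable<String> deferred = () -> fetch();
        for (int i = 0; i < 3; i++) {
            deferred.call();            // each "subscription" re-runs the work
        }
        System.out.println("deferred calls = " + calls.get());
    }
}
```

This prints `eager calls = 1` and `deferred calls = 3`, which is exactly why retry() could never help the first version of parse().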
As a bonus: what if there are multiple URLs to parse sequentially, filtering out the ones that fail?
@Test
public void testParseSequentially() {
    String[] urls = {
            "http://google.com",
            "http://non_exist_domain.com",
            "http://yahoo.com" };
    Observable.fromArray(urls)
            .flatMap(url -> parse2(url) // using the second version
                    .retry(3)
                    .toObservable()
                    .onExceptionResumeNext(Observable.empty())
            )
            .blockingIterable()
            .forEach(doc -> logger.info("doc size = {}", doc.outerHtml().length()));
}
The result:
retrieving http://google.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://yahoo.com
doc size = 10962
doc size = 413364
You can see that the failing URL is retried three more times after the first failure (four attempts in total), and only when it still fails is an empty stream substituted. The final output is the sizes of google and yahoo, in order.
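The retry(3) + onExceptionResumeNext(Observable.empty()) pair amounts to a policy of "attempt up to four times, then silently drop the element". The same policy can be sketched in plain Java with Optional, to make the behavior concrete (the helper and the fake fetch are mine, not the post's code):

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;

public class RetryThenSkip {
    // Run the task up to 1 + retries times; give up with an empty Optional,
    // mirroring retry(n) followed by onExceptionResumeNext(Observable.empty()).
    static <T> Optional<T> retryOrSkip(Callable<T> task, int retries) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return Optional.of(task.call());
            } catch (Exception e) {
                // swallow and try again; fall through after the last attempt
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> urls = List.of(
                "http://google.com", "http://non_exist_domain.com", "http://yahoo.com");
        // Pretend-fetch: the bad domain always throws, the others "return a document".
        List<String> docs = urls.stream()
                .map(url -> retryOrSkip(() -> {
                    if (url.contains("non_exist")) throw new RuntimeException("unknown host");
                    return "doc for " + url;
                }, 3))
                .flatMap(Optional::stream)   // drop the empties, keep the order
                .toList();
        System.out.println(docs);
    }
}
```

Only the google and yahoo entries survive, in their original order, just like the sequential RxJava run above.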
What if we want to send multiple HTTP requests asynchronously and in parallel?
@Test
public void testSync2Async() {
    String[] urls = {
            "http://google.com",
            "http://yahoo.com",
            "http://nytimes.com",
            "http://medium.com",
            "http://alexa.com",
            "http://non_exist_domain.com",
            "http://amazon.com",
            "http://github.com",
            "http://facebook.com",
            "http://youtube.com",
            "http://wikipedia.org",
            "http://live.com",
            "http://twitter.com",
            "http://baidu.com",
            "http://ebay.com",
            "http://reddit.com",
            "http://msn.com",
            "http://twitter.com",
            "http://netflix.com",
    };
    Observable.fromArray(urls)
            .flatMap(url -> parse2(url)
                    .retry(3)
                    .toObservable()
                    .subscribeOn(Schedulers.computation())
                    .onExceptionResumeNext(Observable.empty()))
            .blockingIterable()
            .forEach(doc -> logger.info("doc size = {}", doc.outerHtml().length()));
}
The output:
retrieving http://medium.com
retrieving http://yahoo.com
retrieving http://nytimes.com
retrieving http://google.com
doc size = 10948
retrieving http://alexa.com
retrieving http://github.com
doc size = 196867
retrieving http://amazon.com
doc size = 287629
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
doc size = 413020
retrieving http://non_exist_domain.com
retrieving http://youtube.com
doc size = 30829
retrieving http://facebook.com
doc size = 25227
retrieving http://baidu.com
doc size = 120
retrieving http://twitter.com
doc size = 5442
retrieving http://wikipedia.org
retrieving http://reddit.com
doc size = 656500
doc size = 97808
retrieving http://ebay.com
doc size = 1050
doc size = 295809
retrieving http://msn.com
doc size = 84767
retrieving http://netflix.com
doc size = 196203
doc size = 42625
doc size = 316558
This way multiple threads are spawned to send the HTTP requests concurrently.
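The same fan-out shape, one background task per URL with failures dropped when the results are collected, can be sketched with plain java.util.concurrent types, which may help if the subscribeOn placement looks magical (the simulated fetch and pool size are mine):

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelFetch {
    public static void main(String[] args) {
        List<String> urls = List.of(
                "http://google.com", "http://non_exist_domain.com", "http://yahoo.com");
        // Plays the role of Schedulers.computation().
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // One async task per URL, like flatMap + subscribeOn: all start concurrently.
            List<CompletableFuture<Optional<String>>> futures = urls.stream()
                    .map(url -> CompletableFuture.supplyAsync(() -> {
                        if (url.contains("non_exist")) throw new RuntimeException("unknown host");
                        return "doc for " + url;  // pretend-fetch instead of a real request
                    }, pool).handle((doc, err) -> Optional.ofNullable(err == null ? doc : null)))
                    .toList();
            // The blockingIterable() equivalent: wait for everything, drop the failures.
            futures.stream()
                    .map(CompletableFuture::join)
                    .flatMap(Optional::stream)
                    .forEach(System.out::println);
        } finally {
            pool.shutdown();
        }
    }
}
```

As in the RxJava version, the tasks run concurrently on pool threads while the collecting side simply blocks until each result (or failure) is in.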