Some RxJava2 Single/Maybe Gotchas

smallufo
10 min read · Nov 11, 2016

Single and Maybe are new types introduced in RxJava 2, and I ran a few experiments with them. Suppose I want to fetch a URL over HTTP and have JSoup parse the response into a Document:

private Single<Document> parse(String url) {
    HttpClient httpClient = HttpClientBuilder.create().build();

    HttpGet httpGet = new HttpGet(url);
    HttpResponse response;
    try {
        // Note: the request runs right here, when parse() is called,
        // not when the returned Single is subscribed to.
        logger.info("retrieving {}", url);
        response = httpClient.execute(httpGet);
        return Single.just(Jsoup.parse(response.getEntity().getContent(), "utf-8", url));
    } catch (Throwable e) {
        return Single.error(e);
    }
}

The calling code looks like this:

@Test
public void testParse() {
    Document doc = Single.just("http://non_exist_domain.com")
        .flatMap((url) -> parse(url).retry(3))
        .blockingGet();

    logger.info("doc size = {}", doc.outerHtml().length());
}

In this calling code I deliberately pass a non-existent domain, non_exist_domain.com. When the request fails, I want it to be retried 3 times automatically (4 runs in total). The result:

retrieving http://non_exist_domain.com
java.lang.RuntimeException: java.net.UnknownHostException: non_exist_domain.com: nodename nor servname provided, or not known

What is going on? Why do I see only one HTTP request being sent?

I thought of another approach:

@Test
public void testParse() {
    Document doc = Single.just("http://non_exist_domain.com")
        .flatMap((url) -> parse(url))
        .retry(3)
        .blockingGet();

    logger.info("doc size = {}", doc.outerHtml().length());
}

Running this does give the result I wanted:

retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
java.lang.RuntimeException: java.net.UnknownHostException: non_exist_domain.com

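But this is not really the right fix, because the retry here is attached after flatMap(), not directly after parse(url). It only works because each retry resubscribes to the whole chain, which re-runs the flatMap lambda and therefore calls parse(url) again.

After digging around online a bit more, I learned that Single / Maybe are lazy: the work is supposed to happen at subscription time. parse() instead sends the request as soon as it is called and returns an already-finished Single.just(...) or Single.error(...), so retry() merely resubscribes to that fixed result and never re-sends the request. For the inner logic to run on every subscription, it should be written like this: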

private Single<Document> parse2(String url) {
    return Single.fromCallable(() -> {
        HttpClient httpClient = HttpClientBuilder.create().build();

        HttpGet httpGet = new HttpGet(url);
        HttpResponse response;
        logger.info("retrieving {}", url);
        response = httpClient.execute(httpGet);
        return Jsoup.parse(response.getEntity().getContent(), "utf-8", url);
    });
}

With that in place, switch back to the original calling code:

@Test
public void testParse() {
    Document doc = Single.just("http://non_exist_domain.com")
        .flatMap((url) -> parse2(url).retry(3))
        .blockingGet();
    logger.info("doc size = {}", doc.outerHtml().length());
}

This gives the result I wanted (the request actually runs four times).

retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
java.lang.RuntimeException: java.net.UnknownHostException: non_exist_domain.com
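The difference between parse() and parse2() can be reproduced without RxJava or a network. The following is only a toy model in plain Java, not RxJava's actual implementation: `Source`, `retry`, `eager`, `deferred`, and `fetch` are hypothetical names made up for this sketch, and `fetch` always fails so that every retry is exercised.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model only: mimics how retry() resubscribes. Not RxJava's implementation. */
class EagerVsDeferred {

    /** Stand-in for a Single: subscribing either yields a value or throws. */
    interface Source<T> {
        T subscribe() throws Exception;
    }

    /** Like Single.error(e): the failure already happened; subscribing just replays it. */
    static <T> Source<T> error(Exception e) {
        return () -> { throw e; };
    }

    /** Like Single.fromCallable(work): the work runs again on every subscription. */
    static <T> Source<T> fromCallable(Callable<T> work) {
        return work::call;
    }

    /** Like retry(times): resubscribe to the SAME source up to `times` extra times. */
    static <T> T retry(Source<T> source, int times) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= times; attempt++) {
            try {
                return source.subscribe();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }

    /** Hypothetical stand-in for the HTTP call: counts invocations and always fails. */
    static String fetch(AtomicInteger calls) throws Exception {
        calls.incrementAndGet();
        throw new java.net.UnknownHostException("non_exist_domain.com");
    }

    /** Shaped like parse(): does the work immediately, then wraps the outcome. */
    static Source<String> eager(AtomicInteger calls) {
        try {
            String body = fetch(calls);
            return () -> body;
        } catch (Exception e) {
            return error(e);
        }
    }

    /** Shaped like parse2(): wraps the work itself, so each subscription re-runs it. */
    static Source<String> deferred(AtomicInteger calls) {
        return fromCallable(() -> fetch(calls));
    }

    /** Runs the chosen source under retry(3) and returns how often fetch() was invoked. */
    static int countCalls(boolean useDeferred) {
        AtomicInteger calls = new AtomicInteger();
        Source<String> source = useDeferred ? deferred(calls) : eager(calls);
        try {
            retry(source, 3);
        } catch (Exception expected) {
            // every attempt fails by construction
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("eager calls    = " + countCalls(false));
        System.out.println("deferred calls = " + countCalls(true));
    }
}
```

In this model the eager version invokes `fetch` once and the deferred version invokes it four times, which mirrors the one request versus four requests seen in the logs above.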

Bonus: what if there are several URLs, I want to parse them one after another, and I want to filter out the ones that fail?

@Test
public void testParseSequentially() {
    String[] urls = {
        "http://google.com",
        "http://non_exist_domain.com",
        "http://yahoo.com" };

    Observable.fromArray(urls)
        .flatMap(url -> parse2(url) // uses the second version, parse2
            .retry(3)
            .toObservable()
            .onExceptionResumeNext(Observable.empty())
        )
        .blockingIterable()
        .forEach(doc -> logger.info("doc size = {}", doc.outerHtml().length()));
}

The result:

retrieving http://google.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://yahoo.com
doc size = 10962
doc size = 413364

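As you can see, the program calls the failing URL four times (the initial attempt plus three retries) and only substitutes empty once it has run out of retries. The final output lists the sizes for google and yahoo in order.

What if I want to send multiple HTTP requests asynchronously and in parallel?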
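The retry-then-skip behaviour above can be sketched in plain Java without RxJava. This is a swapped-in illustration using an ordinary loop, not what the operators do internally: `fakeFetch` and `fetchInOrder` are hypothetical helpers, and the "size" is simply the URL length so the sketch needs no network.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of "retry each URL, then skip it if it never succeeds". */
class RetryThenSkipSketch {

    /** Hypothetical stand-in for parse2(url): logs the attempt and fails for one host. */
    static int fakeFetch(String url, List<String> log) throws Exception {
        log.add("retrieving " + url);
        if (url.contains("non_exist_domain")) {
            throw new java.net.UnknownHostException(url);
        }
        return url.length(); // pretend this is the document size
    }

    /** One initial attempt plus `retries` retries per URL; URLs that never succeed are dropped. */
    static List<Integer> fetchInOrder(List<String> urls, int retries, List<String> log) {
        List<Integer> sizes = new ArrayList<>();
        for (String url : urls) {
            for (int attempt = 0; attempt <= retries; attempt++) {
                try {
                    sizes.add(fakeFetch(url, log));
                    break; // success: stop retrying this URL
                } catch (Exception e) {
                    // failed attempt: the loop retries; after the last attempt the URL is skipped
                }
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Integer> sizes = fetchInOrder(
                Arrays.asList("http://google.com", "http://non_exist_domain.com", "http://yahoo.com"),
                3, log);
        log.forEach(System.out::println);
        System.out.println("sizes = " + sizes);
    }
}
```

With three retries, the log holds six "retrieving" lines (one, four, one) and two sizes come back in input order, the same shape as the output above.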

@Test
public void testSync2Async() {
    String[] urls = {
        "http://google.com",
        "http://yahoo.com",
        "http://nytimes.com",
        "http://medium.com",
        "http://alexa.com",
        "http://non_exist_domain.com",
        "http://amazon.com",
        "http://github.com",
        "http://facebook.com",
        "http://youtube.com",
        "http://wikipedia.org",
        "http://live.com",
        "http://twitter.com",
        "http://baidu.com",
        "http://ebay.com",
        "http://reddit.com",
        "http://msn.com",
        "http://twitter.com",
        "http://netflix.com",
    };

    Observable.fromArray(urls)
        .flatMap(url -> parse2(url)
            .retry(3)
            .toObservable()
            .subscribeOn(Schedulers.computation())
            .onExceptionResumeNext(Observable.empty()))
        .blockingIterable()
        .forEach(doc -> logger.info("doc size = {}", doc.outerHtml().length()));
}

Output:

retrieving http://medium.com
retrieving http://yahoo.com
retrieving http://nytimes.com
retrieving http://google.com
doc size = 10948
retrieving http://alexa.com
retrieving http://github.com
doc size = 196867
retrieving http://amazon.com
doc size = 287629
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
retrieving http://non_exist_domain.com
doc size = 413020
retrieving http://non_exist_domain.com
retrieving http://youtube.com
doc size = 30829
retrieving http://facebook.com
doc size = 25227
retrieving http://baidu.com
doc size = 120
retrieving http://twitter.com
doc size = 5442
retrieving http://wikipedia.org
retrieving http://reddit.com
doc size = 656500
doc size = 97808
retrieving http://ebay.com
doc size = 1050
doc size = 295809
retrieving http://msn.com
doc size = 84767
retrieving http://netflix.com
doc size = 196203
doc size = 42625
doc size = 316558

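This way multiple threads are spawned to send the HTTP requests concurrently, so the results arrive in completion order rather than input order.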
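The same fan-out idea can be sketched with the JDK's own executors instead of RxJava schedulers. This is a swapped-in technique for illustration only: `fakeFetch` and `fetchAll` are hypothetical helpers, the sleep merely simulates latency, and failed URLs are dropped much like the empty fallback above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Plain-JDK sketch of running fetches in parallel and collecting them as they finish. */
class ParallelFetchSketch {

    /** Hypothetical stand-in for parse2(url): fails for one host, otherwise returns a fake size. */
    static int fakeFetch(String url) throws Exception {
        if (url.contains("non_exist_domain")) {
            throw new java.net.UnknownHostException(url);
        }
        Thread.sleep(5); // simulate network latency
        return url.length();
    }

    /** Submits every URL to a fixed pool and gathers successful results in completion order. */
    static List<Integer> fetchAll(List<String> urls, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        ExecutorCompletionService<Integer> done = new ExecutorCompletionService<>(pool);
        try {
            for (String url : urls) {
                done.submit(() -> fakeFetch(url));
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < urls.size(); i++) {
                try {
                    results.add(done.take().get()); // next task to finish, whichever it is
                } catch (ExecutionException failed) {
                    // this URL failed: skip it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for results", e);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> sizes = fetchAll(
                Arrays.asList("http://google.com", "http://non_exist_domain.com", "http://yahoo.com"), 3);
        System.out.println("sizes (completion order) = " + sizes);
    }
}
```

One design note: the code above uses Schedulers.computation(), but RxJava also provides Schedulers.io(), which is intended for blocking I/O such as HTTP calls, so it may be the better fit for this workload.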
