Java Concurrency #15: JUC — 得到Thread的執行結果 FutureTask & CompletableFuture
最現代的Java管理Thread結果工具FutureTask與CompletableFuture
為何需要得到Thread的執行結果
先討論簡單的生活案例,每當我們煮湯的時候,都是經過下圖的流程,取水後燒水,在燒水的過程可以去洗菜切菜,看到水滾後在將菜放到水中開始煮湯。
在這情境中,人類就是Main Thread當我們將主要問題(煮湯)切成子問題(燒水、切菜、煮湯)後,就可以將子問題(燒水)丟給執行緒(瓦斯爐)去處理,當執行緒(瓦斯爐)處裡完畢並且告知(看到水滾)Main Thread(人類)後再將結果統合起來往下執行。
在前半部的文章,內容大多圍繞在讓執行緒平行處理和加鎖保護資料避免Race Condition,沒有說到太多協作的內容,而這一篇就是除了Thread原生地join fork方法之外,就是最基本的Multi Thread協作文章。
執行緒之間需要協作,就代表多個直行緒處理裡的資料是互相有關聯的,直行緒需要等待其他直行緒的處理結果才能往下執行,以Sort為例,在演算法書籍或是相關文章提到Merge Sort和Quick Sort都是以Main Thread處理當作範例,但其實在現實世界中這是不可能的,千百億的資料需要排序一定會動用到大量的執行緒,甚至分散式系統處理(例如Hadoop),而這些平行處理就需要在終點時彙整每個直行緒(或是電腦)的結果在往下執行,這也是將問題化成子問題最後再彙整最佳解這個過程最有效率的辦法。
接下來就來看看Java除了使用Thread提供的Join和Fork還有甚麼方法可以彙整Thread的結果
Future & Runnable & Callable
Future是Java線程池中基本的返回型別,簡單閱覽Java線程池的基本介面ExecutorService
可以看到最基本的Executor介面只提供了executor,啟動執行緒執行任務的方法,但在Java裡的線程池工具類都會實作ExecutorService介面,而其中大部分的submit任務提交都會回傳Future型別,讓開發者可以透過Future去得知執行緒運行狀況甚至結果。
先看Callable原始介面程式碼
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
從原始碼得知,Callable是提供一個回傳Template物件方法的介面,而且包含了FunctionInterface代表可以透過Java8 Lambda的方式使用,那這樣和Runnable又有何關係?
在往下介紹前先快速回顧一下Runnable的功能,Thread執行方法有二
- 可以直接繼承Thread改寫run方法
- Runnable透過建構者Dependency Injection的方式讓Thread使用。
可以參考下方連結
連結
那Runnable和Callable的關係,必需說到FutureTask這個物件,因為此處程式碼較多,這裡只關注在重點
先從簡單的Class Diagram開始
- Thread可以放置Runnable到建構者,使用Runable的run方法,
- Future介面提供了一些和執行緒處理相關的方法,而且這也是Java裡線程池返回處理結果的基本型別
- RunnableFuture擴增了Runnable和Future介面,這代表實作RunnableFuture的物件都可以丟進Thread讓執行緒執行,而且Runnable Future都有Future的方法
- FutureTask實作了RunnableFuture方法,並且可以在建構者放入實作Callable的物件
- Callable可以從call方法取回值
從最基本的Class Diagram分析,可以猜到Callable可以幫助開法者從Thread取回執行結果,再看看原始碼
從上方的原始碼可以觀察到
- FutureTask建構者可以選擇傳入Callable或是Runnable搭配一個Template Result,
- FututreTask實做了RunnableFuture介面,RunnableFutre在介紹類別圖時,有說到擴增了Runnable和Funture介面,這也代表可以直接在Thread建構者中丟入FutureTask
- FutureTask建構者注入Callable(Runnable, V reuslt),所以可以將結果從Thread拿回來。
簡單總結:
- Java透過實作Callable讓Main Thread可以接到Sub Thread的執行結果。
- Callable只是介面,還需要將此放入FutureTask建構者,才能讓Thread執行
- FutureTask實作了擴充Future和Runnable的RunnableFuture,這也代表FutureTask可以直接放入Thread的建構者中執行,也可以當作接受Executor(線程池)的返回結果
- Java Multi Thread的原始碼物件設計,真的是頂級與精隨的結合,每次看一次原始碼都可以感受到滿滿的物件設計精華
FutureTask
直接觀看範例程式碼。
public class FutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Thread Example
Callable<String> callable = () -> {
TimeUnit.SECONDS.sleep(3);
return "Hi";
}; FutureTask<String> f = new FutureTask<>(callable);
Thread t1 = new Thread(f);
t1.start();
// block
System.out.println(f.get()); // Thread Pool Example
ExecutorService executorService = Executors.newFixedThreadPool(1);
FutureTask<String> f2 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(4);
return "hi";
}); executorService.submit(f2);
// block
System.out.println(f2.get());
}
範例程式碼上半部是直接使用Thread執行而下半部則使用最簡單的線程池執行,在Thread執行的程式碼裡,寫了最完整的Callable物件並且丟入FutureTask中,因為FutureTask有實作Runnable介面所以可以讓Thread直接執行,而線程池我就直接偷懶了,直接用lambda實作的方式將Callable丟入FutureTask中,這也是業界最常使用也最直接的方式,因為範例程式碼需要模擬CPU花時間處理資料所以都Sleep了隨機定義的秒數。
切記!再取回FutureTask的時候(f.get),整個Main Thread會Block在這裡直到結果回傳,所以如何拿回Thread的處理結果也是需要經過設計和思考!
CompletableFuture
在上部分提到的FutureTask,提到了FutureTask在get執行結果時會將整個main thread block住,而且FutureTask拿回結果的方法非常簡單,只要輕鬆的f.get就可以,但是這也代表f.get的程式碼可能會散佈在各個角落,很容易就會在某個程式碼區塊導致整個系統block住,而且不易偵查到,這時候就需要CompletableFuture。
CompletableFuture實踐了Future和CompletionStage介面,並沒有包含Runnable介面,這也代表CompletableFuture不像是FutureTask可以直接讓執行緒執行,只能當作線程池的返回物件,而CompletableFuture實作了CompletionStage介面,CompletionStage提供了許多可以彌補FutureTask弱點的方法,包括調整Thread結果和控制執行順序的,由於方法過多所以下方的文章就點到為止,當有需要的時候再去看有甚麼方法就好,只要知道如果要整合Thread可以使用CompletionStage的繼承結構看有沒有適合的工具使用,
CompletableFuture基本使用方法
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
上方的程式碼得知CompletableFuture和FutureTask最大的差別,在於CompletableFuture需將Runnable或是線程池(Executor)放入建構者中,而FutureTask則相反,需將自己放入到Thread or Executor當作任務被執行。也可以猜到CompletableFuture物件裡,應會有線程池或是執行緒負責消化Runnable任務,而放入Executor的建構者也代表開發者可以替換原生地Executor/Thread使用自己指定的線程池(Executor)執行任務
CompletableFuture(CompletionStage)控制Thread方法主要有三種
- 串聯
- 整合
- 例外處理與Finally
串聯
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
其上方的輸入參數fn、cosumer和aciton,物件型態分別為Function/Cosumer/Runnable,Function和Consumer為Java 1.8Functional Programing的利器,而Runnable則是大家皆知的單方法執行介面,而回傳的CompletionStage則是CompletableFuture的實踐介面。
thenRun和thenAccept和CompletionStage的基本方法一樣,直接呼叫使用即可,容易讓人誤解的是Apply和Compose方法。
Apply和Compose方法非常相近,最大差別在於是否攤平其中的物件,這句非常難以理解,直接看實例
UserDataServcie是搜尋使用者資料的服務,這樣設計是為了讓系統主線程和DB搜尋線程分離,讓DB在搜尋資料同時主線程可以去做其他事情,直到資料回傳再繼續動作,直接看CompletableFutureExample中的composeAndApplyTest方法就可以察覺apply和compose的差異,apply的回傳物件CompletableFuture<CompletableFuture<UserRating>>,因為UserDataServcie也是回傳CompletableFuture所以回傳就會變成巢狀結構,只要看到巢狀結構就代表程是碼會越來越長,程式碼越來越長也代表可讀性就會降低,可讀性降低也跟著代表難以維護,而Compose的回傳物件則會直接幫開法者將回傳結果攤平,直接回傳CompletableFuture裡的物件。
整合
CompletableFuture整合執行緒分為And和OR,如同字義And就是彙整兩個結果,Or則是只要得到其中一個結果就馬上返回。
下方是And整合的程式碼,傳入的參數有stage、fn、consumer、action,特別要注意stage是需要整合的另一個CompletableFuture,而fn、consumer則是需要傳入雙參數的BiFunction和BiConsumer,因為是And需要整合兩個CompletableFuture的參數往下執行。
CompletionStage<R> thenCombine(stage, fn);
CompletionStage<R> thenCombineAsync(stage, fn);
CompletionStage<Void> thenAcceptBoth(stage, consumer);
CompletionStage<Void> thenAcceptBothAsync(stage, consumer);
CompletionStage<Void> runAfterBoth(stage, action);
CompletionStage<Void> runAfterBothAsync(stage, action);
而Or也是大同小異,只是fn、consumer則是單一參數,只要整合的兩個CompletableFuture其中一個有回傳,則就會馬上回傳結果,不會有匯聚的動作。
CompletionStage<R> applyToEither(stage, fn);
CompletionStage<R> applyToEitherAsync(stage, fn);
CompletionStage<Void> acceptEither(stage, consumer);
CompletionStage<Void> acceptEitherAsync(stage, consumer);
CompletionStage<Void> runAfterEither(stage, action);
CompletionStage<Void> runAfterEitherAsync(stage, action);
範例程式碼
使用起來非常簡單明瞭,其中orTest為了要突顯會先採取第一個回傳的結果,使用了Thread.sleep
例外處理與Finally
下方是CompletableFuture提供的例外處理與Finally方法,非常直觀與簡單使用,exceptionally在多線程執行有例外時,就會直接呼叫傳入的fn,而finally方法分為兩種主要傳入consumer或是fn的方法,其中在劃分成在同一個線程執行或是再開另一個線程執行(Async),注意此處的consumer與fn一樣需要傳入Bi種類,其中第二個參數為Throwable的子類別。
//例外處理
CompletionStage<Void> exceptionally(fn);
//Finally
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
範例程式碼
總結
本篇文章在開頭複習了Runnable/Callable,中間介紹了文章主軸Future與CompletableFuture,兩者都是Java整合與同步Thread的利器,Future的功能比較簡單,讓開發者有更多的開發空間,而CompletableFuture則是包山包海,可以直接使用,Java的Thread管理常常是其他語言攻擊的弱點,但在深入瞭解後會發現,Java其實也有跟著計算機架構一起進步,只是要看對Java探討的多深入了。