Java Concurrency #12: JUC — 別讓Thread們失控 Executor & 線程池

Charlie Lee
Bucketing
Published in
8 min readAug 23, 2020

使用Thread Pool管理執行緒,不讓Thread到處亂竄問題讓自己無法好好睡覺

Photo by Chris A. Tweten on Unsplash

前言

根據前篇文章#11介紹的Semaphore可以幫助開發者實踐限流器功能,管理執行緒與控管資源,但實踐起來需要注意的細節會非常的多,但是Java有提供原生的執行緒池ThreadPoolExecutor,開發者可以直接使用,就可以管理執行緒的數量與作統一管理。

為何需要 Thread Pool?

  • 創建執行許與銷毀需要消耗系統資源(需要再Process空間增加Stack Counter 等資訊),執行緒池可以讓執行緒重複利用
  • 控制Thread的數量,過多的執行緒不但不會讓CPU效率增長,反而會應為太多的Context Switch(CPU切換不同執行緒時的行為),而讓CPU效率下降
  • 對Thread做管理,想一想之前的文章,每次創建完Thread他就像是野馬,不回同地往前衝,根本不知道他會到哪

粗估思考自己如何設計Thread Pool

在定義介面框架時,可以借助Semaphore的概念,釋出與索取權限,所以介面可能會如下

class ThreadPool {
Queue q;
int size;
ThreadPool(int permits) {
size = permits;
q = new LinkedList(permits);
}
Thread acquire() {
if (q.size() > size)
Thread.wait();
// 創建Thread
if (q.size == 0)
return new Thread();
return q.poll();
}
void release(Thread t) {
// 收回Thread
q.add(t);
Thread.notifyAll();
}
}

實際使用會如下

ThreadPool pool = new ThreadPool();
Thread t1 = pool.accquire();
t1.execute(() -> {
// do something
})

上方只是概念上的虛擬碼,先定義初始框架,接下來開始塞真實邏輯進去,在開始塞真實邏輯前要先介紹一個模式,

生產者與消費者模式,生產者消費者模式用真實世界情境去理解,就是到便當店買便當,老闆會不斷的做便當然後將便當放到籃子裡(假設只有一種口味)等著客人來買,客人到了如果籃子有便當則付錢拿走,如果老闆還在做那就等一下,等老闆放到籃子在拿走,而老闆和客人都是獨立的個體,老闆做便當和客人拿便當,如果在籃子還有便當的情況下雙方都可以專注做自己的事情(做便當/拿便當付錢),但是籃子沒便當時,則客人需要等待(Blocking)。

這就是生產者與消費者模式,轉換成程式碼,有兩個Thread,其中一個負責產資料另一個負責處理產的資料,而中間的籃子則可以使用Collection或是Array當作資料結構儲存。

回到Thread Pool的設計,Thread Pool的核心邏輯就是消費者與生產者模式,執行緒池在創建的時候需要標明有多少執行緒(消費者),而創建後這些執行緒會一直等著Collection(籃子)被裝任務並且執行,而外部需要使用的程式就像是老闆一樣,不斷提交任務讓Thread Pool執行。

程式碼如下

這邊使用的Collection是JUC中,執行緒安全的LinkedBlockingQueue,若take的時候沒有element則會等待直到有element為止。

接下來進入正題,Java的ThreadPoolExecutor

ThreadPoolExecutor

首先先看看ThreadPoolExecutor的建構者,來理解ThreadPoolExecutor可以幫我們做到甚麼

上方的建構者我們只討論最完整的建構方式,七個參數的ThreadPoolExecutor,因為其他的建構方式都只是最完整的簡化版

  • corePoolSize: 此Thread Pool會有多少個Thread保持長期存活 (就算沒任務也會存在)
  • maximumPoolSize: 最多有多少個Thread可以存活 (core + 額外創建的Thread)
  • keepAliveTime: 非核心Thread沒工作後多久會被淘汰
  • TimeUnit: keepAliveTime的時間單位 (具體可看Java TimeUnit enum)
  • BlockingQueue<Runnable>: 此Thread Pool用來放任務的Blocking Collection

BlockingQueue主要使用有四種

  • LinkedBlockingQueue: 內部結構是LinkedList
  • ArrayBlockingQueue: 內部結構為Array
  • SynchronousQueue: 同步對列,一個需求必須被拿走後才能再放需求(裡面沒有元素排隊柱列,1:1運作)
  • DelayQueue: 當時間到了裡面的元素才可以被拿走(如果沒有元素則不可拿)

回到ThreadPool的其餘參數介紹

  • ThreadFactory: ThreadPool創建的工廠(如果不設定則就如同new Thread直接創建),如果有設定大多都是用來前處理(設定Thread是否為Daemon或是ThreadGroup與權限等級和Thead name)
  • RejectedExecutionHandler: 當Q不能再放入元素或任務時的後續處理,預設提供的策略有四種

CallerRunsPolicy: 丟任務進來的Thread自己執行此任務
AbortPolicy: 不接受任務並且丟出Exeception
DiscardPolicy: 直接丟掉這個被拒絕的任務
DiscardOldestPolicy: 丟掉最老(最先進到Thread Pool的任務)讓新的任務近來

ThreadPoolExecutor實踐範例

以下是範例

在此範例使用了2個核心Thread,當核心執行緒處理不過來時會增加輔助執行緒最多到10個,而每個輔助執行緒沒工作時只會存活10秒,Task Queue使用LinkedBlockingQueue,ThreadFactory主要是將Thread Pool裡的Thread都聚集在同一個Group方便管理,而RejetedHanler使用最隨便的DiscardPolicy(切記實際使用時要隊部可進入Task Q的任務作處理,不然就要打包回家了)

方便的Executors

Java為了讓開發者可以方便的建造ThreadPool還提供了Executors一系列的靜態方法創建Thread Pool,他幫助開發者省略許多ThreadPool要創建的細節,隨call隨用。

以下是Executors提供的常用創建方法

Executors提供的方法都是讓開發者簡單呼叫,但底層還是實作回傳ThreadPoolExecutor,只是根據代入的參數不同再次命名。接下來透過代入參數分別討論上面常用的創見方法

  • newCachedThreadPool
  1. 核心Thread Core為0個,代表此創建方法並不是完全的Thread Pool,更像是幫助開發者控管Thread的工具
  2. 使用SynchronousQueue,不管是放入Task或是拿取都是Blocking操作(有相對應的動作才能執行)
  3. 沒有限制Maxminmun數量,代表可以無限使用,有可能產生過多的Thread丟出OOM例外

所以此ThreadPool可能比較適合短期內會有快速操作的任務,可以使用此Pool來管理這些任務。

  • newFixedThreadPool
  1. core數量與max一樣,代表創建後能執行的任務就是這樣
  2. 使用 LinkedBlockingQueue<Runnable>當作task存放的資料結構,沒有限制數量一樣會有OOM風險

和CahcedThreadPool相比,這算是最基本的ThreadPool,不過要切記塞入過多Task的話會有OOM風險

  • newSingleThreadExecutor
  1. 基本組成與newFixedThreadPool一致,最大的差別在於core與max都為1,這代表需要強制使用FIFO(先來先到)策略執行task。
  • newScheduledThreadPool
  1. 主要支持周期性任務(批次處裡)
  2. 使用了DelayedWorkQueue來實踐週期處理

以上就是四種Executors常用的創建ThreadPool方法,切記Executors創建的ThreadPool都沒有嚴格限制Task Q的數量,都有可能造成OOM,最嚴謹的方法還是使用ThradPoolExecutor自己乖乖填參數。(在之後的Java版本,都有對這些方法做參數的更新,有興趣讀者在自己看原始碼瞜)

範例程式碼 GitHub

總結

本篇文章主要介紹如何使用Java提供的ThreadPoolExecutor創建執行緒池,也提到了簡單的Executors靜態方法創建,但是要小心,執行緒池不能在沒有思考就直接不設定任務或執行緒上限,硬體資源是有限的可能會OOM讓系統crash。

--

--