前言:在最新的阿裡規範中強制使用ThreadPoolExecutor方式建立線程池,不允許使用Executors,是以有必要對ThreadPoolExecutor進行進一步了解。
1.ThreadPoolExecutor介紹
線程池類,直接看其入參最多的構造函數:
參數意義:
corePoolSize
核心線程數的大小。預設情況下,在建立了線程池之後,線程池中的線程數為0,當有任務到來後,如果線程池中存活的線程數小于corePoolSize,則建立一個線程。
maximumPoolSize
線程池中允許的最大線程數,這個參數表示了線程池中最多能建立的線程數量。當任務數量比corePoolSize大時,任務添加到workQueue,當workQueue滿了,将繼續建立線程以處理任務。maximumPoolSize表示當wordQueue滿了,線程池中最多可以建立的線程數量。
keepAliveTime、unit
當線程池處于空閑狀态時,超過keepAliveTime時間之後,空閑的線程會被終止。隻有當線程池中的線程數大于corePoolSize時,這個參數才會起作用,但是如果調用了allowCoreThreadTimeOut(boolean)方法,線上程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;當線程數大于corePoolSize時,如果一個線程的空閑時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。
workQueue
阻塞隊列,存儲送出的等待任務。
threadFactory
線程工廠,指定建立線程的工廠
handler
當任務超出線程池範圍和隊列容量時,采取何種拒絕政策。
對于上述參數,源碼注釋中有很詳細的解釋。這裡筆者挑出認為重要的幾段:
這裡表明了corePoolSize、maximumPoolSize和workQueue的關系(上述注釋說的非常的清楚,這裡稍微翻譯下):
#1.預設情況下,線程池初始化的時候,線程數為0。當接收到新任務時,如果線程池中存活的線程數小于corePoolSize,則建立一個線程。
#2.當運作的線程數超出核心線程數時,執行器更多的選擇是将任務放入隊列中,而不是建立一個線程。
#3.當隊列滿後,任務不能送出到隊列,在不超過maximumPoolSize(最大線程數)的情況下,會建立一個新線程去執行任務,當超過maximumPoolSize時,任務将被拒絕(這裡就關聯到接下來說要介紹的内容,在任務操作maximumPoolSize時,線程池所使用拒絕政策)。
當執行器關閉、線程池滿了、隊列滿了,則新任務會被拒絕。使用的拒絕政策有以下幾種:
注釋解釋的非常清楚,線程池采用的拒絕政策共有4種:
#1.AbortPolicy : 預設政策,當任務被拒絕時直接抛出異常RejectedExecutionException。
#2.CallerRunsPolicy : 讓調用者所在的線程來執行任務,這種政策并不會丢棄任務,但是會降低執行器處理任務的速率。
#3.DiscardPolicy : 直接丢棄新任務。
#4.DiscardOldestPolicy : 如果執行器未關閉,删除隊列中第一個任務,再次執行任務。如果失敗會重試(repeated)。
接下來看線程池的排隊政策。
線程池提供了3種排隊的政策:
#1.直接送出(SynchronousQueue):直接送出任務,不儲存任務。直接送出政策無容量限制,但是當任務數量過速增長有可能撐爆“JVM”。在生産中一般不采用此政策。
#2.無界隊列(LinkedBlockingQueue):當所有核心線程都在忙時,用一個無界隊列存放送出的任務。最大線程數設定了也無效。使用無界隊列會儲存核心線程處理不了的任務,隊列無上限,是以最大線程數設定了也無效,無界隊列需謹慎使用。
#3.有界隊列(ArrayBlockingQueue):用一個有界隊列幫助防止資源被耗盡,不過調整和控制比較難。因為隊列容量小了,任務不能立即執行,當然需要配合拒絕政策;隊列容量太大,又比較耗費資源。當然在生産環境中一般使用有界隊列的排隊政策,因為使用有界隊列可以儲存超過核心線程的任務,并且隊列有上限,超過上限,建立線程抛錯,可以更好的保護資源,防止崩潰。
通過以上分析,可以發現corePoolSize、maximumPoolSize和排隊政策是互相影響的,maximumPoolSize的值并不一定有效。
接下來看看線程池的存活機制
當建立的線程超過核心線程數時,線程池會讓該線程保持存活keepAliveTime時間,超過該時間後會銷毀該線程。預設情況下該值對非核心線程有效,如果想讓核心線程也适用于該機制,可以調用allowCoreThreadTimeOut()方法,但是這樣的話就不存在核心線程的概念了。
綜合以上,線程池在多次執行任務後,會一直維持部分線程存活,即使它是閑置的。目的是為了減少線程銷毀建立的開銷,下次有任務需要執行,直接從池子裡拿線程就能用了。但核心線程不能維護太多,因為也需要一定開銷。最大的線程數保護了整個系統的穩定性,避免并發量大的時候,把線程擠滿。工作隊列則是保證了任務順序和暫存,系統的可靠性。線程存活規則的目的和維護核心線程的目的類似,但降低了它的存活的時間。
2.線程狀态控制
ctl變量是整個線程池的核心控制狀态,它是一個AtomicInteger類型的原子對象,它記錄了線程池中生效線程數和線程池的運作狀态。
- workerCount,生效的線程數,基本上可以了解為存活的線程數。
- runState,線程池運作狀态。
ctl總共32位,其中低29位代表workerCount,是以最大線程數為(2^29)-1。高3位代表runState。
runState有5個值:
各值對應的值如下:
RUNNING -- 對應的高3位值是111。
SHUTDOWN -- 對應的高3位值是000。
STOP -- 對應的高3位值是001。
TIDYING -- 對應的高3位值是010。
TERMINATED -- 對應的高3位值是011。
- RUNNING,接收新任務處理隊列任務。
- SHUTDOWN,不接收新任務,但處理隊列任務。
- STOP,不接收新任務,也不處理隊列任務,并且中斷所有進行中的任務。
- TIDYING,所有任務都被終結,有效線程為0,并觸發terminated()方法。
- TERMINATED,當terminated()方法執行結束。
線程池各個狀态之間的切換:
當調用了shutdown(),狀态會從RUNNING變成SHUTDOWN,不再接收新任務,此時會處理完隊列裡面的任務。
如果調用的是shutdownNow(),狀态會直接變成STOP。
當線程或者隊列都是空的時候,狀态就會變成TIDYING。
當terminated()執行完的時候,就會變成TERMINATED。
3.關鍵函數解析
execute(Runnable)
1 public void execute(Runnable command) {
2 if (command == null)
3 throw new NullPointerException();
4 /*
5 * Proceed in 3 steps:
6 *
7 * 1. If fewer than corePoolSize threads are running, try to
8 * start a new thread with the given command as its first
9 * task. The call to addWorker atomically checks runState and
10 * workerCount, and so prevents false alarms that would add
11 * threads when it shouldn't, by returning false.
12 *
13 * 2. If a task can be successfully queued, then we still need
14 * to double-check whether we should have added a thread
15 * (because existing ones died since last checking) or that
16 * the pool shut down since entry into this method. So we
17 * recheck state and if necessary roll back the enqueuing if
18 * stopped, or start a new thread if there are none.
19 *
20 * 3. If we cannot queue task, then we try to add a new
21 * thread. If it fails, we know we are shut down or saturated
22 * and so reject the task.
23 */
24 int c = ctl.get();
25 if (workerCountOf(c) < corePoolSize) {
26 if (addWorker(command, true))
27 return;
28 c = ctl.get();
29 }
30 if (isRunning(c) && workQueue.offer(command)) {
31 int recheck = ctl.get();
32 if (! isRunning(recheck) && remove(command))
33 reject(command);
34 else if (workerCountOf(recheck) == 0)
35 addWorker(null, false);
36 }
37 else if (!addWorker(command, false))
38 reject(command);
39 }
execute函數的主要流程源碼中的注釋已經講得非常清楚了。
- 如果少于核心線程在運作,則嘗試建立一個新的線程。
- 如果任務成功入隊,需再次檢查線程池狀态看是否需要入隊,因為在入隊過程中,有可能狀态發生變化;如果确認入隊但沒有存活線程,則建立一個空線程。
- 如果不能入隊,則嘗試新建立一個線程,如果失敗,則拒絕任務。
- 注意在第二步最後會建立一個線程,這裡會有一個輪詢機制讓下個task出隊,然後直接利用這個空閑線程。
在execute中我們主要關注addWorker()函數。
首先看下該函數的整體注釋了解其大緻流程。
- 該函數會檢查目前線程池是否可以建立worker(線程)。
- 當線程池stop或者shut down,又或者線程工廠建立線程失敗時都會傳回false。
- 線上程建立失敗時,會進行復原。
- 注意core參數:true表示以corePoolSize作為參照,false表示以maximumPoolSize為參照。
接下來分析addWorker源碼:
1 private boolean addWorker(Runnable firstTask, boolean core) {
2 retry: // 标記,表示跳出循環時,從哪裡開始執行,類似于goto
3 for (;;) {
4 int c = ctl.get(); // 擷取ctl對應的值,“生效線程數”和“線程池狀态”
5 int rs = runStateOf(c); // 擷取線程池狀态
6
7 // Check if queue empty only if necessary.
8 // 如果該if判斷想要傳回false,隊列為空為必要條件,因為addWorker()不隻是在接收新任務會調用到,處理隊列的任務也會調用到。線上程池狀态為SHUTDOWN時還會處理隊列中的任務,是以隊列不為空會繼續向下執行
9 if (rs >= SHUTDOWN &&
10 ! (rs == SHUTDOWN &&
11 firstTask == null &&
12 ! workQueue.isEmpty()))
13 return false;
14 /* 内循環意義:判斷worker是否符合corePoolSize和maximumPoolSize定義,不滿足則傳回false;
然後利用CAS自增workerCount,如果CAS成功則退出循環;
如果CAS失敗會繼續自旋,在自旋過程中會檢查線程池狀态,如果發生變化,則回退到外層循環,重新執行。
15 是以内循環的主要作用就是讓workerCount在符合條件下自增。
16 */
17 for (;;) {
18 int wc = workerCountOf(c);
19 if (wc >= CAPACITY ||
20 wc >= (core ? corePoolSize : maximumPoolSize))
21 return false;
22 if (compareAndIncrementWorkerCount(c))
23 break retry;
24 c = ctl.get(); // Re-read ctl
25 if (runStateOf(c) != rs)
26 continue retry;
27 // else CAS failed due to workerCount change; retry inner loop
28 }
29 }
30
31 boolean workerStarted = false;
32 boolean workerAdded = false;
33 Worker w = null;
34 // 這段代碼的主要功能:添加任務到線程池,并啟動任務所在的線程
35 try {
36 // 建立一個Worker對象,包含一個由線程工廠建立的線程和一個需執行的任務
37 w = new Worker(firstTask);
38 final Thread t = w.thread;
39 if (t != null) {
40 // 線程建立成功 擷取一個可重入鎖,把Worker對象放入worker成員變量中
41 final ReentrantLock mainLock = this.mainLock;
42 mainLock.lock();
43 try {
44 // Recheck while holding lock.
45 // Back out on ThreadFactory failure or if
46 // shut down before lock acquired.
47 int rs = runStateOf(ctl.get());
48 // 檢查線程池狀态和線程狀态
49 if (rs < SHUTDOWN ||
50 (rs == SHUTDOWN && firstTask == null)) {
51 if (t.isAlive()) // precheck that t is startable
52 throw new IllegalThreadStateException();
53 workers.add(w); // 将Worker變量加入workers中(集合)
54 // 更新largestPoolSize
55 int s = workers.size();
56 if (s > largestPoolSize)
57 largestPoolSize = s;
58 workerAdded = true;
59 }
60 } finally {
61 mainLock.unlock();
62 }
63 // 如果任務添加成功,則啟動任務所在的線程
64 if (workerAdded) {
65 t.start();
66 workerStarted = true;
67 }
68 }
69 } finally {
70 // 如果任務添加失敗則執行addWorkerFailed進行復原
71 if (! workerStarted)
72 addWorkerFailed(w);
73 }
74 return workerStarted;
75 }
addWorkerFailed(Worker),任務添加失敗復原函數:
1 private void addWorkerFailed(Worker w) {
2 final ReentrantLock mainLock = this.mainLock;
3 // 加鎖復原
4 mainLock.lock();
5 try {
6 if (w != null)
7 workers.remove(w); // 復原workers
8 decrementWorkerCount();// 復原workerCount
9 tryTerminate();// 判斷線程池狀态,是否需要終結線程池
10 } finally {
11 mainLock.unlock();
12 }
13 }
4.總結
ThreadPoolExecutor我們主要關注其addWorker方法,對于其他方法,可翻看源碼,比較好了解。
核心要點:
- 當核心線程忙碌時,線程池更傾向于把任務放進隊列,而不是建立線程。
- 三種不同的排隊政策,根據選擇隊列的不同,maximumPoolSize不一定有用的。
- ctl是線程池的核心控制狀态,包含的runState線程池運作狀态和workCount有效線程數。
- retry:是一種标記循環的文法,retry可以是任何變量命名合法字元。
by Shawn Chen,2019.02.16,下午。