天天看點

ThreadPoolExecutor解析

前言:在最新的阿裡規範中強制使用ThreadPoolExecutor方式建立線程池,不允許使用Executors,是以有必要對ThreadPoolExecutor進行進一步了解。

1.ThreadPoolExecutor介紹

線程池類,直接看其入參最多的構造函數:

ThreadPoolExecutor解析

參數意義:

corePoolSize

核心線程數的大小。預設情況下,在建立了線程池之後,線程池中的線程數為0,當有任務到來後,如果線程池中存活的線程數小于corePoolSize,則建立一個線程。

maximumPoolSize

線程池中允許的最大線程數,這個參數表示了線程池中最多能建立的線程數量。當任務數量比corePoolSize大時,任務添加到workQueue,當workQueue滿了,将繼續建立線程以處理任務。maximumPoolSize表示當wordQueue滿了,線程池中最多可以建立的線程數量。

keepAliveTime、unit

當線程池處于空閑狀态時,超過keepAliveTime時間之後,空閑的線程會被終止。隻有當線程池中的線程數大于corePoolSize時,這個參數才會起作用,但是如果調用了allowCoreThreadTimeOut(boolean)方法,線上程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;當線程數大于corePoolSize時,如果一個線程的空閑時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。

workQueue

阻塞隊列,存儲送出的等待任務。

threadFactory

線程工廠,指定建立線程的工廠

handler

當任務超出線程池範圍和隊列容量時,采取何種拒絕政策。

對于上述參數,源碼注釋中有很詳細的解釋。這裡筆者挑出認為重要的幾段:

ThreadPoolExecutor解析

這裡表明了corePoolSize、maximumPoolSize和workQueue的關系(上述注釋說的非常的清楚,這裡稍微翻譯下):

#1.預設情況下,線程池初始化的時候,線程數為0。當接收到新任務時,如果線程池中存活的線程數小于corePoolSize,則建立一個線程。

#2.當運作的線程數超出核心線程數時,執行器更多的選擇是将任務放入隊列中,而不是建立一個線程。

#3.當隊列滿後,任務不能送出到隊列,在不超過maximumPoolSize(最大線程數)的情況下,會建立一個新線程去執行任務,當超過maximumPoolSize時,任務将被拒絕(這裡就關聯到接下來說要介紹的内容,在任務操作maximumPoolSize時,線程池所使用拒絕政策)。

ThreadPoolExecutor解析

當執行器關閉、線程池滿了、隊列滿了,則新任務會被拒絕。使用的拒絕政策有以下幾種:

ThreadPoolExecutor解析

注釋解釋的非常清楚,線程池采用的拒絕政策共有4種:

#1.AbortPolicy : 預設政策,當任務被拒絕時直接抛出異常RejectedExecutionException。

#2.CallerRunsPolicy : 讓調用者所在的線程來執行任務,這種政策并不會丢棄任務,但是會降低執行器處理任務的速率。

#3.DiscardPolicy : 直接丢棄新任務。

#4.DiscardOldestPolicy : 如果執行器未關閉,删除隊列中第一個任務,再次執行任務。如果失敗會重試(repeated)。

接下來看線程池的排隊政策。

線程池提供了3種排隊的政策:

#1.直接送出(SynchronousQueue):直接送出任務,不儲存任務。直接送出政策無容量限制,但是當任務數量過速增長有可能撐爆“JVM”。在生産中一般不采用此政策。

ThreadPoolExecutor解析

#2.無界隊列(LinkedBlockingQueue):當所有核心線程都在忙時,用一個無界隊列存放送出的任務。最大線程數設定了也無效。使用無界隊列會儲存核心線程處理不了的任務,隊列無上限,是以最大線程數設定了也無效,無界隊列需謹慎使用。

ThreadPoolExecutor解析

#3.有界隊列(ArrayBlockingQueue):用一個有界隊列幫助防止資源被耗盡,不過調整和控制比較難。因為隊列容量小了,任務不能立即執行,當然需要配合拒絕政策;隊列容量太大,又比較耗費資源。當然在生産環境中一般使用有界隊列的排隊政策,因為使用有界隊列可以儲存超過核心線程的任務,并且隊列有上限,超過上限,建立線程抛錯,可以更好的保護資源,防止崩潰。

ThreadPoolExecutor解析

通過以上分析,可以發現corePoolSize、maximumPoolSize和排隊政策是互相影響的,maximumPoolSize的值并不一定有效。

接下來看看線程池的存活機制

ThreadPoolExecutor解析

當建立的線程超過核心線程數時,線程池會讓該線程保持存活keepAliveTime時間,超過該時間後會銷毀該線程。預設情況下該值對非核心線程有效,如果想讓核心線程也适用于該機制,可以調用allowCoreThreadTimeOut()方法,但是這樣的話就不存在核心線程的概念了。

綜合以上,線程池在多次執行任務後,會一直維持部分線程存活,即使它是閑置的。目的是為了減少線程銷毀建立的開銷,下次有任務需要執行,直接從池子裡拿線程就能用了。但核心線程不能維護太多,因為也需要一定開銷。最大的線程數保護了整個系統的穩定性,避免并發量大的時候,把線程擠滿。工作隊列則是保證了任務順序和暫存,系統的可靠性。線程存活規則的目的和維護核心線程的目的類似,但降低了它的存活的時間。

2.線程狀态控制

ThreadPoolExecutor解析

ctl變量是整個線程池的核心控制狀态,它是一個AtomicInteger類型的原子對象,它記錄了線程池中生效線程數和線程池的運作狀态。

  • workerCount,生效的線程數,基本上可以了解為存活的線程數。
  • runState,線程池運作狀态。

ctl總共32位,其中低29位代表workerCount,是以最大線程數為(2^29)-1。高3位代表runState。

runState有5個值:

ThreadPoolExecutor解析

各值對應的值如下:

RUNNING    -- 對應的高3位值是111。
SHUTDOWN   -- 對應的高3位值是000。
STOP       -- 對應的高3位值是001。
TIDYING    -- 對應的高3位值是010。
TERMINATED -- 對應的高3位值是011。      
  • RUNNING,接收新任務處理隊列任務。
  • SHUTDOWN,不接收新任務,但處理隊列任務。
  • STOP,不接收新任務,也不處理隊列任務,并且中斷所有進行中的任務。
  • TIDYING,所有任務都被終結,有效線程為0,并觸發terminated()方法。
  • TERMINATED,當terminated()方法執行結束。

線程池各個狀态之間的切換:

ThreadPoolExecutor解析

當調用了shutdown(),狀态會從RUNNING變成SHUTDOWN,不再接收新任務,此時會處理完隊列裡面的任務。

如果調用的是shutdownNow(),狀态會直接變成STOP。

當線程或者隊列都是空的時候,狀态就會變成TIDYING。

當terminated()執行完的時候,就會變成TERMINATED。

ThreadPoolExecutor解析

3.關鍵函數解析

execute(Runnable) 

1 public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * Proceed in 3 steps:
 6          *
 7          * 1. If fewer than corePoolSize threads are running, try to
 8          * start a new thread with the given command as its first
 9          * task.  The call to addWorker atomically checks runState and
10          * workerCount, and so prevents false alarms that would add
11          * threads when it shouldn't, by returning false.
12          *
13          * 2. If a task can be successfully queued, then we still need
14          * to double-check whether we should have added a thread
15          * (because existing ones died since last checking) or that
16          * the pool shut down since entry into this method. So we
17          * recheck state and if necessary roll back the enqueuing if
18          * stopped, or start a new thread if there are none.
19          *
20          * 3. If we cannot queue task, then we try to add a new
21          * thread.  If it fails, we know we are shut down or saturated
22          * and so reject the task.
23          */
24         int c = ctl.get();
25         if (workerCountOf(c) < corePoolSize) {
26             if (addWorker(command, true))
27                 return;
28             c = ctl.get();
29         }
30         if (isRunning(c) && workQueue.offer(command)) {
31             int recheck = ctl.get();
32             if (! isRunning(recheck) && remove(command))
33                 reject(command);
34             else if (workerCountOf(recheck) == 0)
35                 addWorker(null, false);
36         }
37         else if (!addWorker(command, false))
38             reject(command);
39     }      

execute函數的主要流程源碼中的注釋已經講得非常清楚了。

  • 如果少于核心線程在運作,則嘗試建立一個新的線程。
  • 如果任務成功入隊,需再次檢查線程池狀态看是否需要入隊,因為在入隊過程中,有可能狀态發生變化;如果确認入隊但沒有存活線程,則建立一個空線程。
  • 如果不能入隊,則嘗試新建立一個線程,如果失敗,則拒絕任務。
  • 注意在第二步最後會建立一個線程,這裡會有一個輪詢機制讓下個task出隊,然後直接利用這個空閑線程。

在execute中我們主要關注addWorker()函數。

首先看下該函數的整體注釋了解其大緻流程。

ThreadPoolExecutor解析
  • 該函數會檢查目前線程池是否可以建立worker(線程)。
  • 當線程池stop或者shut down,又或者線程工廠建立線程失敗時都會傳回false。
  • 線上程建立失敗時,會進行復原。
  • 注意core參數:true表示以corePoolSize作為參照,false表示以maximumPoolSize為參照。

接下來分析addWorker源碼:

1 private boolean addWorker(Runnable firstTask, boolean core) {
 2         retry: // 标記,表示跳出循環時,從哪裡開始執行,類似于goto
 3         for (;;) {
 4             int c = ctl.get(); // 擷取ctl對應的值,“生效線程數”和“線程池狀态”
 5             int rs = runStateOf(c); // 擷取線程池狀态
 6 
 7             // Check if queue empty only if necessary.
 8             // 如果該if判斷想要傳回false,隊列為空為必要條件,因為addWorker()不隻是在接收新任務會調用到,處理隊列的任務也會調用到。線上程池狀态為SHUTDOWN時還會處理隊列中的任務,是以隊列不為空會繼續向下執行
 9             if (rs >= SHUTDOWN &&
10                 ! (rs == SHUTDOWN &&
11                    firstTask == null &&
12                    ! workQueue.isEmpty()))
13                 return false;
14             /* 内循環意義:判斷worker是否符合corePoolSize和maximumPoolSize定義,不滿足則傳回false;
                  然後利用CAS自增workerCount,如果CAS成功則退出循環;
                  如果CAS失敗會繼續自旋,在自旋過程中會檢查線程池狀态,如果發生變化,則回退到外層循環,重新執行。
15                是以内循環的主要作用就是讓workerCount在符合條件下自增。
16             */
17             for (;;) {
18                 int wc = workerCountOf(c);
19                 if (wc >= CAPACITY ||
20                     wc >= (core ? corePoolSize : maximumPoolSize))
21                     return false;
22                 if (compareAndIncrementWorkerCount(c))
23                     break retry;
24                 c = ctl.get();  // Re-read ctl
25                 if (runStateOf(c) != rs)
26                     continue retry;
27                 // else CAS failed due to workerCount change; retry inner loop
28             }
29         }
30 
31         boolean workerStarted = false;
32         boolean workerAdded = false;
33         Worker w = null;
34         // 這段代碼的主要功能:添加任務到線程池,并啟動任務所在的線程
35         try {
36             // 建立一個Worker對象,包含一個由線程工廠建立的線程和一個需執行的任務
37             w = new Worker(firstTask);
38             final Thread t = w.thread;
39             if (t != null) {
40                 // 線程建立成功 擷取一個可重入鎖,把Worker對象放入worker成員變量中
41                 final ReentrantLock mainLock = this.mainLock;
42                 mainLock.lock();
43                 try {
44                     // Recheck while holding lock.
45                     // Back out on ThreadFactory failure or if
46                     // shut down before lock acquired.
47                     int rs = runStateOf(ctl.get());
48                     // 檢查線程池狀态和線程狀态
49                     if (rs < SHUTDOWN ||
50                         (rs == SHUTDOWN && firstTask == null)) {
51                         if (t.isAlive()) // precheck that t is startable
52                             throw new IllegalThreadStateException();
53                         workers.add(w); // 将Worker變量加入workers中(集合)
54                         // 更新largestPoolSize
55                         int s = workers.size();
56                         if (s > largestPoolSize)
57                             largestPoolSize = s;
58                         workerAdded = true;
59                     }
60                 } finally {
61                     mainLock.unlock();
62                 }
63                 // 如果任務添加成功,則啟動任務所在的線程
64                 if (workerAdded) {
65                     t.start();
66                     workerStarted = true;
67                 }
68             }
69         } finally {
70             // 如果任務添加失敗則執行addWorkerFailed進行復原
71             if (! workerStarted)
72                 addWorkerFailed(w);
73         }
74         return workerStarted;
75     }      

addWorkerFailed(Worker),任務添加失敗復原函數:

1   private void addWorkerFailed(Worker w) {
 2         final ReentrantLock mainLock = this.mainLock;
 3         // 加鎖復原
 4         mainLock.lock();
 5         try {
 6             if (w != null)
 7                 workers.remove(w); // 復原workers
 8             decrementWorkerCount();// 復原workerCount
 9             tryTerminate();// 判斷線程池狀态,是否需要終結線程池
10         } finally {
11             mainLock.unlock();
12         }
13     }      

4.總結

ThreadPoolExecutor我們主要關注其addWorker方法,對于其他方法,可翻看源碼,比較好了解。

核心要點:

  • 當核心線程忙碌時,線程池更傾向于把任務放進隊列,而不是建立線程。
  • 三種不同的排隊政策,根據選擇隊列的不同,maximumPoolSize不一定有用的。
  • ctl是線程池的核心控制狀态,包含的runState線程池運作狀态和workCount有效線程數。
  • retry:是一種标記循環的文法,retry可以是任何變量命名合法字元。

by Shawn Chen,2019.02.16,下午。

=========================================================

比你優秀的人比你還努力,你有什麼資格不去奮鬥!

__一個有理想的程式員。

繼續閱讀