作者:雪山上的蒲公英
1. 通過Executors建立線程池的弊端
在建立線程池的時候,大部分人還是會選擇使用Executors去建立。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISPrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdsATOfd3bkFGazxCMx8VesATMfhHLlN3XnxCMwEzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5yN4EmYiVGOwEzYwEDOzUTMyAzY1IWZ2MGMkVzM1cTYl9CX0IzLcZDMxIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjL2M3Lc9CX6MHc0RHaiojIsJye.png)
下面是建立定長線程池(FixedThreadPool)的一個例子,嚴格來說,當使用如下代碼建立線程池時,是不符合程式設計規範的。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
原因在于:(摘自阿裡編碼規約)
線程池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明确線程池的運作規則,規避資源耗盡的風險。
說明:Executors各個方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要問題是堆積的請求處理隊列可能會耗費非常大的記憶體,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要問題是線程數最大數是Integer.MAX_VALUE,可能會建立數量非常多的線程,甚至OOM。
2. 通過ThreadPoolExecutor建立線程池
是以,針對上面的不規範代碼,重構為通過ThreadPoolExecutor建立線程池的方式。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
ThreadPoolExecutor 是線程池的核心實作。線程的建立和終止需要很大的開銷,線程池中預先提供了指定數量的可重用線程,是以使用線程池會節省系統資源,并且每個線程池都維護了一些基礎的資料統計,友善線程的管理和監控。
3. ThreadPoolExecutor參數解釋
下面是對其參數的解釋,在建立線程池時需根據自己的情況來合理設定線程池。
corePoolSize & maximumPoolSize
核心線程數(corePoolSize)和最大線程數(maximumPoolSize)是線程池中非常重要的兩個概念,希望同學們能夠掌握。
當一個新任務被送出到池中,如果目前運作線程小于核心線程數(corePoolSize),即使目前有空閑線程,也會建立一個線程來處理新送出的任務;如果目前運作線程數大于核心線程數(corePoolSize)并小于最大線程數(maximumPoolSize),隻有當等待隊列已滿的情況下才會建立線程。
keepAliveTime & unit
keepAliveTime 為超過 corePoolSize 線程數量的線程最大空閑時間,unit 為時間機關。
等待隊列
任何阻塞隊列(BlockingQueue)都可以用來轉移或儲存送出的任務,線程池大小和阻塞隊列互相限制線程池:
- 如果運作線程數小于
,送出新任務時就會建立一個線程來運作;corePoolSize
- 如果運作線程數大于或等于
,新送出的任務就會入列等待;如果隊列已滿,并且運作線程數小于corePoolSize
,也将會建立一個線程來運作;maximumPoolSize
- 如果線程數大于
,新送出的任務将會根據拒絕政策來處理。maximumPoolSize
下面來看一下三種通用的入隊政策:
- 直接傳遞:通過 SynchronousQueue 直接把任務傳遞給線程。如果目前沒可用線程,嘗試入隊操作會失敗,然後再建立一個新的線程。當處理可能具有内部依賴性的請求時,該政策會避免請求被鎖定。直接傳遞通常需要無界的最大線程數(maximumPoolSize),避免拒絕新送出的任務。當任務持續到達的平均速度超過可處理的速度時,可能導緻線程的無限增長。
- 無界隊列:使用無界隊列(如 LinkedBlockingQueue)作為等待隊列,當所有的核心線程都在處理任務時, 新送出的任務都會進入隊列等待。是以,不會有大于 corePoolSize 的線程會被建立(maximumPoolSize 也将失去作用)。這種政策适合每個任務都完全獨立于其他任務的情況;例如網站伺服器。這種類型的等待隊列可以使瞬間爆發的高頻請求變得平滑。當任務持續到達的平均速度超過可處理速度時,可能導緻等待隊列無限增長。
- 有界隊列:當使用有限的最大線程數時,有界隊列(如 ArrayBlockingQueue)可以防止資源耗盡,但是難以調整和控制。隊列大小和線程池大小可以互相作用:使用大的隊列和小的線程數可以減少CPU使用率、系統資源和上下文切換的開銷,但是會導緻吞吐量變低,如果任務頻繁地阻塞(例如被I/O限制),系統就能為更多的線程排程執行時間。使用小的隊列通常需要更多的線程數,這樣可以最大化CPU使用率,但可能會需要更大的排程開銷,進而降低吞吐量。
拒絕政策
當線程池已經關閉或達到飽和(最大線程和隊列都已滿)狀态時,新送出的任務将會被拒絕。ThreadPoolExecutor 定義了四種拒絕政策:
- AbortPolicy:預設政策,在需要拒絕任務時抛出RejectedExecutionException;
- CallerRunsPolicy:直接在 execute 方法的調用線程中運作被拒絕的任務,如果線程池已經關閉,任務将被丢棄;
- DiscardPolicy:直接丢棄任務;
- DiscardOldestPolicy:丢棄隊列中等待時間最長的任務,并執行目前送出的任務,如果線程池已經關閉,任務将被丢棄。
我們也可以自定義拒絕政策,隻需要實作 RejectedExecutionHandler;需要注意的是,拒絕政策的運作需要指定線程池和隊列的容量。
4. ThreadPoolExecutor建立線程方式
通過下面的demo來了解ThreadPoolExecutor建立線程的過程。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 測試ThreadPoolExecutor對線程的執行順序
**/
public class ThreadPoolSerialTest {
public static void main(String[] args) {
//核心線程數
int corePoolSize = 3;
//最大線程數
int maximumPoolSize = 6;
//超過 corePoolSize 線程數量的線程最大空閑時間
long keepAliveTime = 2;
//以秒為時間機關
TimeUnit unit = TimeUnit.SECONDS;
//建立工作隊列,用于存放送出的等待執行任務
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
ThreadPoolExecutor threadPoolExecutor = null;
try {
//建立線程池
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy());
//循環送出任務
for (int i = 0; i < 8; i++) {
//送出任務的索引
final int index = (i + 1);
threadPoolExecutor.submit(() -> {
//線程列印輸出
System.out.println("大家好,我是線程:" + index);
try {
//模拟線程執行時間,10s
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//每個任務送出後休眠500ms再送出下一個任務,用于保證送出順序
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
執行結果:
這裡描述一下執行的流程:
- 首先通過 ThreadPoolExecutor 構造函數建立線程池;
- 執行 for 循環,送出 8 個任務(恰好等于maximumPoolSize[最大線程數] + capacity[隊列大小]);
- 通過 threadPoolExecutor.submit 送出 Runnable 接口實作的執行任務;
- 送出第1個任務時,由于目前線程池中正在執行的任務為 0 ,小于 3(corePoolSize 指定),是以會建立一個線程用來執行送出的任務1;
- 送出第 2, 3 個任務的時候,由于目前線程池中正在執行的任務數量小于等于 3 (corePoolSize 指定),是以會為每一個送出的任務建立一個線程來執行任務;
- 當送出第4個任務的時候,由于目前正在執行的任務數量為 3 (因為每個線程任務執行時間為10s,是以送出第4個任務的時候,前面3個線程都還在執行中),此時會将第4個任務存放到 workQueue 隊列中等待執行;
- 由于 workQueue 隊列的大小為 2 ,是以該隊列中也就隻能儲存 2 個等待執行的任務,是以第5個任務也會儲存到任務隊列中;
- 當送出第6個任務的時候,因為目前線程池正在執行的任務數量為3,workQueue 隊列中存儲的任務數量也滿了,這時會判斷目前線程池中正在執行的任務的數量是否小于6(maximumPoolSize指定);
- 如果小于 6 ,那麼就會新建立一個線程來執行送出的任務 6;
- 執行第7,8個任務的時候,也要判斷目前線程池中正在執行的任務數是否小于6(maximumPoolSize指定),如果小于6,那麼也會立即建立線程來執行這些送出的任務;
- 此時,6個任務都已經送出完畢,那 workQueue 隊列中的等待 任務4 和 任務5 什麼時候執行呢?
- 當任務1執行完畢後(10s後),執行任務1的線程并沒有被銷毀掉,而是擷取 workQueue 中的任務4來執行;
- 當任務2執行完畢後,執行任務2的線程也沒有被銷毀,而是擷取 workQueue 中的任務5來執行;
通過上面流程的分析,也就知道了之前案例的輸出結果的原因。其實,線程池中會線程執行完畢後,并不會被立刻銷毀,線程池中會保留 corePoolSize 數量的線程,當 workQueue 隊列中存在任務或者有新送出任務時,那麼會通過線程池中已有的線程來執行任務,避免了頻繁的線程建立與銷毀,而大于 corePoolSize 小于等于 maximumPoolSize 建立的線程,則會在空閑指定時間(keepAliveTime)後進行回收。
5. ThreadPoolExecutor拒絕政策
在上面的測試中,我設定的執行線程總數恰好等于maximumPoolSize[最大線程數] + capacity[隊列大小],是以沒有出現需要執行拒絕政策的情況,是以在這裡,我再增加一個線程,送出9個任務,來示範不同的拒絕政策。
AbortPolicy
為什麼阿裡不允許用Executors建立線程池,而是通過ThreadPoolExecutor的方式? CallerRunsPolicy
為什麼阿裡不允許用Executors建立線程池,而是通過ThreadPoolExecutor的方式? DiscardPolicy
為什麼阿裡不允許用Executors建立線程池,而是通過ThreadPoolExecutor的方式? DiscardOldestPolicy
參考
有熱門推薦????
為什麼要在2021年放棄Jenkins?我已經對他失去耐心了...
Docker + FastDFS + Spring Boot 一鍵式搭建分布式檔案伺服器面試官:這貨一聽就是一個水貨...2020年度開發者工具Top 100名單!我怎麼99%的都沒用過,是我太low?
面試必問:Spring 循環依賴的三種方式 !再見!公司的爛系統~ 網友:好想給大神當小弟...簡單解析一下掃碼登陸原理,簡單到你想不到!
面試官問:為什麼SpringBoot的 jar 可以直接運作?