幹貨部落格
https://www.jianshu.com/p/389b58856894
思想
池化技術
池化技術應用:線程池、資料庫連接配接池、http連接配接池等等。
池化技術的思想主要是為了減少每次擷取資源的消耗,提高對資源的使用率。
通過将資源池化,統一管理,可以友善管控。例如提供資源監控,排程政策等能力
線程池的好處
降低資源消耗:通過重複利用已建立的線程降低線程建立和銷毀造成的消耗。
提高響應速度:當任務到達時,可以不需要等待線程建立就能立即執行。
提高線程的可管理性:線程是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定,監控和調優。
原理
下圖所示為線程池的實作原理:調用方不斷地向線程池中送出任務;線程池中有一組線程,不斷地從隊列中取任務,這是一個典型的生産者—消費者模型
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczX0xiRGZkRGZ0Xy9GbvNGL2EzXlpXazxyMw1WYoZkMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLzEzM0MTNyADMxEDNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
設計的關鍵點,即排程政策制定
- 隊列設定多長?如果是無界的,調用方不斷地往隊列中放任務,可能導緻記憶體耗盡。如果是有界的,當隊列滿了之後,調用方如何處理?
- 線程池中的線程個數是固定的,還是動态變化的?
- 每次送出新任務,是放入隊列?還是開新線程?
- 當沒有任務的時候,線程是睡眠一小段時間?還是進入阻塞?如果進入阻塞,如何喚醒?
JDK的實作
ThreadPoolExector/ScheduledThreadPoolExecutor
- 基于阻塞隊列實作(即設計關鍵點:4)
- 通過構造方法入參,讓我們定制具體的排程政策(即設計關鍵點:1,2,3)
JDK中的線程池
類圖
種類
- ThreadPoolExector:線程池
- ScheduledThreadPoolExecutor:定時排程的線程池
ThreadPoolExector
概念與邏輯模型
本節的描述不是很嚴謹,适宜快速了解
概念
- 核心線程是否允許關閉:預設為否
- 核心線程數:若核心線程不允許關閉,則核心線程為線程池的最小線程數量,會一直存活。若核心線程允許關閉,則線程池最小線程數可以為0,不受核心線程數控制。
- 最大線程數數:線程池中同時線程的最大上限。
- 線程逾時時間:當 本線程超過逾時時間,若 (核心線程不允許關閉 并 線程數量大于核心線程數)或 核心線程允許關閉,本線程會被關閉
- 任務緩沖隊列:是一個阻塞隊列,目前線程數大于核心線程數時,實時緩沖(展現在實時放任務)一定數量的(即緩沖隊列的大小)任務。如果此時有空閑線程,則空閑線程直接執行任務(這個就是阻塞取任務的意義)。若不能實時緩沖(即放入任務後會超過緩沖隊列的大小),就嘗試建立任務執行。
模型
- 線程池中有 核心線程數,最大線程數,線程保證存活時間,核心線程是否允許關閉,線程集合,任務緩沖-阻塞隊列,拒絕政策 等 要素
- 線程池負責排程線程
- 注意線上程池中的線程都是等同的,并不區分某個線程是否是核心線程,
- 線程池 是 依據 存活的線程數量 和 線程是否空閑,選擇适宜的排程政策(即執行任務的方式)。
排程政策
- 線程池初始化時不初始化線程
- 我們将任務送出到線程池時,線程池的排程政策
- 任務執行政策
- 當線程池的線程數量小于核心線程數時,建立線程執行(即使有空閑線程也不用)
- 當線程池的線程數量大于等于核心線程數時
- 優先使用空閑線程執行(即 可以實時放入緩沖隊列,即 放入後不超過緩沖隊列的大小)
- 再嘗試擴張線程池,建立線程執行(即 線程池的線程數小于最大線程數)
- 擴張失敗後,調用拒絕政策(即 線程池的線程數大于等于最大線程數)
-
線程逾時政策
本線程逾時後,
* 若核心線程允許關閉,則關閉本線程
* 若核心線程不允許關閉 并 目前線程數大于核心線程數,則關閉本線程
- 任務執行政策
- 線程池的關閉
- shutdown()會将線程池狀态置為SHUTDOWN,不再接受新的任務,同時會等待線程池中已有的任務執行完成再結束。
- shutdownNow()會将線程池狀态置為SHUTDOWN,對所有線程執行interrupt()操作,清空隊列,并将隊列中的任務傳回回來。
使用
- 根據具體場景構造合适的線程池。應使用ThreadPoolExecutor構造器構造線程池,而不是Executor的工廠方法,因為使用ThreadPoolExecutor構造器可以更清楚的知道線程池的細節,并且可以定制參數。
- 任務的送出
- execute()送出無傳回結果的任務
- submit()送出有傳回結果的任務
- 注意線程池的關閉
- shutdown()會将線程池狀态置為SHUTDOWN,不再接受新的任務,同時會等待線程池中已有的任務執行完成再結束。
- shutdownNow()會将線程池狀态置為SHUTDOWN,對所有線程執行interrupt()操作,清空隊列,并将隊列中的任務傳回回來。
并發程式設計9 Java線程池 ThreadPoolExector幹貨部落格思想原理JDK中的線程池
- 使用鈎子方法定制功能,增強ThreadPoolExector。比如內建日志或者監控
// ThreadPoolExector的鈎子方法
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
原理
源碼的簡要流程
隻關注主幹邏輯,忽略加鎖,異常和線程池關閉等分支
- 構造一個線程池,隻是設定線程池參數,沒做其他處理。
- 使用線程池執行 任務
- 目前線程數小于核心線程數,則開啟新線程并執行任務
- 當超過核心線程數時,
- 先使用offer方法 将任務送出到 任務緩沖-阻塞隊列,直接傳回成功或失敗。(注意不是阻塞目前線程等待消費)
- 送出成功,即在 任務緩沖-阻塞隊列 中等待 空閑線程消費
- 送出失敗
- 在不大于最大線程數的條件下,啟動線程并執行任務
- 若啟動線程失敗,即超過最大線程數等情況,則調用拒絕政策,拒絕任務。
- 在不大于最大線程數的條件下,啟動線程并執行任務
- 先使用offer方法 将任務送出到 任務緩沖-阻塞隊列,直接傳回成功或失敗。(注意不是阻塞目前線程等待消費)
- 線程(這裡指的是Worker)的邏輯
- 線程啟動後,會立即執行第一個任務
-
執行完第一個任務後,會不斷從緩存隊列中擷取任務,并執行,若沒有擷取到任務則終止線程
擷取任務的邏輯 :
- 如果 線程數大于核心線程數 或 核心線程允許逾時銷毀,則使用帶有逾時時間poll方法“阻塞地”從緩存隊列中擷取任務,
- 如果逾時即擷取不到任務
- 否則使用take方法“阻塞地”從緩存隊列中擷取任務
- 如果 線程數大于核心線程數 或 核心線程允許逾時銷毀,則使用帶有逾時時間poll方法“阻塞地”從緩存隊列中擷取任務,
-
關閉線程池:
未完待續…
核心資料結構
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 存放任務的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
// 對線程池内部各種變量進行互斥通路控制
private final ReentrantLock mainLock = new ReentrantLock();
// 線程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//...
}
每一個線程是一個Worker對象。Worker是ThreadPoolExector的内部類,核心資料結構如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ...
final Thread thread; // Worker封裝的線程
Runnable firstTask; // Worker接收到的第1個任務
volatile long completedTasks; // Worker執行完畢的任務個數
// ...
}
由定義會發現,Worker繼承于AQS,也就是說Worker本身就是一把鎖。
這把鎖有什麼用處呢?用于線程池的關閉、線程執行任務的過程中。
構造器,即配置參數
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
構造方法有7個參數
- corePoolSize,線程池中的核心線程數
- maximumPoolSize,線程池中的最大線程數
- keepAliveTime,空閑時間,當線程池數量超過核心線程數時,多餘的空閑線程存活的時間,即:這些線程多久被銷毀。
- unit,空閑時間的機關,可以是毫秒、秒、分鐘、小時和天,等等
- workQueue,等待隊列,線程池中的線程數超過核心線程數時,任務将放在等待隊列,它是一個BlockingQueue類型的對象
- threadFactory,線程工廠,我們可以使用它來建立一個線程
- handler,拒絕政策,當線程池和等待隊列都滿了之後,需要通過該對象的回調函數進行回調處理
而我們更關心的是workQueue、threadFactory和handler
-
workQueue
等待隊列是BlockingQueue類型的,理論上隻要是它的子類,我們都可以用來作為等待隊列。
jdk内部自帶一些阻塞隊列
ArrayBlockingQueue,隊列是有界的,基于數組實作的阻塞隊列
LinkedBlockingQueue,隊列可以有界,也可以無界。基于連結清單實作的阻塞隊列
PriorityBlockingQueue,帶優先級的無界阻塞隊列
SynchronousQueue,不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作将一直處于阻塞狀态。
該隊列也是Executors.newCachedThreadPool()的預設隊列
-
threadFactory
ThreadFactory是一個接口,隻有一個方法。用于生産線程。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
可參考預設的線程工廠實作類:DefaultThreadFactory
-
handler:拒絕政策
jdk自帶4種拒絕政策,我們來看看。
AbortPolicy // 直接抛出RejectedExecutionException異常,預設的
DiscardPolicy // 任務直接丢棄,不做任何處理
DiscardOldestPolicy // 丢棄隊列裡最舊的那個任務,再嘗試執行目前任務
CallerRunsPolicy // 在調用者線程執行
我們可以通過實作RejectedExecutionHandler接口的方式,定制自己的拒絕政策。
如何制定需要看實際的場景,比如
線上業務:如 秒殺,訂單查詢,下訂單等,時效性要求高,在超過某個時間後,等待的意義不大了。
離線業務:如 短信提醒批處理,時效性要求不高,但是在生産消費不平衡後,很容易積累大量的待處理任務。
任務送出
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果目前線程數小于corePoolSize,則啟動新線程
if (workerCountOf(c) < corePoolSize) {
// 添加Worker,并将command設定為Worker線程的第一個任務開始執行。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果目前的線程數大于或等于corePoolSize,則調用workQueue.offer放入隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程池正在停止,則将command任務從隊列移除,并拒絕command任務請求。
if (! isRunning(recheck) && remove(command))
reject(command);
// 放入隊列中後發現沒有線程執行任務,開啟新線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 線程數大于maxPoolSize,并且隊列已滿,調用拒絕政策
else if (!addWorker(command, false))
reject(command);
}
// 該方法用于啟動新線程。boolean core代表是否是核心線程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 如果線程池狀态值起碼是SHUTDOWN和STOP,或則第一個任務不是null,或者工作隊列為空
// 則添加worker失敗,傳回false
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// 工作線程數達到上限,要麼是corePoolSize要麼是maximumPoolSize,啟動線程失敗
if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 增加worker數量成功,傳回到retry語句
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果線程池運作狀态起碼是SHUTDOWN,則重試retry标簽語句,CAS
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// worker數量加1成功後,接着運作:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 建立worker對象
w = new Worker(firstTask);
// 擷取線程對象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c)
|| (runStateLessThan(c, STOP) && firstTask == null)) {
// 由于線程已經在運作中,無法啟動,抛異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将線程對應的worker加入worker集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
// 如果添加worker成功,則啟動該worker對應的線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果啟動新線程失敗
if (! workerStarted)
// workCount - 1
addWorkerFailed(w);
}
return workerStarted;
}
線程池關閉
未完待續…