天天看點

并發程式設計9 Java線程池 ThreadPoolExector幹貨部落格思想原理JDK中的線程池

幹貨部落格

https://www.jianshu.com/p/389b58856894

思想

池化技術

池化技術應用:線程池、資料庫連接配接池、http連接配接池等等。

池化技術的思想主要是為了減少每次擷取資源的消耗,提高對資源的使用率。

通過将資源池化,統一管理,可以友善管控。例如提供資源監控,排程政策等能力

線程池的好處

降低資源消耗:通過重複利用已建立的線程降低線程建立和銷毀造成的消耗。

提高響應速度:當任務到達時,可以不需要等待線程建立就能立即執行。

提高線程的可管理性:線程是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定,監控和調優。

原理

下圖所示為線程池的實作原理:調用方不斷地向線程池中送出任務;線程池中有一組線程,不斷地從隊列中取任務,這是一個典型的生産者—消費者模型

并發程式設計9 Java線程池 ThreadPoolExector幹貨部落格思想原理JDK中的線程池

設計的關鍵點,即排程政策制定

  1. 隊列設定多長?如果是無界的,調用方不斷地往隊列中放任務,可能導緻記憶體耗盡。如果是有界的,當隊列滿了之後,調用方如何處理?
  2. 線程池中的線程個數是固定的,還是動态變化的?
  3. 每次送出新任務,是放入隊列?還是開新線程?
  4. 當沒有任務的時候,線程是睡眠一小段時間?還是進入阻塞?如果進入阻塞,如何喚醒?

JDK的實作

ThreadPoolExector/ScheduledThreadPoolExecutor

  • 基于阻塞隊列實作(即設計關鍵點:4)
  • 通過構造方法入參,讓我們定制具體的排程政策(即設計關鍵點:1,2,3)

JDK中的線程池

類圖

并發程式設計9 Java線程池 ThreadPoolExector幹貨部落格思想原理JDK中的線程池

種類

  • ThreadPoolExector:線程池
  • ScheduledThreadPoolExecutor:定時排程的線程池

ThreadPoolExector

概念與邏輯模型

本節的描述不是很嚴謹,适宜快速了解

概念
  • 核心線程是否允許關閉:預設為否
  • 核心線程數:若核心線程不允許關閉,則核心線程為線程池的最小線程數量,會一直存活。若核心線程允許關閉,則線程池最小線程數可以為0,不受核心線程數控制。
  • 最大線程數數:線程池中同時線程的最大上限。
  • 線程逾時時間:當 本線程超過逾時時間,若 (核心線程不允許關閉 并 線程數量大于核心線程數)或 核心線程允許關閉,本線程會被關閉
  • 任務緩沖隊列:是一個阻塞隊列,目前線程數大于核心線程數時,實時緩沖(展現在實時放任務)一定數量的(即緩沖隊列的大小)任務。如果此時有空閑線程,則空閑線程直接執行任務(這個就是阻塞取任務的意義)。若不能實時緩沖(即放入任務後會超過緩沖隊列的大小),就嘗試建立任務執行。
模型
  • 線程池中有 核心線程數,最大線程數,線程保證存活時間,核心線程是否允許關閉,線程集合,任務緩沖-阻塞隊列,拒絕政策 等 要素
  • 線程池負責排程線程
    • 注意線上程池中的線程都是等同的,并不區分某個線程是否是核心線程,
    • 線程池 是 依據 存活的線程數量 和 線程是否空閑,選擇适宜的排程政策(即執行任務的方式)。
排程政策
  1. 線程池初始化時不初始化線程
  2. 我們将任務送出到線程池時,線程池的排程政策
    1. 任務執行政策
      • 當線程池的線程數量小于核心線程數時,建立線程執行(即使有空閑線程也不用)
      • 當線程池的線程數量大于等于核心線程數時
        • 優先使用空閑線程執行(即 可以實時放入緩沖隊列,即 放入後不超過緩沖隊列的大小)
        • 再嘗試擴張線程池,建立線程執行(即 線程池的線程數小于最大線程數)
        • 擴張失敗後,調用拒絕政策(即 線程池的線程數大于等于最大線程數)
    2. 線程逾時政策

      本線程逾時後,

      * 若核心線程允許關閉,則關閉本線程

      * 若核心線程不允許關閉 并 目前線程數大于核心線程數,則關閉本線程

  3. 線程池的關閉
    • shutdown()會将線程池狀态置為SHUTDOWN,不再接受新的任務,同時會等待線程池中已有的任務執行完成再結束。
    • shutdownNow()會将線程池狀态置為SHUTDOWN,對所有線程執行interrupt()操作,清空隊列,并将隊列中的任務傳回回來。

使用

  1. 根據具體場景構造合适的線程池。應使用ThreadPoolExecutor構造器構造線程池,而不是Executor的工廠方法,因為使用ThreadPoolExecutor構造器可以更清楚的知道線程池的細節,并且可以定制參數。
  2. 任務的送出
    • execute()送出無傳回結果的任務
    • submit()送出有傳回結果的任務
  3. 注意線程池的關閉
    • shutdown()會将線程池狀态置為SHUTDOWN,不再接受新的任務,同時會等待線程池中已有的任務執行完成再結束。
    • shutdownNow()會将線程池狀态置為SHUTDOWN,對所有線程執行interrupt()操作,清空隊列,并将隊列中的任務傳回回來。
      并發程式設計9 Java線程池 ThreadPoolExector幹貨部落格思想原理JDK中的線程池
  4. 使用鈎子方法定制功能,增強ThreadPoolExector。比如內建日志或者監控
// ThreadPoolExector的鈎子方法
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
           

原理

源碼的簡要流程

隻關注主幹邏輯,忽略加鎖,異常和線程池關閉等分支

  1. 構造一個線程池,隻是設定線程池參數,沒做其他處理。
  2. 使用線程池執行 任務
    • 目前線程數小于核心線程數,則開啟新線程并執行任務
    • 當超過核心線程數時,
      • 先使用offer方法 将任務送出到 任務緩沖-阻塞隊列,直接傳回成功或失敗。(注意不是阻塞目前線程等待消費)
        • 送出成功,即在 任務緩沖-阻塞隊列 中等待 空閑線程消費
        • 送出失敗
          • 在不大于最大線程數的條件下,啟動線程并執行任務
            • 若啟動線程失敗,即超過最大線程數等情況,則調用拒絕政策,拒絕任務。
  3. 線程(這裡指的是Worker)的邏輯
    • 線程啟動後,會立即執行第一個任務
    • 執行完第一個任務後,會不斷從緩存隊列中擷取任務,并執行,若沒有擷取到任務則終止線程

      擷取任務的邏輯 :

      • 如果 線程數大于核心線程數 或 核心線程允許逾時銷毀,則使用帶有逾時時間poll方法“阻塞地”從緩存隊列中擷取任務,
        • 如果逾時即擷取不到任務
      • 否則使用take方法“阻塞地”從緩存隊列中擷取任務
  4. 關閉線程池:

    未完待續…

核心資料結構
public class ThreadPoolExecutor extends AbstractExecutorService {
	//...
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	// 存放任務的阻塞隊列
	private final BlockingQueue<Runnable> workQueue;
	// 對線程池内部各種變量進行互斥通路控制
	private final ReentrantLock mainLock = new ReentrantLock();
	// 線程集合
	private final HashSet<Worker> workers = new HashSet<Worker>();
	//...
}
           

每一個線程是一個Worker對象。Worker是ThreadPoolExector的内部類,核心資料結構如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	// ...
	final Thread thread; // Worker封裝的線程
	Runnable firstTask; // Worker接收到的第1個任務
	volatile long completedTasks; // Worker執行完畢的任務個數
	// ...
}
           

由定義會發現,Worker繼承于AQS,也就是說Worker本身就是一把鎖。

這把鎖有什麼用處呢?用于線程池的關閉、線程執行任務的過程中。

構造器,即配置參數
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler){
       if (corePoolSize < 0 ||
           maximumPoolSize <= 0 ||
           maximumPoolSize < corePoolSize ||
           keepAliveTime < 0)
           throw new IllegalArgumentException();
       if (workQueue == null || threadFactory == null || handler == null)
           throw new NullPointerException();
       this.corePoolSize = corePoolSize;
       this.maximumPoolSize = maximumPoolSize;
       this.workQueue = workQueue;
       this.keepAliveTime = unit.toNanos(keepAliveTime);
       this.threadFactory = threadFactory;
       this.handler = handler;
   }
           

構造方法有7個參數

  1. corePoolSize,線程池中的核心線程數
  2. maximumPoolSize,線程池中的最大線程數
  3. keepAliveTime,空閑時間,當線程池數量超過核心線程數時,多餘的空閑線程存活的時間,即:這些線程多久被銷毀。
  4. unit,空閑時間的機關,可以是毫秒、秒、分鐘、小時和天,等等
  5. workQueue,等待隊列,線程池中的線程數超過核心線程數時,任務将放在等待隊列,它是一個BlockingQueue類型的對象
  6. threadFactory,線程工廠,我們可以使用它來建立一個線程
  7. handler,拒絕政策,當線程池和等待隊列都滿了之後,需要通過該對象的回調函數進行回調處理

而我們更關心的是workQueue、threadFactory和handler

  1. workQueue

    等待隊列是BlockingQueue類型的,理論上隻要是它的子類,我們都可以用來作為等待隊列。

    jdk内部自帶一些阻塞隊列

    ArrayBlockingQueue,隊列是有界的,基于數組實作的阻塞隊列

    LinkedBlockingQueue,隊列可以有界,也可以無界。基于連結清單實作的阻塞隊列

    PriorityBlockingQueue,帶優先級的無界阻塞隊列

    SynchronousQueue,不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作将一直處于阻塞狀态。

    該隊列也是Executors.newCachedThreadPool()的預設隊列

  2. threadFactory

    ThreadFactory是一個接口,隻有一個方法。用于生産線程。

    public interface ThreadFactory {

    Thread newThread(Runnable r);

    }

    可參考預設的線程工廠實作類:DefaultThreadFactory

  3. handler:拒絕政策

    jdk自帶4種拒絕政策,我們來看看。

    AbortPolicy // 直接抛出RejectedExecutionException異常,預設的

    DiscardPolicy // 任務直接丢棄,不做任何處理

    DiscardOldestPolicy // 丢棄隊列裡最舊的那個任務,再嘗試執行目前任務

    CallerRunsPolicy // 在調用者線程執行

    我們可以通過實作RejectedExecutionHandler接口的方式,定制自己的拒絕政策。

    如何制定需要看實際的場景,比如

    線上業務:如 秒殺,訂單查詢,下訂單等,時效性要求高,在超過某個時間後,等待的意義不大了。

    離線業務:如 短信提醒批處理,時效性要求不高,但是在生産消費不平衡後,很容易積累大量的待處理任務。

任務送出
public void execute(Runnable command) {
	if (command == null)
		throw new NullPointerException();
	int c = ctl.get();
	// 如果目前線程數小于corePoolSize,則啟動新線程
	if (workerCountOf(c) < corePoolSize) {
		// 添加Worker,并将command設定為Worker線程的第一個任務開始執行。
		if (addWorker(command, true))
			return;
		c = ctl.get();
	} 
	// 如果目前的線程數大于或等于corePoolSize,則調用workQueue.offer放入隊列
	if (isRunning(c) && workQueue.offer(command)) {
		int recheck = ctl.get();
		// 如果線程池正在停止,則将command任務從隊列移除,并拒絕command任務請求。
		if (! isRunning(recheck) && remove(command))
			reject(command);
		// 放入隊列中後發現沒有線程執行任務,開啟新線程
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	} 
	// 線程數大于maxPoolSize,并且隊列已滿,調用拒絕政策
	else if (!addWorker(command, false))
		reject(command);
} 
	
// 該方法用于啟動新線程。boolean core代表是否是核心線程。
private boolean addWorker(Runnable firstTask, boolean core) {
	retry:
	for (int c = ctl.get();;) {
		// 如果線程池狀态值起碼是SHUTDOWN和STOP,或則第一個任務不是null,或者工作隊列為空
		// 則添加worker失敗,傳回false
		if (runStateAtLeast(c, SHUTDOWN)
			&& (runStateAtLeast(c, STOP)
				|| firstTask != null
				|| workQueue.isEmpty()))
			return false;
		for (;;) {
			// 工作線程數達到上限,要麼是corePoolSize要麼是maximumPoolSize,啟動線程失敗
			if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
				return false;
			// 增加worker數量成功,傳回到retry語句
			if (compareAndIncrementWorkerCount(c))
				break retry;
			c = ctl.get(); // Re-read ctl
			// 如果線程池運作狀态起碼是SHUTDOWN,則重試retry标簽語句,CAS
			if (runStateAtLeast(c, SHUTDOWN))
				continue retry;
			// else CAS failed due to workerCount change; retry inner loop
		}
	} 
	// worker數量加1成功後,接着運作:
	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
		// 建立worker對象
		w = new Worker(firstTask);
		// 擷取線程對象
		final Thread t = w.thread;
		if (t != null) {
			final ReentrantLock mainLock = this.mainLock;
			// 加鎖
			mainLock.lock();
			try {
				// Recheck while holding lock.
				// Back out on ThreadFactory failure or if
				// shut down before lock acquired.
				int c = ctl.get();
				if (isRunning(c) 
					|| (runStateLessThan(c, STOP) && firstTask == null)) {
					// 由于線程已經在運作中,無法啟動,抛異常
					if (t.isAlive()) // precheck that t is startable
						throw new IllegalThreadStateException();
					// 将線程對應的worker加入worker集合
					workers.add(w);
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
						workerAdded = true;
					}
				} finally {
					// 釋放鎖
					mainLock.unlock();
				} 
				// 如果添加worker成功,則啟動該worker對應的線程
				if (workerAdded) {
					t.start();
					workerStarted = true;
				}
			}
		} finally {
			// 如果啟動新線程失敗
			if (! workerStarted)
			// workCount - 1
			addWorkerFailed(w);
		} 
		return workerStarted;
	}
           

線程池關閉

未完待續…