1.ThreadPoolExcuter原理說明
首先我們要知道為什麼要使用ThreadPoolExcuter,具體可以看看文檔中的說明:
線程池可以解決兩個不同問題:由于減少了每個任務的調用開銷,在執行大量的異步任務時,它通常能夠提供更好的性能,并且還可以提供綁定和管理資源(包括執行集合任務時使用的線程)的方法。每個 ThreadPoolExecutor還維護着一些基本的統計資料,如完成的任務數。
線程池做的其實可以看得很簡單,其實就是把你送出的任務(task)進行排程管理運作,但這個排程的過程以及其中的狀态控制是比較複雜的。
2.初始化參數介紹
可以直接看最完整的ThreadPoolExcuter的初始化函數:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
逐個介紹如下:
corePoolSize:核心線程數,在ThreadPoolExcutor中有一個與它相關的配置:allowCoreThreadTimeOut(預設為false),當allowCoreThreadTimeOut為false時,核心線程會一直存活,哪怕是一直空閑着。而當allowCoreThreadTimeOut為true時核心線程空閑時間超過keepAliveTime時會被回收。
maximumPoolSize:最大線程數,線程池能容納的最大線程數,當線程池中的線程達到最大時,此時添加任務将會采用拒絕政策,預設的拒絕政策是抛出一個運作時錯誤(RejectedExecutionException)。值得一提的是,當初始化時用的工作隊列為LinkedBlockingDeque時,這個值将無效。
keepAliveTime:存活時間,當非核心空閑超過這個時間将被回收,同時空閑核心線程是否回收受allowCoreThreadTimeOut影響。
unit:keepAliveTime的機關。
workQueue:任務隊列,常用有三種隊列,即SynchronousQueue,LinkedBlockingDeque(無界隊列),ArrayBlockingQueue(有界隊列)。
threadFactory:線程工廠,ThreadFactory是一個接口,用來建立worker。通過線程工廠可以對線程的一些屬性進行定制。預設直接建立線程。
RejectedExecutionHandler:也是一個接口,隻有一個方法,當線程池中的資源已經全部使用,添加新線程被拒絕時,會調用RejectedExecutionHandler的rejectedExecution法。
預設是抛出一個運作時異常。
這麼多參數看起來好像很複雜,是以Java貼心得為我們準備了便捷的API,即可以直接用Executors建立各種線程池。分别是:
//建立一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則建立線程。
//通過設定corePoolSize為0,而maximumPoolSize為Integer.Max_VALUE(Int型資料最大值)實作。
ExecutorService cache = Executors.newCachedThreadPool();
//建立一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。
//通過将corePoolSize和maximumPoolSize的值設定為一樣的值來實作。
ExecutorService fixed = Executors.newFixedThreadPool(num);
//建立一個定長線程池,支援定時及周期性任務執行。
//通過将隊列參數workQueue設定為DelayWorkQueue來實作。
ExecutorService schedule = Executors.newScheduledThreadPool(5);
//建立一個單線程化的線程池,它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
//通過将corePoolSize和maximumPoolSize都設定為1來實作。
ExecutorService single = Executors.newSingleThreadExecutor();
這幾個API會根據具體的情況而使用預設定好預設的初始化參數去建立一個ThreadPoolExecutor。
這裡需要做一個額外說明,在ThreadPoolExcuter中,worker和task是有差別的,task是使用者送出的任務,而worker則是用來執行task的線程。在初始化參數中,corePoolSize和maximumPoolSize都是針對worker的,而workQueue是用來存放task的。
3.worker介紹
前面有介紹了一下worker和task的差別,其中task是使用者送出的線程任務,而worker則是ThreadPoolExecutor自己内部實作的一個類了。
具體源碼如下:
/**
* Woker主要維護着運作task的worker的中斷控制資訊,以及其他小記錄。這個類拓展AbstractQueuedSynchronizer
* 而來簡化擷取和釋放每一個任務執行中的鎖。這可以防止中斷那些打算喚醒正在等待其他線程任務的任務,而不是
* 中斷正在運作的任務。我們實作一個簡單的不可重入鎖而不是ReentrantLo,因為我們不想當其調用setCorePoolSize
* 這樣的方法的時候能獲得鎖。
*/
//worker主要是對進行中的任務進行中斷控制,順帶着對其他行為進行記錄
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
//正在跑的線程,如果是null辨別factory失敗
final Thread thread;
/** Initial task to run. Possibly null. */
//初始化一個任務以運作
Runnable firstTask;
/** Per-thread task counter */
//每個線程計數
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
* 用給定的first task和從threadFactory建立
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
//主要調用了runWorker
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
//鎖方法
//
protected boolean isHeldExclusively() {
return getState() != 0;
}
//嘗試擷取鎖
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//嘗試釋放鎖
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker其實可以看作進階一點的線程。其中繼承AbstractQueuedSynchronizer主要是為了實作鎖控制。ThreadPoolExecutor會持有并管理Worker,在Worker中firstTask其實就是存放task的,而thread則是存放目前Worker本身的線程。
其中比較重要的就是run方法了,但這個方法其實又是去調用ThreadPoolExecutor裡面的runWorker()方法,具體可以看下一節的介紹。
4.ctl介紹以及運作狀态說明
首先需要介紹線程池有五種運作狀态:
RUNNING(狀态值-1): 接收新任務并處理隊列中的任務
SHUTDOWN(狀态值0): 不接收新任務但會處理隊列中的任務。
STOP(狀态值1): 不接收新任務,不處理隊列中的任務,并中斷正在處理的任務
TIDYING(狀态值2): 所有任務已終止,workerCount為0,處于TIDYING狀态的線程将調用鈎子方法terminated()。
TERMINATED(狀态值3): terminated()方法完成。
然後我們可以看看ThreadPoolExcuter中的ctl這個變量。
ctl是ThreadPoolExcuter中比較有意思的一個實作,它是一個AtomicInteger,這裡不對AtomicInteger多做讨論,隻要知道可以把它看成有原子性的Integer就夠了,其實它具有原子性的原理是使用了CAS的技術,這是一種樂觀鎖的具體實作。
ThreadPoolExcuter是将兩個内部值打包成一個值,即将workerCount和runState(運作狀态)這兩個值打包在一個ctl中,因為runState有5個值,需要3位,是以有3位表示
runState,而其他29位表示為workerCount。
而運作時要擷取其他資料時,隻需要對ctl進行拆包即可。具體這部分代碼如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
//拆包ctl,分别擷取runState和WorkerCount
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
//打包操作
private static int ctlOf(int rs, int wc) { return rs | wc; }
5.拒絕政策
當執行器(Executor)處于終止狀态,或者執行器在max threads和工作隊列都是有界并且處于飽和的時候,新送出的任務會被拒絕。在任一情況下,執行的任務将調用RejectedExecutionHandler的方法rejectedExecution(Runnable, ThreadPoolExecutor)。有以下四種拒絕政策:
1.預設的是ThreadPoolExecutor.AbortPolicy,在這種政策下,處理器會在拒絕後抛出一個運作異常RejectedExecutionException。
2.在ThreadPoolExecutor.CallerRunsPolicy的政策下,線程會調用它直接的execute來運作這個任務。這種方式提供簡單的回報控制機制來減緩新任務送出的速度。
3.在ThreadPoolExecutor.DiscardPolicy政策下,無法執行的任務将被簡單得删除掉。
4.在ThreadPoolExecutor.DiscardOldestPolicy政策下,如果executor沒有處于終止狀态,在工作隊列頭的任務将被删除,然後會重新執行(可能會再次失敗,這會導緻重複這個過程)。
總結:本篇初步介紹了ThreadPoolExcuter的基本原理,解決了什麼問題。而後說明了ThreadPoolExcuter中的初始化參數,對其中的各個參數做初步介紹。再之後介紹ctl變量的作用,并初步介紹了任務送出失敗後的拒絕政策。