前置工作
一般我們使用的java線程池的大多都是由ThreadPoolExecutor所生成的。
類結構圖如下:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczX0xiRGZkRGZ0Xy9GbvNGL2EzXlpXazxSP9cXT5VkeaVHbXNGcwdFZwhnMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL4UTN5QzNwITM4IDNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
在說源碼前,先說明幾個概念,這幾個概念不弄明白,那源碼也很難看懂。
首先是線程池的狀态和數量線上程池中是如何表示的
ThreadPoolExecutor 使用 int 的高 3 位來表示線程池狀态,低 29 位表示線程數量
線上程池中使用了一個AtomicInteger類型的變量來儲存線程池的狀态和數量
不妨看看源碼是如何定義的:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//初始值為-1
private static final int RUNNING = -1 << COUNT_BITS;//RUNNING轉換成二進制
//就是11100000000000000000000000000000,如果隻看前面三位的話RUNNING的值就是-1
private static final int COUNT_BITS = Integer.SIZE - 3;//COUNT_BITS==29
private static int ctlOf(int rs, int wc) { return rs | wc; }
這個應該還是很好了解的吧,經過計算ctlOf(RUNNING, 0),初始化的ctl其實就是11100000000000000000000000000000,這個值表示的是線程池為RUNNING狀态,0條工作線程。再舉個例子:11100000000000000000000000000011,表示RUNNING狀态,3條工作線程。
ctl的幾個狀态:
//下面的值隻顯示其二進制的前三位
private static final int RUNNING = -1 << COUNT_BITS;//111
private static final int SHUTDOWN = 0 << COUNT_BITS;//000
private static final int STOP = 1 << COUNT_BITS;//001
private static final int TIDYING = 2 << COUNT_BITS;//010
private static final int TERMINATED = 3 << COUNT_BITS;//011
//其中
private static final int COUNT_BITS = Integer.SIZE - 3;//29
好了,接下來就是最有意思的源碼環節了😏
源碼分析
首先從構造器出發
線程池的構造器:
public ThreadPoolExecutor(int corePoolSize,//核心線程數目 (最多保留的線程數)
int maximumPoolSize,//最大線程數目
long keepAliveTime,//生存時間 - 針對救急線程
TimeUnit unit,//時間機關 - 針對救急線程
BlockingQueue<Runnable> workQueue,//阻塞隊列
ThreadFactory threadFactory,//線程工廠 - 可以為線程建立時起個好名字
RejectedExecutionHandler handler) {//拒絕政策
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通過這個構造器,我們就可以建立我們想要的線程池。
構造器有幾個參數
BlockingQueue:這是一個阻塞隊列,如果這個不懂,可以先去了解一下。
篇幅關系,在這就詳細不說明了。
ThreadFactory :線程工廠
就拿DefaultThreadFactory 舉例
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
//構造方法
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
//擷取線程組
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
//線程名稱
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//建立線程
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//設定線程為非守護線程
if (t.isDaemon())
t.setDaemon(false);
//設定線程的優先級,其中Thread.NORM_PRIORITY是5(優先級的範圍是1~10)
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
其實本質上就是建立一個Thread,隻不過多了一些參數設定
RejectedExecutionHandler:拒絕政策,阻塞隊列滿了且線程數量達到maximumPoolSize會采取對應的拒絕政策
拒絕政策,jdk預設實作有4種,拿一個舉例
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
//直接抛異常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
上面就是構造器的介紹
接下來是線程池的任務送出,這也是最核心的方法了
//當我們送出一個任務時所調用的方法
public void execute(Runnable command) {
//任務為空
if (command == null)
throw new NullPointerException();
//擷取ctl(就是我們剛才介紹的那個原子變量,後面不再提示)
int c = ctl.get();
//判斷工作中的線程數量<corePoolSize(核心線程數量)
if (workerCountOf(c) < corePoolSize) {
/*
workerCountOf方法:擷取原子變量後面29位的值,也就是工作線程的數量
private static int workerCountOf(int c) {
return c & CAPACITY;
}
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
是以CAPACITY的二進制:00011111111111111111111111111111
*/
//直接将任務添加到工作線程所在的集合中,addWorker()方法後面會有介紹
if (addWorker(command, true))
return;
c = ctl.get();
}
//代碼能到這裡說明工作線程數量>=corePoolSize,那麼就嘗試添加到阻塞隊列中。
//如果線程池是RUNNING狀态并且成功添加到阻塞隊列中
if (isRunning(c) && workQueue.offer(command)) {
/*
isRunning()方法:
private static boolean isRunning(int c) {
return c < SHUTDOWN;//ctl狀态可以看上面的表格
}
*/
//再次擷取ctl,在并發環境下,可能之前擷取的ctl已經發生改變
int recheck = ctl.get();
/*
如果線程池處于!RUNNING狀态,則删除阻塞隊列中剛才添加的任務
(隻有RUNNING狀态才能将任務添加到阻塞隊列中)
其實我覺得這塊做再次校驗的目的是盡量保證當線程池被其它線程shutdown
後,阻塞隊列就不能再添加任務了(按理來說shutdown狀态,阻塞隊列是不能
再添加任務的)
為什麼說它是盡量保證呢,因為它也隻能保證從第一擷取ctl和第二次擷取ctl
之間如果ctl發生改變,能及時做出相應的修改
但如果我在第二次擷取ctl後,也就是int recheck = ctl.get()執行後,
突然線程池變成了shutdown的狀态,那下面的條件就不會成立,
剛剛添加到阻塞隊列中的任務就不會被移除了。當然發生這種情況的機率非常
小,即使發生了,無非就是多執行一個任務。
*/
if (! isRunning(recheck) && remove(command))
//删除成功則說明任務添加失敗,直接走拒接政策
reject(command);
/*
如果線程池已經停止了(不處于RUNNING狀态,但處于SHOTDOWN狀态)
判斷工作中的線程數量是否為0,為0則建立一個空任務
這個是不是感到很疑惑??????我看了當初也是懵了。
我也是看了後面的源碼才煥然大悟:工作中的線程數量為0時,那麼就沒有線程
去阻塞隊列中取任務執行了,那麼我們剛才添加到阻塞隊列的任務就永遠不會
執行。是以我們添加了一個空任務的Worker(Worker可以執行多個任務,後面會說到)。
那為何工作中的線程數量會為0?因為我們可能設定corePoolSize為0(比如
Executors.newCachedThreadPool()),那麼所有線程接
在keepAliveTime時間内擷取不到阻塞隊列中的任務時,就會被回收。最終導
緻線程池中工作的線程數量為0
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//上面if條件不成立,則可能是阻塞隊列滿了,那麼我們直接将任務添加到工作線程集
//合中(工作線程數超過corePoolSize就會将任務添加到阻塞隊列中,阻塞隊列滿了就會
//再次嘗試将任務添加到工作線程中,但也不能超過maximumPoolSize)
else if (!addWorker(command, false))
//超過maximumPoolSize則添加失敗,走拒絕政策
reject(command);
}
接下來是addWorker()方法:
//firstTask是添加的任務,core來判斷是核心線程還是最大線程,即corePoolSize和maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//擷取ctl
int c = ctl.get();
//擷取線程池運作狀态
int rs = runStateOf(c);
/*
判斷線程池是否還在運作
其中rs>=SHUTDOWN表示線程處于非RUNNING狀态
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())
上面這三行條件:是為了保證阻塞隊列的任務在SHUTDOWN狀态下也能被執行(建立空任務的原因)
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/*
下面這段帶代碼的目的是将工作線程的線程數量+1(也就是ctl中的後29位)
*/
for (;;) {
//擷取工作線程的數量
int wc = workerCountOf(c);
/*
工作線程數量超過規定大小則傳回false
條件1:wc >= CAPACITY 判斷工作線程的數量是否超過線程的最大容量
其中CAPACITY的定義:private static final int CAPACITY
= (1 << COUNT_BITS) - 1;
COUNT_BITS的定義private static final int COUNT_BITS =
Integer.SIZE - 3;
經過計算CAPACITY的值為00011111111111111111111111111111
(前面3為用來表示線程狀态),經計算,也就是536,870,912
條件2:wc >= (core ? corePoolSize : maximumPoolSize) 判
斷core是否為核心線程,core是方法參數
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas将ctl的值+1(工作線程數+1),并結束外層循環
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//上面cas失敗,并且兩次讀取的ctl的運作狀态不一緻(說明線程池的運作狀
//态發生了改變),則continue外層循環重試
if (runStateOf(c) != rs)
continue retry;
}
}
/*
上面代碼将ctl中的工作線程數量+1,但隻是數量+1,任務還沒有被執行。
下面這段代碼是将任務添加到工作線程的集合中
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;//這個Worker結構後面會談及到,先看一下大緻結構
/*
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;//任務
volatile long completedTasks;//完成任務的數量
public void run() {
runWorker(this);
}
}
*/
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//擷取線程池運作狀态
int rs = runStateOf(ctl.get());
/*
rs < SHUTDOWN 判斷線程池是否關閉
rs == SHUTDOWN && firstTask == null是為了保證阻塞隊列
的任務在SHUTDOWN狀态下也能被執行(建立空任務的原因)
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加到工作線程的集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//添加成功後修改添加狀态,後面需要
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//判斷任務是否添加成功
if (workerAdded) {
//添加成功則啟動
t.start();//這行很重要,後面會說到
//修改啟動辨別,後面需要
workerStarted = true;
}
}
} finally {
//判斷是否啟動
if (! workerStarted)
//如果啟動失敗,則将剛剛添加到工作線程集合中的任務删除
addWorkerFailed(w);
/*
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);删除工作線程集合中的Worker
decrementWorkerCount();工作線程數量-1
tryTerminate();嘗試将線程池變成TERMINATED狀态
} finally {
mainLock.unlock();
}
}
*/
}
//傳回啟動狀态
return workerStarted;
}
上面我們說完了任務送出的大概流程,但你有沒有想過添加到阻塞隊列中的任務好像并沒有被執行,一個Worker執行完了任務又會怎麼樣?線程池是怎麼做到一個線程可以執行多個任務,而不是執行一個任務就釋放一個線程。還有就是線程池的異常處理,按理來說如果一個異常沒有被捕獲,線程就會挂掉,挂掉後線程池的線程數量就會減小,那麼線程池是怎麼保證線程的數量的呢。帶着問題,我們繼續。
在上面代碼分析中,我們還去沒有仔細去分析Worker這個類,源碼來
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;//任務
this.thread = getThreadFactory().newThread(this);//建立一個線程,
//注意這裡傳遞的任務不是firstTask ,而是this,因為Worker類也是Runnable的
//子類,是以線程啟動時調用的方法是下面的run方法
}
//線程啟動調用的方法
public void run() {
runWorker(this);
}
}
其實這裡用到了一個設計模式,裝飾器模式(你們注意一下Runnable,實作+組合,這不典型的裝飾器模式麼)。
可以看出,一個Worker對應一個Thread
還得我們在execute()方法中說很重要的一行代碼的t.start()嗎?這裡調用的其實是Worker中的run方法,不是firstTask的run方法。
是以t.start()也就是執行下面代碼
public void run() {
runWorker(this);
}
我們再進入runWorker()方法
//這裡我就不逐行解釋了,就說了幾行關鍵部分
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/*
發現沒有,while循環,一個Worker是可以執行多個任務的
task!=null,還記得execute()中的addWorker(null, false)這行代碼嗎?添加一個空任務
那行代碼在這裡就起作用了,如果任務為null,那麼就會去阻塞隊列中取任務
getTask()就是去阻塞隊列中擷取任務,如果沒有任務getTask()會一直阻塞,
除非等待時間超過keepAliveTime或者線程池被終止運作,該方法後面會詳細說到
getTask()後面會詳細說到
*/
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//這個任務才是我們之前execute()方法傳遞過來的任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//Worker收尾工作,後面會詳細說到
processWorkerExit(w, completedAbruptly);
}
}
有了上面的這個方法,問題差不多解決了一半
我們看看getTask()方法到底做了些什麼
private Runnable getTask() {
//線程去阻塞隊列中擷取任務執行,如果阻塞隊列一直沒有任務,則會逾時
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//擷取ctl
int c = ctl.get();
//擷取線程池運作狀态
int rs = runStateOf(c);
/*
如果線程池為STOP、TIDYING、TERMINATED這幾種狀态,則該工作線程應該終止
如果線程為SHUTDOWN狀态并且阻塞隊列為空,則該工作線程也應該終止
簡單說就是如果線程池關閉了,該工作線程就應該終止
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//工作線程數量-1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/*
allowCoreThreadTimeOut:如果為false(預設值),核心線程即使處于空閑狀态也保持活動狀态。如果為true,核心線程使用keepAliveTime逾時等待工作。
private volatile boolean allowCoreThreadTimeOut;
timed可以了解為線程是否能一直處于空閑狀态,線程處于空閑狀态一段時間後會被回收(預設數量超過corePoolSize的線程需要被回收,
如果設定allowCoreThreadTimeOut為true,那麼建立的所有線程都不會被回收)
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
這塊代碼目地是讓那些等待逾時的線程被回收
這裡的第一個條件: wc > maximumPoolSize 我也不知道開發人員為何要這樣寫,
因為wc>maximumPoolSize是不可能發生的,addWorker()方法中已經保證了
在并發環境下wc<=maximumPoolSize。
第二個條件:timed && timedOut 目的是保證那些等待逾時的線程被回收
第三個條件:wc > 1 || workQueue.isEmpty() 保證線程池中至少有一個
線程活動,除非阻塞隊列為空
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
timed為true則線程不能一直處于空閑狀态,等待一定時間後還沒接收到任
務,那麼線程就會被回收
timed為false則線程一直等待阻塞隊列中的任務。篇幅比較大,阻塞隊列
就不進行說明了。
總之一句話poll()隻等待一段的時間,而take()會一直等待
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//如果r不為null,則說明從阻塞隊列中拿到了任務
if (r != null)
return r;
//代碼如果能走到這裡,則r為null,等待逾時
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
最後就是Worker結束所調用的方法
//參數 w:工作線程 completedAbruptly:true表示工作線程異常,false表示工作線程正常執行
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
//如果是異常退出,則認為任務不計數,因為之前執行了incrementWorkerCount,所
//以需要執行decrementWorkerCount
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//線程池完成的總任務
completedTaskCount += w.completedTasks;
//删除工作線程集合中的Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
//嘗試将線程池的狀态改為TERMINATED
tryTerminate();
//擷取ctl
int c = ctl.get();
/*
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
判斷運作狀态是否為SHUTDOWN或RUNNING
*/
if (runStateLessThan(c, STOP)) {
//工作線程正常執行完
if (!completedAbruptly) {
/*
min表示線程池核心線程數量(你也可以了解成corePoolSize)
allowCoreThreadTimeOut:如果為false(預設值),核心線程即使處于
空閑狀态也保持活動狀态。如果為true,核心線程使用keepAliveTime
逾時等待工作。
private volatile boolean allowCoreThreadTimeOut;
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果阻塞隊列中還有任務,則核心線程數量不能為0
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果工作線程數量>=min(線程池核心線程數量),則直接傳回,否則還會有
//最後一行代碼的執行,因為要保證工作中線程數量>=corePoolSize
if (workerCountOf(c) >= min)
return; // replacement not needed
}
/*
添加一個新線程worker,這裡worker的添加有可能是因為workerCountOf(c) >= min,
也有可能是因為上面條件!completedAbruptly==false(工作線程出現異
常,出現異常我們就需要重新建立一個新的worker,
這也就是為何我們使用線程池時不需要考慮因線程異常而導緻線程數量減小的原因)
*/
addWorker(null, false);
}
}
ThreadPoolExecutor線程池的工作流程大概就是這樣,如果哪裡有誤歡迎評論指出!