轉載請标明出處:
https://blog.csdn.net/bingospunky/article/details/80234457
本文出自馬彬彬的部落格
建立ThreadPoolExecutor
ExecutorService fixedThreadPool = new ThreadPoolExecutor(, , , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
我們使用Executors.newXXX方法建立線程池也是通過上面的new方法進行建立的,隻是傳遞的初始化參數不同罷了。
第1、2個參數是線程池線程的數量,第3、4個參數是當線程沒有runnable執行時,多長時間結束,第5個參數是當runnable多于線程數時,把runnable存放的隊列。
原理
ThreadPoolExecutor有一個private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));參數表示目前執行的線程,ctl是一個32位的整數,AtomicInteger是為了線程安全。這個ctl前3位相當于占位功能,後29位表示目前啟動的線程數量,比如11100000000000000000000000000000表示沒有執行的線程,11100000000000000000000000000001表示目前有一個線程在執行。當添加線程或者線程結束時,會響應的改變ctl的值。
有一個Worker的類,這個Worker繼承AbstractQueuedSynchronizer,實作鎖的功能,在執行runnable時互斥。這個Worker包含一個Thread,Thread的run方法是while循環,不斷的從queue裡擷取runnable執行。
當我們執行的runnable數量 超過線程數+queue.size 時如何
當我們使用Executors.newXXX建立線程池時一般不會遇到這個問題,因為這樣建立的隊列容量是Integer.MAX,當我們想控制線程池的queue容量時,就有可能會遇到超過上限的這種情況。
當出現這種情況時,會抛出異常。execute方法代碼如下:
Code1
ThreadPoolExecutor.ThreadPoolExecutor
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == )
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
我就不自己解釋了,借用一下代碼裡的一段注釋來解釋。
/*
* Proceed in steps:
*
* If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
當我們的runnable出現異常時如何處理的
從使用經驗上來看,我們的runnable方法抛出了異常,後面的runnable依然可以執行。那麼這是如何實作的呢。
首先來看Worker裡的Thread的run方法。
Code2
ThreadPoolExecutor.runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Code2第19行抛出異常後,會執行第37行的processWorkerExit方法,這時completedAbruptly=true,表示這時異常結束的前面的while循環。我們再看一下processWorkerExit方法的代碼。
Code3
ThreadPoolExecutor.processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? : corePoolSize;
if (min == && ! workQueue.isEmpty())
min = ;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
由于completedAbruptly值為true,先執行第3行代碼,改變前文中提到的ctl的值,讓其表示的線程數減1。Code3第8行,remove掉該Worker。會執行Code3第22行代碼,使用runnable=null,添加一個Worker對象,這句話的意思就是啟動一個新的線程,去queue裡擷取runnable執行。
是以,我們自己runnable抛出了異常,是kill掉了執行它的線程,并且補充一個新的線程。
ThreadPoolExecutor構造方法裡的第3、4個方法如何生效
ThreadPoolExecutor第3、4個方法表示當線程空閑多久後kill。
這個線程kill是針對多餘core線程數生效的,或則我們可以設定對于所有線程生效。如果設定對于所有線程生效,我們需要執行ThreadPoolExecutor.allowCoreThreadTimeOut(true)設定。
那麼這是如何生效的呢。我們還是來看Code2的代碼。該線程一直在while裡執行,當它跳出while時,就是線程結束的條件。跳出while的條件就是Code2第8行的getTask方法傳回null。接下來看getTask方法。
Code4
ThreadPoolExecutor.getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
Code4第13行,當allowCoreThreadTimeOut或者當先線程數超過core線程數時,設定timed=true。Code第21行,當timed==true時,queue擷取runnable時阻塞我們設定的時間。這樣來實作當線程空閑某一段時間後,線程kill的。
補充
ThreadPoolExecutor記錄線程數量等使用的是一個AtomicInteger類型的變量ctl,把這個變量前3位和後29位分開使用的,使用了位運算,下面的代碼方面我們檢視資料的變化。
System.out.println(- & );
System.out.println(- & ~);
System.out.println(Integer.toBinaryString(-));
System.out.println(Integer.toBinaryString(-));
System.out.println(Integer.toBinaryString());
System.out.println(Integer.toBinaryString());
System.out.println(Integer.toBinaryString(-));
System.out.println(Integer.toBinaryString(-));
System.out.println(Integer.toBinaryString(- << ));
System.out.println(Integer.toBinaryString((- << ) | ));