一.interface Executor 最頂層接口
1.1
void execute(Runnable command);
二.class Executors 為這些 Executor 提供了便捷的工廠方法。
1.newFixedThreadPool 固定個數的線程池
public static ExecutorService newFixedThreadPool(intnThreads) {//1.1 調用ThreadPoolExecutor
return newThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());//1.2 使用linkedBlockingQueue
}
1.1 調用ThreadPoolExecutor()構造方法
//1.1.1 參數說明
public ThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnit unit,
BlockingQueueworkQueue) {
//1.1.2 調用另一個構造方法
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
1.1.1 參數說明*
corePoolSize 核心線程數
maximumPoolSize 最大線程數
keepAliveTime 線程存活時間
unit
BlockingQueue 隊列
1.1.2 調用基礎的構造方法
public ThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnit unit,
BlockingQueueworkQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize<= 0 ||maximumPoolSize< corePoolSize ||keepAliveTime< 0)throw newIllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw newNullPointerException();this.acc = System.getSecurityManager() == null ?
null:
AccessController.getContext();this.corePoolSize =corePoolSize;this.maximumPoolSize =maximumPoolSize;this.workQueue =workQueue;this.keepAliveTime =unit.toNanos(keepAliveTime);this.threadFactory =threadFactory;this.handler =handler;
}
沒什麼好說的,就是一些參數校驗,異常處理。
1.2 linkedBlockingQueue
看名字就知道,這是一個阻塞隊列。 阻塞隊列我會另寫文章,此處僅繼續往下走 不向上挖。
publicLinkedBlockingQueue() {this(Integer.MAX_VALUE);//1.2.1調用LinkedBlockingQueue(int) 構造方法
}
1.2.1調用LinkedBlockingQueue(int) 構造方法
public LinkedBlockingQueue(intcapacity) {if (capacity <= 0) throw newIllegalArgumentException();this.capacity =capacity;
last= head = new Node(null);
}
很明顯,構造了一個最大整型2的32次方-1的隊列
*對于這些參數的含義,我想應該在execute方法中去一探究竟。
先說結論
1.如果正在運作的線程數量小于 corePoolSize,那麼馬上建立線程運作這個任務;
2.如果正在運作的線程數量大于或等于 corePoolSize,那麼将這個任務放入隊列。
3.如果這時候隊列滿了,而且正在運作的線程數量小于 maximumPoolSize,那麼還是要建立線程運作這個任務;(此時隊列裡面的任務還在隊列裡面,也就是跨過了隊列裡面的任務,建立了新的線程運作目前任務)
4.如果隊列滿了,而且正在運作的線程數量大于或等于 maximumPoolSize,那麼線程池會抛出異常,告訴調用者“我不能再接受任務了”。
public voidexecute(Runnable command) {
if (command == null)
throw newNullPointerException();
int c = ctl.get();//1.1
if (workerCountOf(c) < corePoolSize) {//當運作線程數小于corePoolSize
if (addWorker(command, true)) //1.2 addWorker 建立成功則傳回,失敗則繼續
return;
c =ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //1.3 offer() 是向隊列添加一個新元素,當隊列滿傳回false,差別于add()也是添加一個新元素,隊列滿抛出unchecked異常
int recheck = ctl.get();//二次檢查
if (! isRunning(recheck) && remove(command))//1.4recheck>0, remove 删除任務,傳回boolean
reject(command); //拒絕該任務
else if (workerCountOf(recheck) == 0) //1.5 recheck=0時,
addWorker(null, false);
}
else if (!addWorker(command, false))//添加失敗,拒絕
reject(command);
}
1.1 ct1 為-2的29次方, -536870912
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static final int RUNNING = -1 <
private static final int COUNT_BITS = Integer.SIZE - 3;
public static final int SIZE = 32;
1.2
private boolean addWorker(Runnable firstTask, booleancore) {
retry://label标簽,可以在循環嵌套時使用,可以在内層循環直接跳出外層循環
for(;;) {
int c =ctl.get();
int rs =runStateOf(c);
//Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&firstTask == null &&
! workQueue.isEmpty())) //檢查隊列是否為空
return false;
for(;;) {
int wc =workerCountOf(c);
if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))//目前運作線程大于等于容量,或者根據core,判斷大于等于核心線程數or最大線程數
return false;
if(compareAndIncrementWorkerCount(c))
breakretry;
c = ctl.get(); //Re-read ctl
if (runStateOf(c) !=rs)
continueretry;
//else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try{
w = newWorker(firstTask);
final Thread t =w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try{
//Recheck while holding lock.
//Back out on ThreadFactory failure or if
//shut down before lock acquired.
int rs =runStateOf(ctl.get());
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) //precheck that t is startable
throw newIllegalThreadStateException();
workers.add(w);
int s =workers.size();
if (s >largestPoolSize)
largestPoolSize =s;
workerAdded = true;
}
} finally{
mainLock.unlock();
}
if(workerAdded) {
t.start();
workerStarted = true;
}
}
} finally{
if (!workerStarted)
addWorkerFailed(w);
}
returnworkerStarted;
}
1.3 ctl.get()<0 且隊列能成功添加一個新任務
private static final int SHUTDOWN = 0 <
private static boolean isRunning(intc) {
return c
}
1.4 ctl.get>0 且隊列能删除改任務 則拒絕任務
1.5 ctl ==0 , addWorker(null, false)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int workerCountOf(int c) { return c & CAPACITY; }
2.newSingleThreadExecutor corePoolSize 和maximumPoolSize都為1 的線程池
public staticExecutorService newSingleThreadExecutor() {return newFinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));
}
3.newCachedThreadPool corePoolSize 為0,maximumPoolSize都為最大整型數的線程池,即都放在隊列中
public staticExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue());//注意此處為SynchronousQueue,看源碼其實是建立了TransferStack()
}
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(intcorePoolSize) {return newScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(intcorePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,newDelayedWorkQueue());//注意此處為DelayedWorkQueue,初始容量為16的阻塞延遲隊列?
}
三.class ThreadPoolExecutor extends AbstractExecutorService 提供一個可擴充的線程池實作
四.interface ExecutorService extends Executor 更廣泛的接口
五.abstract class AbstractExecutorService extends ExecutorService