天天看點

java多線程源碼_java多線程源碼解析

一.interface Executor 最頂層接口

1.1

void execute(Runnable command);

二.class Executors 為這些 Executor 提供了便捷的工廠方法。

1.newFixedThreadPool  固定個數的線程池

public static ExecutorService newFixedThreadPool(intnThreads) {//1.1 調用ThreadPoolExecutor

return newThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());//1.2 使用linkedBlockingQueue

}

1.1 調用ThreadPoolExecutor()構造方法

//1.1.1 參數說明

public ThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,

TimeUnit unit,

BlockingQueueworkQueue) {

//1.1.2 調用另一個構造方法

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }

1.1.1 參數說明*

corePoolSize 核心線程數

maximumPoolSize 最大線程數

keepAliveTime 線程存活時間

unit

BlockingQueue 隊列

1.1.2 調用基礎的構造方法

public ThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,

TimeUnit unit,

BlockingQueueworkQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize<= 0 ||maximumPoolSize< corePoolSize ||keepAliveTime< 0)throw newIllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw newNullPointerException();this.acc = System.getSecurityManager() == null ?

null:

AccessController.getContext();this.corePoolSize =corePoolSize;this.maximumPoolSize =maximumPoolSize;this.workQueue =workQueue;this.keepAliveTime =unit.toNanos(keepAliveTime);this.threadFactory =threadFactory;this.handler =handler;

}

沒什麼好說的,就是一些參數校驗,異常處理。

1.2 linkedBlockingQueue

看名字就知道,這是一個阻塞隊列。 阻塞隊列我會另寫文章,此處僅繼續往下走 不向上挖。

publicLinkedBlockingQueue() {this(Integer.MAX_VALUE);//1.2.1調用LinkedBlockingQueue(int) 構造方法

}

1.2.1調用LinkedBlockingQueue(int) 構造方法

public LinkedBlockingQueue(intcapacity) {if (capacity <= 0) throw newIllegalArgumentException();this.capacity =capacity;

last= head = new Node(null);

}

很明顯,構造了一個最大整型2的32次方-1的隊列

*對于這些參數的含義,我想應該在execute方法中去一探究竟。

先說結論

1.如果正在運作的線程數量小于 corePoolSize,那麼馬上建立線程運作這個任務;

2.如果正在運作的線程數量大于或等于 corePoolSize,那麼将這個任務放入隊列。

3.如果這時候隊列滿了,而且正在運作的線程數量小于 maximumPoolSize,那麼還是要建立線程運作這個任務;(此時隊列裡面的任務還在隊列裡面,也就是跨過了隊列裡面的任務,建立了新的線程運作目前任務)

4.如果隊列滿了,而且正在運作的線程數量大于或等于 maximumPoolSize,那麼線程池會抛出異常,告訴調用者“我不能再接受任務了”。

public voidexecute(Runnable command) {

if (command == null)

throw newNullPointerException();

int c = ctl.get();//1.1

if (workerCountOf(c) < corePoolSize) {//當運作線程數小于corePoolSize

if (addWorker(command, true)) //1.2 addWorker 建立成功則傳回,失敗則繼續

return;

c =ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) { //1.3 offer() 是向隊列添加一個新元素,當隊列滿傳回false,差別于add()也是添加一個新元素,隊列滿抛出unchecked異常

int recheck = ctl.get();//二次檢查

if (! isRunning(recheck) && remove(command))//1.4recheck>0, remove 删除任務,傳回boolean

reject(command); //拒絕該任務

else if (workerCountOf(recheck) == 0) //1.5 recheck=0時,

addWorker(null, false);

}

else if (!addWorker(command, false))//添加失敗,拒絕

reject(command);

}

1.1 ct1 為-2的29次方, -536870912

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int ctlOf(int rs, int wc) { return rs | wc; }

private static final int RUNNING = -1 <

private static final int COUNT_BITS = Integer.SIZE - 3;

public static final int SIZE = 32;

1.2

private boolean addWorker(Runnable firstTask, booleancore) {

retry://label标簽,可以在循環嵌套時使用,可以在内層循環直接跳出外層循環

for(;;) {

int c =ctl.get();

int rs =runStateOf(c);

//Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&firstTask == null &&

! workQueue.isEmpty())) //檢查隊列是否為空

return false;

for(;;) {

int wc =workerCountOf(c);

if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))//目前運作線程大于等于容量,或者根據core,判斷大于等于核心線程數or最大線程數

return false;

if(compareAndIncrementWorkerCount(c))

breakretry;

c = ctl.get(); //Re-read ctl

if (runStateOf(c) !=rs)

continueretry;

//else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try{

w = newWorker(firstTask);

final Thread t =w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try{

//Recheck while holding lock.

//Back out on ThreadFactory failure or if

//shut down before lock acquired.

int rs =runStateOf(ctl.get());

if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) //precheck that t is startable

throw newIllegalThreadStateException();

workers.add(w);

int s =workers.size();

if (s >largestPoolSize)

largestPoolSize =s;

workerAdded = true;

}

} finally{

mainLock.unlock();

}

if(workerAdded) {

t.start();

workerStarted = true;

}

}

} finally{

if (!workerStarted)

addWorkerFailed(w);

}

returnworkerStarted;

}

1.3 ctl.get()<0 且隊列能成功添加一個新任務

private static final int SHUTDOWN = 0 <

private static boolean isRunning(intc) {

return c

}

1.4 ctl.get>0 且隊列能删除改任務 則拒絕任務

1.5  ctl ==0 , addWorker(null, false)

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

private static int workerCountOf(int c) { return c & CAPACITY; }

2.newSingleThreadExecutor  corePoolSize 和maximumPoolSize都為1 的線程池

public staticExecutorService newSingleThreadExecutor() {return newFinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));

}

3.newCachedThreadPool  corePoolSize 為0,maximumPoolSize都為最大整型數的線程池,即都放在隊列中

public staticExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue());//注意此處為SynchronousQueue,看源碼其實是建立了TransferStack()

}

4.newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(intcorePoolSize) {return newScheduledThreadPoolExecutor(corePoolSize);

}

public ScheduledThreadPoolExecutor(intcorePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,newDelayedWorkQueue());//注意此處為DelayedWorkQueue,初始容量為16的阻塞延遲隊列?

}

三.class ThreadPoolExecutor extends AbstractExecutorService 提供一個可擴充的線程池實作

四.interface ExecutorService extends Executor  更廣泛的接口

五.abstract class AbstractExecutorService extends ExecutorService