天天看點

多線程 java 線程池_Java 多線程之線程池的使用

一、 使用背景

談到Java多線程,我們很自然的會想到并發,在編寫多線程代碼時,我們一般會建立多個線程,如果并發的線程數量很多,而且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁的進行線程的建立會降低系統的效率,因為頻繁建立和銷毀線程是需要時間的。

那麼有沒有一種辦法可以使得線程可以複用,就是執行完一個任務之後,線程不被銷毀,而是用來執行其他任務呢?答:線程池

二、 線程池核心類:ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類。下面我們來看一下ThreadPoolExecutor類的具體實作源碼。

在ThreadPoolExecutor類中提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {

.....

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

BlockingQueue workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

BlockingQueue workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

BlockingQueue workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

...

}

下面解釋構造函數裡面每個參數的意義:

corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實作原理有非常大的關系。在建立了線程池後,預設情況下,線程池中并沒有任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法。預設情況下,在建立了線程池後,線程池中的線程數為0,當有任務來之後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;

maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示線上程池中最多能建立多少個線程;

keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。預設情況下,隻有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大于corePoolSize,即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,線上程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;

unit:參數keepAliveTime的時間機關。

workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運作過程産生重大影響,一般來說,這裡的阻塞隊列有以下幾種選擇:

ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue;

threadFactory:線程工廠,主要用來建立線程;

handler:表示當拒絕處理任務時的政策,有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)

ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

下面說明一下ThreadPoolExecutor的繼承關系:

Executor —> ExecutorService —> AbstractExecutorService —> ThreadPoolExecutor

感興趣的同學可以自行翻閱API文檔查閱了解。

三、線程池原理

下面我們要從以下幾個方面講解線程池的原理:

1.線程池狀态

2.任務的執行

3.線程池中的線程初始化

4.任務緩存隊列及排隊政策

5.任務拒絕政策

6.線程池的關閉

7.線程池容量的動态調整

1.線程池狀态

在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀态:

volatile int runState;

static final int RUNNING = 0;

static final int SHUTDOWN = 1;

static final int STOP = 2;

static final int TERMINATED = 3;

runState表示目前線程池的狀态,它是一個volatile變量用來保證線程之間的可見性;

下面的幾個static final變量表示runState可能的幾個取值。

當建立線程池後,初始時,線程池處于RUNNING狀态;

如果調用了shutdown()方法,則線程池處于SHUTDOWN狀态,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;

如果調用了shutdownNow()方法,則線程池處于STOP狀态,此時線程池不能接受新的任務,并且會去嘗試終止正在執行的任務;

當線程池處于SHUTDOWN或STOP狀态,并且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束後,線程池被設定為TERMINATED狀态。

2.任務的執行

在了解任務執行過程之前,我們先看一下ThreadPoolExecutor的類裡面一些重要的成員變量:

private final BlockingQueue workQueue; //任務緩存隊列,用來存放等待執行的任務

private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀态鎖,對線程池狀态(比如線程池大小、runState等)的改變都要使用這個鎖

private final HashSet workers = new HashSet(); //用來存放工作集

private volatile long keepAliveTime; //線程存活時間

private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設定存活時間

private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大于這個參數時,送出的任務會被放進任務緩存隊列)

private volatile int maximumPoolSize; //線程池最大能容忍的線程數

private volatile int poolSize; //線程池中目前的線程數

private volatile RejectedExecutionHandler handler; //任務拒絕政策

private volatile ThreadFactory threadFactory; //線程工廠,用來建立線程

private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數

private long completedTaskCount; //用來記錄已經執行完畢的任務個數

在ThreadPoolExecutor類中,最核心的任務送出方法是execute()方法,雖然通過submit也可以送出任務,但是實際上submit方法裡面最終調用的還是execute()方法,是以隻需要研究execute()方法的實作原理即可:

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

if (runState == RUNNING && workQueue.offer(command)) {

if (runState != RUNNING || poolSize == 0)

ensureQueuedTaskHandled(command);

}

else if (!addIfUnderMaximumPoolSize(command))

reject(command); // is shutdown or saturated

}

}

到這裡,大部分朋友應該對任務送出給線程池之後到被執行的整個過程有了一個基本的了解,下面總結一下:

1)首先,要清楚corePoolSize和maximumPoolSize的含義;

2)其次,要知道Worker是用來起到什麼作用的;

3)要知道任務送出給線程池之後的處理政策,這裡總結一下主要有4點:

如果目前線程池中的線程數目小于corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;

如果目前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試将其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程将其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;

如果目前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕政策進行處理;

如果線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程将被終止,直至線程池中的線程數目不大于corePoolSize;如果允許為核心池中的線程設定存活時間,那麼核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

3. 線程池中的線程初始化

預設情況下,建立線程池之後,線程池中是沒有線程的,需要送出任務之後才會建立線程。

在實際中如果需要線程池建立之後立即建立線程,可以通過以下兩個方法辦到:

prestartCoreThread():初始化一個核心線程;

prestartAllCoreThreads():初始化所有核心線程

下面是這2個方法的實作:

public boolean prestartCoreThread() {

return addIfUnderCorePoolSize(null); //注意傳進去的參數是null

}

public int prestartAllCoreThreads() {

int n = 0;

while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null

++n;

return n;

}

注意上面傳進去的參數是null,根據第2小節的分析可知如果傳進去的參數為null,則最後執行線程會阻塞在getTask方法中的

r = workQueue.take();

即等待任務隊列中有任務。

4.任務緩存隊列及排隊政策

在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

workQueue的類型為BlockingQueue,通常可以取下面三種類型:

1)ArrayBlockingQueue:基于數組的先進先出隊列,此隊列建立時必須指定大小;

2)LinkedBlockingQueue:基于連結清單的先進先出隊列,如果建立時沒有指定此隊列大小,則預設為Integer.MAX_VALUE;

3)synchronousQueue:這個隊列比較特殊,它不會儲存送出的任務,而是将直接建立一個線程來執行新來的任務。

5.任務拒絕政策

當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕政策,通常有以下四種政策:

ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)

ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

6.線程池的關閉

ThreadPoolExecutor提供了兩個方法,用于線程池的關閉,分别是shutdown()和shutdownNow(),其中:

shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完後才終止,但再也不會接受新的任務

shutdownNow():立即終止線程池,并嘗試打斷正在執行的任務,并且清空任務緩存隊列,傳回尚未執行的任務

7.線程池容量的動态調整

ThreadPoolExecutor提供了動态調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

setCorePoolSize:設定核心池大小

setMaximumPoolSize:設定線程池最大能建立的線程數目大小

當上述參數從小變大時,ThreadPoolExecutor進行線程指派,還可能立即建立新的線程來執行任務。

四、 合理配置線程池的大小

一般需要根據任務的類型來配置線程池大小:

如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1

如果是IO密集型任務,參考值可以設定為2*NCPU

當然,這隻是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先将線程池大小設定為參考值,再觀察任務運作情況和系統負載、資源使用率來進行适當調整。

五、參考資料