天天看點

Java自定義線程池詳解

自定義線程池的核心:ThreadPoolExecutor

為了更好的控制多線程,JDK提供了一套線程架構Executor,幫助開發人員有效的進行線程控制,其中在java.util.concurrent包下,是JDK并發包的核心,比如我們熟知的Executors。Executors扮演着線程工廠的角色,我們通過它可以建立特定功能的線程池,而這些線程池背後的就是:ThreadPoolExecutor。那麼下面我們來具體分析下它。

構造ThreadPoolExecutor

<a href="http://s5.51cto.com/wyfs02/M00/8B/5C/wKioL1hKverQ_SbKAABM-6wU-Hk846.png" target="_blank"></a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

<code>public</code> <code>ThreadPoolExecutor(</code><code>int</code> <code>corePoolSize,</code>

<code>                          </code><code>int</code> <code>maximumPoolSize,</code>

<code>                          </code><code>long</code> <code>keepAliveTime,</code>

<code>                          </code><code>TimeUnit unit,</code>

<code>                          </code><code>BlockingQueue&lt;Runnable&gt; workQueue,</code>

<code>                          </code><code>ThreadFactory threadFactory,</code>

<code>                          </code><code>RejectedExecutionHandler handler) {</code>

<code>    </code><code>if</code> <code>(corePoolSize &lt; </code><code>0</code> <code>||</code>

<code>        </code><code>maximumPoolSize &lt;= </code><code>0</code> <code>||</code>

<code>        </code><code>maximumPoolSize &lt; corePoolSize ||</code>

<code>        </code><code>keepAliveTime &lt; </code><code>0</code><code>)</code>

<code>        </code><code>throw</code> <code>new</code> <code>IllegalArgumentException();</code>

<code>    </code><code>if</code> <code>(workQueue == </code><code>null</code> <code>|| threadFactory == </code><code>null</code> <code>|| handler == </code><code>null</code><code>)</code>

<code>        </code><code>throw</code> <code>new</code> <code>NullPointerException();</code>

<code>    </code><code>this</code><code>.corePoolSize = corePoolSize;</code>

<code>    </code><code>this</code><code>.maximumPoolSize = maximumPoolSize;</code>

<code>    </code><code>this</code><code>.workQueue = workQueue;</code>

<code>    </code><code>this</code><code>.keepAliveTime = unit.toNanos(keepAliveTime);</code>

<code>    </code><code>this</code><code>.threadFactory = threadFactory;</code>

<code>    </code><code>this</code><code>.handler = handler;</code>

<code>}</code>

corePoolSize:核心線程數

maximumPoolSize:最大線程數

keepAliveTime + unit:線程回收時間

workQueue:任務較多時暫存到隊列

threadFactory:執行程式建立新線程時使用的工廠

handler:超出線程池容量以及隊列長度後拒絕任務的政策

有界隊列 or 無界隊列

我們知道對于BlockingQueue而言,有典型的有界隊列ArrayBlockingQueue以及LinkedBlockingQueue這種無界隊列,那麼對于線程池而言,workQueue采用有界隊列、無界隊列會産生什麼影響呢?如果采用無界隊列,那麼會存在拒絕政策嗎?顯然不會,因為容量是無限的,比如沒有預定義容量的LinkedBlockingQueue。比如,在某個時段突發很多請求,那麼采用無界隊列就保證了增長的可能性,而不是拒絕。如果采用有界隊列,相比無界隊列而言,有助于防止資源耗盡,在實際中我們經常在拒絕政策中記錄log,然後通過定時任務的方式進行處理。

<a href="http://s1.51cto.com/wyfs02/M02/8B/5C/wKioL1hKxFTxbjC4AAASvvdSX-I647.png" target="_blank"></a>

接着上面的分析思路,想一想采用有界隊列、無界隊列中線程池的大小,這裡涉及到一個問題,那就是任務增長時,是先增長至maximumPoolSize,還是先暫存到隊列中的問題?

corePoolSize and maximumPoolSize

一個是核心線程數,一個是最大線程數,怎麼了解呢?線上程池初始化階段,是否已經初始化好corePoolSize個線程呢?大量任務來了後,線程池中的線程怎麼變化?任務完成處理後,線程池又是如何回收的,回收到什麼程度?

下面先來看一個線程池處理流程圖:

<a href="http://s1.51cto.com/wyfs02/M00/8B/67/wKiom1hM5fezoRJeAAF39773KVs450.png" target="_blank"></a>

簡單一句話:送出任務,線程池中的線程數可以增長至corePoolSize,之後繼續送出任務将暫存至隊列中,如果隊列滿,則看是否能繼續增長線程數至maximumPoolSize,超出後将進行拒絕政策處理。顯然,如果采用無界隊列,那麼maximumPoolSize将失效,線程池中的線程最多就是corePoolSize個線程工作。

從這裡我們也可以看出,在需要判斷是否corePoolSize是否已經達到,需要鎖的介入,是以我們應盡量讓線程啟動後線程池的大小就處于&gt;=corePoolSize個數,提前預熱。

keepAliveTime + unit

這裡涉及到線程池回收線程。簡單來說,就是采用有界隊列,導緻corePoolSize滿,隊列滿,不得已線程池的線程增長至maximumPoolSize,那麼任務處理完畢後,線程池中多出corePoolSize的部分理應回收,那麼等待多長時間,多長時間沒有任務後在進行回收的問題就是由上面的參數決定。

Executors建立線程池執行個體分析

固定線程池

<a href="http://s5.51cto.com/wyfs02/M01/8B/67/wKiom1hM7FTj_AAjAAAeDUwZKE8751.png" target="_blank"></a>

特點:

無界隊列,導緻keepAliveTime/unit/maximumPoolSize失效,不存在拒絕;

随着任務的增長,線程數将固定在corePoolSize。

緩存線程池

<a href="http://s4.51cto.com/wyfs02/M01/8B/63/wKioL1hM7OXii9sCAAAeP9VaDmg179.png" target="_blank"></a>

一個比較特殊的隊列:SynchronousQueue,沒有容量可言,送出任務就意味着一直阻塞等待任務的線程立刻得到任務進行執行。說白了,就是不要暫存到隊列中,任務直接送出給線程進行執行。由于任務無法暫存,是以緩存線程池會根據任務的實際情況,進行線程池的增長,直至maximumPoolSize(Integer.MAX_VALUE)。

注意到keepAliveTime被設定成了60S,意思就是說如果任務來了很多,緩存線程池建立了不少線程來對付它們,任務處理的差不多了,那麼等待60S後,還沒有任務需要處理,那麼進行線程回收處理。

單線程池

<a href="http://s5.51cto.com/wyfs02/M01/8B/67/wKiom1hM7m2yYEPRAAAjO1cNL8w174.png" target="_blank"></a>

無界隊列+核心、最大線程數都是1。

打個簡單比喻,任務來了,不管多少,那麼有序的放着,勞工隻有一個,那就按照順序處理任務就是了。就是一個單線程順序處理任務的情況。

定時線程池

<a href="http://s5.51cto.com/wyfs02/M01/8B/63/wKioL1hM8-7RzPkKAAAU726bDCs945.png" target="_blank"></a>

<a href="http://s4.51cto.com/wyfs02/M02/8B/67/wKiom1hM9AHAB7gcAAAZy9KlW7A808.png" target="_blank"></a>

利用延時隊列DelayedQueue(無界隊列),完成線程池實作定時以及周期性執行任務的需要。

拒絕政策

JDK已經提供了幾種拒絕政策,如下所示:

<a href="http://s4.51cto.com/wyfs02/M02/8B/67/wKiom1hM75jDUdgoAAA4-88kjYU624.png" target="_blank"></a>

我們需要自定義決絕政策時,很簡單,直接實作RejectedExecutionHandler的rejectedExecution方法即可。

本文轉自zfz_linux_boy 51CTO部落格,原文連結:http://blog.51cto.com/zhangfengzhe/1881644,如需轉載請自行聯系原作者