天天看点

Java自定义线程池详解

自定义线程池的核心:ThreadPoolExecutor

为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效的进行线程控制,其中在java.util.concurrent包下,是JDK并发包的核心,比如我们熟知的Executors。Executors扮演着线程工厂的角色,我们通过它可以创建特定功能的线程池,而这些线程池背后的就是:ThreadPoolExecutor。那么下面我们来具体分析下它。

构造ThreadPoolExecutor

<a href="http://s5.51cto.com/wyfs02/M00/8B/5C/wKioL1hKverQ_SbKAABM-6wU-Hk846.png" target="_blank"></a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

<code>public</code> <code>ThreadPoolExecutor(</code><code>int</code> <code>corePoolSize,</code>

<code>                          </code><code>int</code> <code>maximumPoolSize,</code>

<code>                          </code><code>long</code> <code>keepAliveTime,</code>

<code>                          </code><code>TimeUnit unit,</code>

<code>                          </code><code>BlockingQueue&lt;Runnable&gt; workQueue,</code>

<code>                          </code><code>ThreadFactory threadFactory,</code>

<code>                          </code><code>RejectedExecutionHandler handler) {</code>

<code>    </code><code>if</code> <code>(corePoolSize &lt; </code><code>0</code> <code>||</code>

<code>        </code><code>maximumPoolSize &lt;= </code><code>0</code> <code>||</code>

<code>        </code><code>maximumPoolSize &lt; corePoolSize ||</code>

<code>        </code><code>keepAliveTime &lt; </code><code>0</code><code>)</code>

<code>        </code><code>throw</code> <code>new</code> <code>IllegalArgumentException();</code>

<code>    </code><code>if</code> <code>(workQueue == </code><code>null</code> <code>|| threadFactory == </code><code>null</code> <code>|| handler == </code><code>null</code><code>)</code>

<code>        </code><code>throw</code> <code>new</code> <code>NullPointerException();</code>

<code>    </code><code>this</code><code>.corePoolSize = corePoolSize;</code>

<code>    </code><code>this</code><code>.maximumPoolSize = maximumPoolSize;</code>

<code>    </code><code>this</code><code>.workQueue = workQueue;</code>

<code>    </code><code>this</code><code>.keepAliveTime = unit.toNanos(keepAliveTime);</code>

<code>    </code><code>this</code><code>.threadFactory = threadFactory;</code>

<code>    </code><code>this</code><code>.handler = handler;</code>

<code>}</code>

corePoolSize:核心线程数

maximumPoolSize:最大线程数

keepAliveTime + unit:线程回收时间

workQueue:任务较多时暂存到队列

threadFactory:执行程序创建新线程时使用的工厂

handler:超出线程池容量以及队列长度后拒绝任务的策略

有界队列 or 无界队列

我们知道对于BlockingQueue而言,有典型的有界队列ArrayBlockingQueue以及LinkedBlockingQueue这种无界队列,那么对于线程池而言,workQueue采用有界队列、无界队列会产生什么影响呢?如果采用无界队列,那么会存在拒绝策略吗?显然不会,因为容量是无限的,比如没有预定义容量的LinkedBlockingQueue。比如,在某个时段突发很多请求,那么采用无界队列就保证了增长的可能性,而不是拒绝。如果采用有界队列,相比无界队列而言,有助于防止资源耗尽,在实际中我们经常在拒绝策略中记录log,然后通过定时任务的方式进行处理。

<a href="http://s1.51cto.com/wyfs02/M02/8B/5C/wKioL1hKxFTxbjC4AAASvvdSX-I647.png" target="_blank"></a>

接着上面的分析思路,想一想采用有界队列、无界队列中线程池的大小,这里涉及到一个问题,那就是任务增长时,是先增长至maximumPoolSize,还是先暂存到队列中的问题?

corePoolSize and maximumPoolSize

一个是核心线程数,一个是最大线程数,怎么理解呢?在线程池初始化阶段,是否已经初始化好corePoolSize个线程呢?大量任务来了后,线程池中的线程怎么变化?任务完成处理后,线程池又是如何回收的,回收到什么程度?

下面先来看一个线程池处理流程图:

<a href="http://s1.51cto.com/wyfs02/M00/8B/67/wKiom1hM5fezoRJeAAF39773KVs450.png" target="_blank"></a>

简单一句话:提交任务,线程池中的线程数可以增长至corePoolSize,之后继续提交任务将暂存至队列中,如果队列满,则看是否能继续增长线程数至maximumPoolSize,超出后将进行拒绝策略处理。显然,如果采用无界队列,那么maximumPoolSize将失效,线程池中的线程最多就是corePoolSize个线程工作。

从这里我们也可以看出,在需要判断是否corePoolSize是否已经达到,需要锁的介入,因此我们应尽量让线程启动后线程池的大小就处于&gt;=corePoolSize个数,提前预热。

keepAliveTime + unit

这里涉及到线程池回收线程。简单来说,就是采用有界队列,导致corePoolSize满,队列满,不得已线程池的线程增长至maximumPoolSize,那么任务处理完毕后,线程池中多出corePoolSize的部分理应回收,那么等待多长时间,多长时间没有任务后在进行回收的问题就是由上面的参数决定。

Executors创建线程池实例分析

固定线程池

<a href="http://s5.51cto.com/wyfs02/M01/8B/67/wKiom1hM7FTj_AAjAAAeDUwZKE8751.png" target="_blank"></a>

特点:

无界队列,导致keepAliveTime/unit/maximumPoolSize失效,不存在拒绝;

随着任务的增长,线程数将固定在corePoolSize。

缓存线程池

<a href="http://s4.51cto.com/wyfs02/M01/8B/63/wKioL1hM7OXii9sCAAAeP9VaDmg179.png" target="_blank"></a>

一个比较特殊的队列:SynchronousQueue,没有容量可言,提交任务就意味着一直阻塞等待任务的线程立刻得到任务进行执行。说白了,就是不要暂存到队列中,任务直接提交给线程进行执行。由于任务无法暂存,因此缓存线程池会根据任务的实际情况,进行线程池的增长,直至maximumPoolSize(Integer.MAX_VALUE)。

注意到keepAliveTime被设置成了60S,意思就是说如果任务来了很多,缓存线程池创建了不少线程来对付它们,任务处理的差不多了,那么等待60S后,还没有任务需要处理,那么进行线程回收处理。

单线程池

<a href="http://s5.51cto.com/wyfs02/M01/8B/67/wKiom1hM7m2yYEPRAAAjO1cNL8w174.png" target="_blank"></a>

无界队列+核心、最大线程数都是1。

打个简单比喻,任务来了,不管多少,那么有序的放着,工人只有一个,那就按照顺序处理任务就是了。就是一个单线程顺序处理任务的情况。

定时线程池

<a href="http://s5.51cto.com/wyfs02/M01/8B/63/wKioL1hM8-7RzPkKAAAU726bDCs945.png" target="_blank"></a>

<a href="http://s4.51cto.com/wyfs02/M02/8B/67/wKiom1hM9AHAB7gcAAAZy9KlW7A808.png" target="_blank"></a>

利用延时队列DelayedQueue(无界队列),完成线程池实现定时以及周期性执行任务的需要。

拒绝策略

JDK已经提供了几种拒绝策略,如下所示:

<a href="http://s4.51cto.com/wyfs02/M02/8B/67/wKiom1hM75jDUdgoAAA4-88kjYU624.png" target="_blank"></a>

我们需要自定义决绝策略时,很简单,直接实现RejectedExecutionHandler的rejectedExecution方法即可。

本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1881644,如需转载请自行联系原作者