文章中其实说明了外部的使用方式,但是没有说内部是如何实现的,为了加深对实现的理解,在使用中可以放心,我们这里将做源码解析以及反馈到原理上,executors工具可以创建普通的线程池以及schedule调度任务的调度池,其实两者实现上还是有一些区别,但是理解了threadpoolexecutor,在看scheduledthreadpoolexecutor就非常轻松了,后面的文章中也会专门介绍这块,但是需要先看这篇文章。
使用executors最常用的莫过于是使用:executors.newfixedthreadpool(int)这个方法,因为它既可以限制数量,而且线程用完后不会一直被cache住;那么就通过它来看看源码,回过头来再看其他构造方法的区别:
<code>1</code>
<code>public</code> <code>static</code> <code>executorservice <strong>newfixedthreadpool</strong>(</code><code>int</code> <code>nthreads) {</code>
<code>2</code>
<code> </code><code>return</code> <code>new</code> <code>threadpoolexecutor(nthreads, nthreads,</code>
<code>3</code>
<code> </code><code>0l, timeunit.milliseconds,</code>
<code>4</code>
<code> </code><code>new</code> <code>linkedblockingqueue());</code>
<code>5</code>
<code>}</code>
其实你可以自己new一个threadpoolexecutor,来达到自己的参数可控的程度,例如,可以将linkedblockingqueue换成其它的(如:synchronousqueue),只是可读性会降低,这里只是使用了一种设计模式。
我们现在来看看threadpoolexecutor的源码是怎么样的,也许你刚开始看他的源码会很痛苦,因为你不知道作者为什么是这样设计的,所以本文就我看到的思想会给你做一个介绍,此时也许你通过知道了一些作者的思想,你也许就知道应该该如何去操作了。
这里来看下构造方法中对那些属性做了赋值:
源码段1:
<code>01</code>
<code>public</code> <code>threadpoolexecutor(</code><code>int</code> <code>corepoolsize,</code>
<code>02</code>
<code> </code><code>int</code> <code>maximumpoolsize,</code>
<code>03</code>
<code> </code><code>long</code> <code>keepalivetime,</code>
<code>04</code>
<code> </code><code>timeunit unit,</code>
<code>05</code>
<code> </code><code>blockingqueue workqueue,</code>
<code>06</code>
<code> </code><code>threadfactory threadfactory,</code>
<code>07</code>
<code> </code><code>rejectedexecutionhandler handler) {</code>
<code>08</code>
<code> </code><code>if</code> <code>(corepoolsize <</code><code>0</code> <code>||</code>
<code>09</code>
<code> </code><code>maximumpoolsize <=</code><code>0</code> <code>||</code>
<code>10</code>
<code> </code><code>maximumpoolsize < corepoolsize ||</code>
<code>11</code>
<code> </code><code>keepalivetime <</code><code>0</code><code>)</code>
<code>12</code>
<code> </code><code>throw</code> <code>new</code> <code>illegalargumentexception();</code>
<code>13</code>
<code> </code><code>if</code> <code>(workqueue ==</code><code>null</code> <code>|| threadfactory ==</code><code>null</code> <code>|| handler ==</code><code>null</code><code>)</code>
<code>14</code>
<code> </code><code>throw</code> <code>new</code> <code>nullpointerexception();</code>
<code>15</code>
<code> </code><code>this</code><code>.corepoolsize = corepoolsize;</code>
<code>16</code>
<code> </code><code>this</code><code>.maximumpoolsize = maximumpoolsize;</code>
<code>17</code>
<code> </code><code>this</code><code>.workqueue = workqueue;</code>
<code>18</code>
<code> </code><code>this</code><code>.keepalivetime = unit.tonanos(keepalivetime);</code>
<code>19</code>
<code> </code><code>this</code><code>.threadfactory = threadfactory;</code>
<code>20</code>
<code> </code><code>this</code><code>.handler = handler;</code>
<code>21</code>
<code> </code><code>}</code>
这里你可以看到最终赋值的过程,可以先大概知道下参数的意思:
corepoolsize:核心运行的poolsize,也就是当超过这个范围的时候,就需要将新的runnable放入到等待队列workqueue中了,我们把这些runnable就叫做要去执行的任务吧。
maximumpoolsize:一般你用不到,当大于了这个值就会将任务由一个丢弃处理机制来处理,但是当你发生:newfixedthreadpool的时候,corepoolsize和maximumpoolsize是一样的,而corepoolsize是先执行的,所以他会先被放入等待队列,而不会执行到下面的丢弃处理中,看了后面的代码你就知道了。
workqueue:等待队列,当达到corepoolsize的时候,就向该等待队列放入线程信息(默认为一个linkedblockingqueue),运行中的线程属性为:workers,为一个hashset;我们的runnable内部被包装了一层,后面会看到这部分代码;这个队列默认是一个无界队列(你也可以设定一个有界队列),所以在生产者疯狂生产的时候,考虑如何控制的问题。
keepalivetime:默认都是0,当线程没有任务处理后,保持多长时间,当你使用:newcachedthreadpool(),它将是60s的时间。这个参数在运行中的线程从workqueue获取任务时,当(poolsize >corepoolsize || allowcorethreadtimeout)会用到,当然allowcorethreadtimeout要设置为true,也会先判定keepalivetime是大于0的,不过由于它在corepoolsize上采用了integer.max_value,当遇到系统遇到瞬间冲击,workers就会迅速膨胀,所以这个地方就不要去设置allowcorethreadtimeout=true,否则结果是这些运行中的线程会持续60s以上;另外,如果corepoolsize的值还没到integer.max_value,当超过那个值以后,这些运行中的线程,也是
threadfactory:是构造thread的方法,你可以自己去包装和传递,主要实现newthread方法即可;
handler:也就是参数maximumpoolsize达到后丢弃处理的方法,java提供了5种丢弃处理的方法,当然你也可以自己根据实际情况去重写,主要是要实现接口:rejectedexecutionhandler中的方法: public void rejectedexecution(runnabler, threadpoolexecutor e) java默认的是使用:abortpolicy,他的作用是当出现这中情况的时候会抛出一个异常;
其余的还包含:
1、callerrunspolicy:如果发现线程池还在运行,就直接运行这个线程
2、discardoldestpolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。
3、discardpolicy:什么也不做
4、abortpolicy:java默认,抛出一个异常:rejectedexecutionexception。
你可以自己写一个,例如我们想在这个处理中,既不是完全丢弃,也不是完全启动,也不是抛异常,而是控制生产者的线程,那么你就可以尝试某种方式将生产者的线程blocking住,其实就有点类似提到的semaphor的功能了。
通常你得到线程池后,会调用其中的:submit方法或execute方法去操作;其实你会发现,submit方法最终会调用execute方法来进行操作,只是他提供了一个future来托管返回值的处理而已,当你调用需要有返回值的信息时,你用它来处理是比较好的;这个future会包装对callable信息,并定义一个sync对象(),当你发生读取返回值的操作的时候,会通过sync对象进入锁,直到有返回值的数据通知,具体细节先不要看太多。
继续向下,来看看execute最为核心的方法吧: 源码段2:
<code>public</code> <code>void</code> <code>execute(runnable command) {</code>
<code> </code><code>if</code> <code>(command ==</code><code>null</code><code>)</code>
<code> </code><code>throw</code> <code>new</code> <code>nullpointerexception();</code>
<code> </code><code>if</code> <code>(poolsize >= corepoolsize || !addifundercorepoolsize(command)) {</code>
<code> </code><code>if</code> <code>(runstate == running && workqueue.offer(command)) {</code>
<code> </code><code>if</code> <code>(runstate != running || poolsize ==</code><code>0</code><code>)</code>
<code> </code><code>ensurequeuedtaskhandled(command);</code>
<code> </code><code>}</code>
<code> </code><code>else</code> <code>if</code> <code>(!addifundermaximumpoolsize(command))</code>
<code> </code><code>reject(command);</code><code>// is shutdown or saturated</code>
<code> </code><code>}</code>
这段代码看似简单,其实有点难懂,很多人也是这里没看懂,没事,我一个if一个if说:
首先第一个判定空操作就不用说了,下面判定的poolsize >= corepoolsize成立时候会进入if的区域,当然它不成立也有可能会进入,他会判定addifundercorepoolsize是否返回false,如果返回false就会进去;
我们先来看下addifundercorepoolsize方法的源码是什么:
源码段3:
<code>private</code> <code>boolean</code> <code>addifundercorepoolsize(runnable firsttask) {</code>
<code> </code><code>thread t =</code><code>null</code><code>;</code>
<code> </code><code>final</code> <code>reentrantlock mainlock =</code><code>this</code><code>.mainlock;</code>
<code> </code><code>mainlock.lock();</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>if</code> <code>(poolsize < corepoolsize && runstate == running)</code>
<code> </code><code>t = addthread(firsttask);</code>
<code> </code><code>}</code><code>finally</code> <code>{</code>
<code> </code><code>mainlock.unlock();</code>
<code> </code><code>if</code> <code>(t ==</code><code>null</code><code>)</code>
<code> </code><code>return</code> <code>false</code><code>;</code>
<code> </code><code>t.start();</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
可以发现,这段源码是如果发现小雨corepoolsize就会创建一个新的线程,并且调用线程的start()方法将线程运行起来:这个addthread()方法,我们先不考虑细节,因为我们还要先看到前面是怎么进去的,这里可以发信啊,只有没有创建成功thread才会返回false,也就是当当前的poolsize > corepoolsize的时候,或线程池已经不是在running状态的时候才会出现;
注意:这里在外部判定一次poolsize和corepoolsize只是初步判定,内部是加锁后判定的,以得到更为准确的结果,而外部初步判定如果是大于了,就没有必要进入这段有锁的代码了。
此时我们知道了,当前线程数量大于corepoolsize的时候,就会进入【代码段2】的第一个if语句中,回到【源码段2】,继续看if语句中的内容:
这里标记为
源码段4:
<code>if</code> <code>(runstate == running && workqueue.offer(command)) {</code>
<code> </code><code>if</code> <code>(runstate != running || poolsize ==</code><code>0</code><code>)</code>
<code> </code><code>ensurequeuedtaskhandled(command);</code>
<code>else</code> <code>if</code> <code>(!addifundermaximumpoolsize(command))</code>
<code>6</code>
<code> </code><code>reject(command);</code><code>// is shutdown or saturated</code>
第一个if,也就是当当前状态为running的时候,就会去执行workqueue.offer(command),这个workqueue其实就是一个blockingqueue,offer()操作就是在队列的尾部写入一个对象,此时写入的对象为线程的对象而已;所以你可以认为只有线程池在running状态,才会在队列尾部插入数据,否则就执行else if,其实else if可以看出是要做一个是否大于maximumpoolsize的判定,如果大于这个值,就会做reject的操作,关于reject的说明,我们在【源码段1】的解释中已经非常明确的说明,这里可以简单看下源码,以应征结果:
源码段5:
<code>private</code> <code>boolean</code> <code>addifundermaximumpoolsize(runnable firsttask) {</code>
<code> </code><code>if</code> <code>(poolsize < maximumpoolsize && runstate == running) </code><code>//在corepoolsize = maximumpoolsize下,该代码几乎不可能运行 t = addthread(firsttask); } finally { mainlock.unlock(); } if (t == null) return false; t.start(); return true; } void reject(runnable command) { handler.rejectedexecution(command, this); }</code>
也就是如果线程池满了,而且线程池调用了shutdown后,还在调用execute方法时,就会抛出上面说明的异常:rejectedexecutionexception 再回头来看下【代码段4】中进入到等待队列后的操作:
<code>if</code> <code>(runstate != running || poolsize ==</code><code>0</code><code>) ensurequeuedtaskhandled(command);</code>
这段代码是要在线程池运行状态不是running或poolsize == 0才会调用,他是干啥呢? 他为什么会不等于running呢?外面那一层不是判定了他== running了么,其实有时间差就是了,如果是poolsize == 0也会执行这段代码,但是里面的判定条件是如果不是running,就做reject操作,在第一个线程进去的时候,会将第一个线程直接启动起来;很多人也是看这段代码很绕,因为不断的循环判定类似的判定条件,你主要记住他们之间有时间差,要取最新的就好了。 此时貌似代码看完了?咦,此时有问题了: 1、 等待中的线程在后来是如何跑起来的呢?线程池是不是有类似timer一样的守护进程不断扫描线程队列和等待队列?还是利用某种锁机制,实现类似wait和notify实现的? 2、 线程池的运行队列和等待队列是如何管理的呢?这里还没看出影子呢! no,no,no! java在实现这部分的时候,使用了怪异的手段,神马手段呢,还要再看一部分代码才晓得。 在前面【源码段3】中,我们看到了一个方法叫:addthread(),也许很少有人会想到关键在这里,其实关键就是在这里: 我们看看addthread()方法到底做了什么。 源码段6:
<code>private</code> <code>thread addthread(runnable firsttask) { worker w =</code><code>new</code> <code>worker(firsttask); thread t = threadfactory.newthread(w); </code><code>if</code> <code>(t !=</code><code>null</code><code>) { w.thread = t; workers.add(w); </code><code>int</code> <code>nt = ++poolsize; </code><code>if</code> <code>(nt > largestpoolsize)</code>
<code> </code><code>largestpoolsize = nt;</code>
<code> </code><code>return</code> <code>t;</code>
这里创建了一个worker,其余的操作,就是将poolsize++的操作,然后将将其放入workers的运行的hashset中等操作;
我们主要关心worker是干什么的,因为这个threadfactory对我们用途不大,只是做了thread的命名处理;而worker你会发现它的定义也是一个runnable,外部开始在代码段中发现了调用哪个这个worker的start()方法,也就是线程的启动方法,其实也就是调用了worker的run()方法,那么我们重点要关心run方法是如何处理的
源码段7:
<code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>runnable task = firsttask;</code>
<code> </code><code>firsttask =</code><code>null</code><code>;</code>
<code> </code><code>while</code> <code>(task !=</code><code>null</code> <code>|| (task = gettask()) !=</code><code>null</code><code>) {</code>
<code> </code><code>runtask(task);</code>
<code> </code><code>task =</code><code>null</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>}</code><code>finally</code> <code>{</code>
<code> </code><code>workerdone(</code><code>this</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
firsttask其实就是开始在创建work的时候,由外部传入的runnable对象,也就是你自己的thread,你会发现它如果发现task为空,就会调用gettask()方法再判定,直到两者为空,并且是一个while循环体。
那么看看gettask()方法的实现为:
源码段8:
<code>runnable gettask() {</code>
<code> </code><code>for</code> <code>(;;) {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>int</code> <code>state = runstate;</code>
<code> </code><code>if</code> <code>(state > shutdown)</code>
<code> </code><code>return</code> <code>null</code><code>;</code>
<code> </code><code>runnable r;</code>
<code> </code><code>if</code> <code>(state == shutdown) </code><code>// help drain queue</code>
<code> </code><code>r = workqueue.poll();</code>
<code> </code><code>else</code> <code>if</code> <code>(poolsize > corepoolsize || allowcorethreadtimeout)</code>
<code> </code><code>r = workqueue.poll(keepalivetime, timeunit.nanoseconds);</code>
<code> </code><code>else</code>
<code> </code><code>r = workqueue.take();</code>
<code> </code><code>if</code> <code>(r !=</code><code>null</code><code>)</code>
<code> </code><code>return</code> <code>r;</code>
<code> </code><code>if</code> <code>(workercanexit()) {</code>
<code> </code><code>if</code> <code>(runstate >= shutdown)</code><code>// wake up others</code>
<code> </code><code>interruptidleworkers();</code>
<code> </code><code>}</code>
<code> </code><code>// else retry</code>
<code>22</code>
<code> </code><code>}</code><code>catch</code> <code>(interruptedexception ie) {</code>
<code>23</code>
<code> </code><code>// on interruption, re-check runstate</code>
<code>24</code>
<code>25</code>
<code>26</code>
你会发现它是从workqueue队列中,也就是等待队列中获取一个元素出来并返回!
回过头来根据代码段6理解下:
当前线程运行完后,在到workqueue中去获取一个task出来,继续运行,这样就保证了线程池中有一定的线程一直在运行;此时若跳出了while循环,只有workqueue队列为空才会出现或出现了类似于shutdown的操作,自然运行队列会减少1,当再有新的线程进来的时候,就又开始向worker里面放数据了,这样以此类推,实现了线程池的功能。
这里可以看下run方法的finally中调用的workerdone方法为:
源码段9:
<code>void</code> <code>workerdone(worker w) {</code>
<code> </code><code>completedtaskcount += w.completedtasks;</code>
<code> </code><code>workers.remove(w);</code>
<code> </code><code>if</code> <code>(--poolsize ==</code><code>0</code><code>)</code>
<code> </code><code>tryterminate();</code>
注意这里将workers.remove(w)掉,并且调用了—poolsize来做操作。
至于tryterminate是做了更多关于回收方面的操作。
最后我们还要看一段代码就是在【源码段6】中出现的代码调用为:runtask(task);这个方法也是运行的关键。
源码段10:
<code>private</code> <code>void</code> <code>runtask(runnable task) {</code>
<code> </code><code>final</code> <code>reentrantlock runlock =</code><code>this</code><code>.runlock;</code>
<code> </code><code>runlock.lock();</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>if</code> <code>(runstate < stop && thread.interrupted() && runstate >= stop)</code>
<code> </code><code>thread.interrupt();</code>
<code> </code><code>boolean</code> <code>ran =</code><code>false</code><code>;</code>
<code> </code><code>beforeexecute(thread, task);</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>task.run();</code>
<code> </code><code>ran =</code><code>true</code><code>;</code>
<code> </code><code>afterexecute(task,</code><code>null</code><code>);</code>
<code> </code><code>++completedtasks;</code>
<code> </code><code>}</code><code>catch</code> <code>(runtimeexception ex) {</code>
<code> </code><code>if</code> <code>(!ran)</code>
<code> </code><code>afterexecute(task, ex);</code>
<code> </code><code>throw</code> <code>ex;</code>
<code> </code><code>}</code>
<code> </code><code>}</code><code>finally</code> <code>{</code>
<code> </code><code>runlock.unlock();</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
你可以看到,这里面的task为传入的task信息,调用的不是start方法,而是run方法,因为run方法直接调用不会启动新的线程,也是因为这样,导致了你无法获取到你自己的线程的状态,因为线程池是直接调用的run方法,而不是start方法来运行。
这里有个beforeexecute和afterexecute方法,分别代表在执行前和执行后,你可以做一段操作,在这个类中,这两个方法都是【空body】的,因为普通线程池无需做更多的操作。
如果你要实现类似暂停等待通知的或其他的操作,可以自己extends后进行重写构造;
本文没有介绍关于scheduledthreadpoolexecutor调用的细节,下一篇文章会详细说明,因为大部分代码和本文一致,区别在于一些细节,在介绍:scheduledthreadpoolexecutor的时候,会明确的介绍它与timer和timertask的巨大区别,区别不在于使用,而是在于本身内在的处理细节。