天天看點

Java線程池架構(一)原理和源碼解析

文章中其實說明了外部的使用方式,但是沒有說内部是如何實作的,為了加深對實作的了解,在使用中可以放心,我們這裡将做源碼解析以及回報到原理上,executors工具可以建立普通的線程池以及schedule排程任務的排程池,其實兩者實作上還是有一些差別,但是了解了threadpoolexecutor,在看scheduledthreadpoolexecutor就非常輕松了,後面的文章中也會專門介紹這塊,但是需要先看這篇文章。

使用executors最常用的莫過于是使用:executors.newfixedthreadpool(int)這個方法,因為它既可以限制數量,而且線程用完後不會一直被cache住;那麼就通過它來看看源碼,回過頭來再看其他構造方法的差別:

<code>1</code>

<code>public</code> <code>static</code> <code>executorservice &lt;strong&gt;newfixedthreadpool&lt;/strong&gt;(</code><code>int</code> <code>nthreads) {</code>

<code>2</code>

<code>        </code><code>return</code> <code>new</code> <code>threadpoolexecutor(nthreads, nthreads,</code>

<code>3</code>

<code>                                      </code><code>0l, timeunit.milliseconds,</code>

<code>4</code>

<code>                                      </code><code>new</code> <code>linkedblockingqueue());</code>

<code>5</code>

<code>}</code>

其實你可以自己new一個threadpoolexecutor,來達到自己的參數可控的程度,例如,可以将linkedblockingqueue換成其它的(如:synchronousqueue),隻是可讀性會降低,這裡隻是使用了一種設計模式。

我們現在來看看threadpoolexecutor的源碼是怎麼樣的,也許你剛開始看他的源碼會很痛苦,因為你不知道作者為什麼是這樣設計的,是以本文就我看到的思想會給你做一個介紹,此時也許你通過知道了一些作者的思想,你也許就知道應該該如何去操作了。

這裡來看下構造方法中對那些屬性做了指派:

源碼段1:

<code>01</code>

<code>public</code> <code>threadpoolexecutor(</code><code>int</code> <code>corepoolsize,</code>

<code>02</code>

<code>                           </code><code>int</code> <code>maximumpoolsize,</code>

<code>03</code>

<code>                           </code><code>long</code> <code>keepalivetime,</code>

<code>04</code>

<code>                           </code><code>timeunit unit,</code>

<code>05</code>

<code>                           </code><code>blockingqueue workqueue,</code>

<code>06</code>

<code>                           </code><code>threadfactory threadfactory,</code>

<code>07</code>

<code>                           </code><code>rejectedexecutionhandler handler) {</code>

<code>08</code>

<code>     </code><code>if</code> <code>(corepoolsize &lt;</code><code>0</code> <code>||</code>

<code>09</code>

<code>         </code><code>maximumpoolsize &lt;=</code><code>0</code> <code>||</code>

<code>10</code>

<code>         </code><code>maximumpoolsize &lt; corepoolsize ||</code>

<code>11</code>

<code>         </code><code>keepalivetime &lt;</code><code>0</code><code>)</code>

<code>12</code>

<code>         </code><code>throw</code> <code>new</code> <code>illegalargumentexception();</code>

<code>13</code>

<code>    </code><code>if</code> <code>(workqueue ==</code><code>null</code> <code>|| threadfactory ==</code><code>null</code> <code>|| handler ==</code><code>null</code><code>)</code>

<code>14</code>

<code>          </code><code>throw</code> <code>new</code> <code>nullpointerexception();</code>

<code>15</code>

<code>      </code><code>this</code><code>.corepoolsize = corepoolsize;</code>

<code>16</code>

<code>      </code><code>this</code><code>.maximumpoolsize = maximumpoolsize;</code>

<code>17</code>

<code>      </code><code>this</code><code>.workqueue = workqueue;</code>

<code>18</code>

<code>      </code><code>this</code><code>.keepalivetime = unit.tonanos(keepalivetime);</code>

<code>19</code>

<code>      </code><code>this</code><code>.threadfactory = threadfactory;</code>

<code>20</code>

<code>      </code><code>this</code><code>.handler = handler;</code>

<code>21</code>

<code>  </code><code>}</code>

這裡你可以看到最終指派的過程,可以先大概知道下參數的意思:

corepoolsize:核心運作的poolsize,也就是當超過這個範圍的時候,就需要将新的runnable放入到等待隊列workqueue中了,我們把這些runnable就叫做要去執行的任務吧。

maximumpoolsize:一般你用不到,當大于了這個值就會将任務由一個丢棄處理機制來處理,但是當你發生:newfixedthreadpool的時候,corepoolsize和maximumpoolsize是一樣的,而corepoolsize是先執行的,是以他會先被放入等待隊列,而不會執行到下面的丢棄進行中,看了後面的代碼你就知道了。

workqueue:等待隊列,當達到corepoolsize的時候,就向該等待隊列放入線程資訊(預設為一個linkedblockingqueue),運作中的線程屬性為:workers,為一個hashset;我們的runnable内部被包裝了一層,後面會看到這部分代碼;這個隊列預設是一個無界隊列(你也可以設定一個有界隊列),是以在生産者瘋狂生産的時候,考慮如何控制的問題。

keepalivetime:預設都是0,當線程沒有任務處理後,保持多長時間,當你使用:newcachedthreadpool(),它将是60s的時間。這個參數在運作中的線程從workqueue擷取任務時,當(poolsize &gt;corepoolsize || allowcorethreadtimeout)會用到,當然allowcorethreadtimeout要設定為true,也會先判定keepalivetime是大于0的,不過由于它在corepoolsize上采用了integer.max_value,當遇到系統遇到瞬間沖擊,workers就會迅速膨脹,是以這個地方就不要去設定allowcorethreadtimeout=true,否則結果是這些運作中的線程會持續60s以上;另外,如果corepoolsize的值還沒到integer.max_value,當超過那個值以後,這些運作中的線程,也是

threadfactory:是構造thread的方法,你可以自己去包裝和傳遞,主要實作newthread方法即可;

handler:也就是參數maximumpoolsize達到後丢棄處理的方法,java提供了5種丢棄處理的方法,當然你也可以自己根據實際情況去重寫,主要是要實作接口:rejectedexecutionhandler中的方法: public void rejectedexecution(runnabler, threadpoolexecutor e) java預設的是使用:abortpolicy,他的作用是當出現這中情況的時候會抛出一個異常;

其餘的還包含:

1、callerrunspolicy:如果發現線程池還在運作,就直接運作這個線程

2、discardoldestpolicy:線上程池的等待隊列中,将頭取出一個抛棄,然後将目前線程放進去。

3、discardpolicy:什麼也不做

4、abortpolicy:java預設,抛出一個異常:rejectedexecutionexception。

你可以自己寫一個,例如我們想在這個進行中,既不是完全丢棄,也不是完全啟動,也不是抛異常,而是控制生産者的線程,那麼你就可以嘗試某種方式将生産者的線程blocking住,其實就有點類似提到的semaphor的功能了。

通常你得到線程池後,會調用其中的:submit方法或execute方法去操作;其實你會發現,submit方法最終會調用execute方法來進行操作,隻是他提供了一個future來托管傳回值的處理而已,當你調用需要有傳回值的資訊時,你用它來處理是比較好的;這個future會包裝對callable資訊,并定義一個sync對象(),當你發生讀取傳回值的操作的時候,會通過sync對象進入鎖,直到有傳回值的資料通知,具體細節先不要看太多。

繼續向下,來看看execute最為核心的方法吧: 源碼段2:

<code>public</code> <code>void</code> <code>execute(runnable command) {</code>

<code>     </code><code>if</code> <code>(command ==</code><code>null</code><code>)</code>

<code>         </code><code>throw</code> <code>new</code> <code>nullpointerexception();</code>

<code>     </code><code>if</code> <code>(poolsize &gt;= corepoolsize || !addifundercorepoolsize(command)) {</code>

<code>        </code><code>if</code> <code>(runstate == running &amp;&amp; workqueue.offer(command)) {</code>

<code>            </code><code>if</code> <code>(runstate != running || poolsize ==</code><code>0</code><code>)</code>

<code>                </code><code>ensurequeuedtaskhandled(command);</code>

<code>        </code><code>}</code>

<code>        </code><code>else</code> <code>if</code> <code>(!addifundermaximumpoolsize(command))</code>

<code>            </code><code>reject(command);</code><code>// is shutdown or saturated</code>

<code>    </code><code>}</code>

這段代碼看似簡單,其實有點難懂,很多人也是這裡沒看懂,沒事,我一個if一個if說:

首先第一個判定空操作就不用說了,下面判定的poolsize &gt;= corepoolsize成立時候會進入if的區域,當然它不成立也有可能會進入,他會判定addifundercorepoolsize是否傳回false,如果傳回false就會進去;

我們先來看下addifundercorepoolsize方法的源碼是什麼:

源碼段3:

<code>private</code> <code>boolean</code> <code>addifundercorepoolsize(runnable firsttask) {</code>

<code>    </code><code>thread t =</code><code>null</code><code>;</code>

<code>    </code><code>final</code> <code>reentrantlock mainlock =</code><code>this</code><code>.mainlock;</code>

<code>    </code><code>mainlock.lock();</code>

<code>    </code><code>try</code> <code>{</code>

<code>        </code><code>if</code> <code>(poolsize &lt; corepoolsize &amp;&amp; runstate == running)</code>

<code>            </code><code>t = addthread(firsttask);</code>

<code>    </code><code>}</code><code>finally</code> <code>{</code>

<code>        </code><code>mainlock.unlock();</code>

<code>    </code><code>if</code> <code>(t ==</code><code>null</code><code>)</code>

<code>        </code><code>return</code> <code>false</code><code>;</code>

<code>    </code><code>t.start();</code>

<code>    </code><code>return</code> <code>true</code><code>;</code>

可以發現,這段源碼是如果發現小雨corepoolsize就會建立一個新的線程,并且調用線程的start()方法将線程運作起來:這個addthread()方法,我們先不考慮細節,因為我們還要先看到前面是怎麼進去的,這裡可以發信啊,隻有沒有建立成功thread才會傳回false,也就是當目前的poolsize &gt; corepoolsize的時候,或線程池已經不是在running狀态的時候才會出現;

注意:這裡在外部判定一次poolsize和corepoolsize隻是初步判定,内部是加鎖後判定的,以得到更為準确的結果,而外部初步判定如果是大于了,就沒有必要進入這段有鎖的代碼了。

此時我們知道了,目前線程數量大于corepoolsize的時候,就會進入【代碼段2】的第一個if語句中,回到【源碼段2】,繼續看if語句中的内容:

這裡标記為

源碼段4:

<code>if</code> <code>(runstate == running &amp;&amp; workqueue.offer(command)) {</code>

<code>    </code><code>if</code> <code>(runstate != running || poolsize ==</code><code>0</code><code>)</code>

<code>        </code><code>ensurequeuedtaskhandled(command);</code>

<code>else</code> <code>if</code> <code>(!addifundermaximumpoolsize(command))</code>

<code>6</code>

<code>    </code><code>reject(command);</code><code>// is shutdown or saturated</code>

第一個if,也就是當目前狀态為running的時候,就會去執行workqueue.offer(command),這個workqueue其實就是一個blockingqueue,offer()操作就是在隊列的尾部寫入一個對象,此時寫入的對象為線程的對象而已;是以你可以認為隻有線程池在running狀态,才會在隊列尾部插入資料,否則就執行else if,其實else if可以看出是要做一個是否大于maximumpoolsize的判定,如果大于這個值,就會做reject的操作,關于reject的說明,我們在【源碼段1】的解釋中已經非常明确的說明,這裡可以簡單看下源碼,以應征結果:

源碼段5:

<code>private</code> <code>boolean</code> <code>addifundermaximumpoolsize(runnable firsttask) {</code>

<code>        </code><code>if</code> <code>(poolsize &lt; maximumpoolsize &amp;&amp; runstate == running)                </code><code>//在corepoolsize = maximumpoolsize下,該代碼幾乎不可能運作                 t = addthread(firsttask);         } finally {             mainlock.unlock();         }         if (t == null)             return false;         t.start();         return true; } void reject(runnable command) {         handler.rejectedexecution(command, this);     }</code>

也就是如果線程池滿了,而且線程池調用了shutdown後,還在調用execute方法時,就會抛出上面說明的異常:rejectedexecutionexception 再回頭來看下【代碼段4】中進入到等待隊列後的操作:

<code>if</code> <code>(runstate != running || poolsize ==</code><code>0</code><code>)     ensurequeuedtaskhandled(command);</code>

這段代碼是要線上程池運作狀态不是running或poolsize == 0才會調用,他是幹啥呢? 他為什麼會不等于running呢?外面那一層不是判定了他== running了麼,其實有時間差就是了,如果是poolsize == 0也會執行這段代碼,但是裡面的判定條件是如果不是running,就做reject操作,在第一個線程進去的時候,會将第一個線程直接啟動起來;很多人也是看這段代碼很繞,因為不斷的循環判定類似的判定條件,你主要記住他們之間有時間差,要取最新的就好了。 此時貌似代碼看完了?咦,此時有問題了: 1、 等待中的線程在後來是如何跑起來的呢?線程池是不是有類似timer一樣的守護程序不斷掃描線程隊列和等待隊列?還是利用某種鎖機制,實作類似wait和notify實作的? 2、 線程池的運作隊列和等待隊列是如何管理的呢?這裡還沒看出影子呢! no,no,no! java在實作這部分的時候,使用了怪異的手段,神馬手段呢,還要再看一部分代碼才曉得。 在前面【源碼段3】中,我們看到了一個方法叫:addthread(),也許很少有人會想到關鍵在這裡,其實關鍵就是在這裡: 我們看看addthread()方法到底做了什麼。 源碼段6:

<code>private</code> <code>thread addthread(runnable firsttask) {         worker w =</code><code>new</code> <code>worker(firsttask);         thread t = threadfactory.newthread(w);        </code><code>if</code> <code>(t !=</code><code>null</code><code>) {             w.thread = t;             workers.add(w);            </code><code>int</code> <code>nt = ++poolsize;            </code><code>if</code> <code>(nt &gt; largestpoolsize)</code>

<code>            </code><code>largestpoolsize = nt;</code>

<code>    </code><code>return</code> <code>t;</code>

這裡建立了一個worker,其餘的操作,就是将poolsize++的操作,然後将将其放入workers的運作的hashset中等操作;

我們主要關心worker是幹什麼的,因為這個threadfactory對我們用途不大,隻是做了thread的命名處理;而worker你會發現它的定義也是一個runnable,外部開始在代碼段中發現了調用哪個這個worker的start()方法,也就是線程的啟動方法,其實也就是調用了worker的run()方法,那麼我們重點要關心run方法是如何處理的

源碼段7:

<code>public</code> <code>void</code> <code>run() {</code>

<code>     </code><code>try</code> <code>{</code>

<code>         </code><code>runnable task = firsttask;</code>

<code>         </code><code>firsttask =</code><code>null</code><code>;</code>

<code>         </code><code>while</code> <code>(task !=</code><code>null</code> <code>|| (task = gettask()) !=</code><code>null</code><code>) {</code>

<code>             </code><code>runtask(task);</code>

<code>             </code><code>task =</code><code>null</code><code>;</code>

<code>         </code><code>}</code>

<code>     </code><code>}</code><code>finally</code> <code>{</code>

<code>         </code><code>workerdone(</code><code>this</code><code>);</code>

<code>     </code><code>}</code>

<code> </code><code>}</code>

firsttask其實就是開始在建立work的時候,由外部傳入的runnable對象,也就是你自己的thread,你會發現它如果發現task為空,就會調用gettask()方法再判定,直到兩者為空,并且是一個while循環體。

那麼看看gettask()方法的實作為:

源碼段8:

<code>runnable gettask() {</code>

<code>    </code><code>for</code> <code>(;;) {</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>int</code> <code>state = runstate;</code>

<code>            </code><code>if</code> <code>(state &gt; shutdown)</code>

<code>                </code><code>return</code> <code>null</code><code>;</code>

<code>            </code><code>runnable r;</code>

<code>            </code><code>if</code> <code>(state == shutdown) </code><code>// help drain queue</code>

<code>                </code><code>r = workqueue.poll();</code>

<code>            </code><code>else</code> <code>if</code> <code>(poolsize &gt; corepoolsize || allowcorethreadtimeout)</code>

<code>                </code><code>r = workqueue.poll(keepalivetime, timeunit.nanoseconds);</code>

<code>            </code><code>else</code>

<code>                </code><code>r = workqueue.take();</code>

<code>            </code><code>if</code> <code>(r !=</code><code>null</code><code>)</code>

<code>                </code><code>return</code> <code>r;</code>

<code>            </code><code>if</code> <code>(workercanexit()) {</code>

<code>                </code><code>if</code> <code>(runstate &gt;= shutdown)</code><code>// wake up others</code>

<code>                    </code><code>interruptidleworkers();</code>

<code>            </code><code>}</code>

<code>            </code><code>// else retry</code>

<code>22</code>

<code>        </code><code>}</code><code>catch</code> <code>(interruptedexception ie) {</code>

<code>23</code>

<code>            </code><code>// on interruption, re-check runstate</code>

<code>24</code>

<code>25</code>

<code>26</code>

你會發現它是從workqueue隊列中,也就是等待隊列中擷取一個元素出來并傳回!

回過頭來根據代碼段6了解下:

目前線程運作完後,在到workqueue中去擷取一個task出來,繼續運作,這樣就保證了線程池中有一定的線程一直在運作;此時若跳出了while循環,隻有workqueue隊列為空才會出現或出現了類似于shutdown的操作,自然運作隊列會減少1,當再有新的線程進來的時候,就又開始向worker裡面放資料了,這樣以此類推,實作了線程池的功能。

這裡可以看下run方法的finally中調用的workerdone方法為:

源碼段9:

<code>void</code> <code>workerdone(worker w) {</code>

<code>        </code><code>completedtaskcount += w.completedtasks;</code>

<code>        </code><code>workers.remove(w);</code>

<code>        </code><code>if</code> <code>(--poolsize ==</code><code>0</code><code>)</code>

<code>            </code><code>tryterminate();</code>

注意這裡将workers.remove(w)掉,并且調用了—poolsize來做操作。

至于tryterminate是做了更多關于回收方面的操作。

最後我們還要看一段代碼就是在【源碼段6】中出現的代碼調用為:runtask(task);這個方法也是運作的關鍵。

源碼段10:

<code>private</code> <code>void</code> <code>runtask(runnable task) {</code>

<code>       </code><code>final</code> <code>reentrantlock runlock =</code><code>this</code><code>.runlock;</code>

<code>       </code><code>runlock.lock();</code>

<code>       </code><code>try</code> <code>{</code>

<code>           </code><code>if</code> <code>(runstate &lt; stop &amp;&amp;                     thread.interrupted() &amp;&amp;                     runstate &gt;= stop)</code>

<code>               </code><code>thread.interrupt();</code>

<code>           </code><code>boolean</code> <code>ran =</code><code>false</code><code>;</code>

<code>           </code><code>beforeexecute(thread, task);</code>

<code>           </code><code>try</code> <code>{</code>

<code>               </code><code>task.run();</code>

<code>               </code><code>ran =</code><code>true</code><code>;</code>

<code>               </code><code>afterexecute(task,</code><code>null</code><code>);</code>

<code>               </code><code>++completedtasks;</code>

<code>           </code><code>}</code><code>catch</code> <code>(runtimeexception ex) {</code>

<code>               </code><code>if</code> <code>(!ran)</code>

<code>                   </code><code>afterexecute(task, ex);</code>

<code>               </code><code>throw</code> <code>ex;</code>

<code>           </code><code>}</code>

<code>       </code><code>}</code><code>finally</code> <code>{</code>

<code>           </code><code>runlock.unlock();</code>

<code>       </code><code>}</code>

<code>   </code><code>}</code>

你可以看到,這裡面的task為傳入的task資訊,調用的不是start方法,而是run方法,因為run方法直接調用不會啟動新的線程,也是因為這樣,導緻了你無法擷取到你自己的線程的狀态,因為線程池是直接調用的run方法,而不是start方法來運作。

這裡有個beforeexecute和afterexecute方法,分别代表在執行前和執行後,你可以做一段操作,在這個類中,這兩個方法都是【空body】的,因為普通線程池無需做更多的操作。

如果你要實作類似暫停等待通知的或其他的操作,可以自己extends後進行重寫構造;

本文沒有介紹關于scheduledthreadpoolexecutor調用的細節,下一篇文章會詳細說明,因為大部分代碼和本文一緻,差別在于一些細節,在介紹:scheduledthreadpoolexecutor的時候,會明确的介紹它與timer和timertask的巨大差別,差別不在于使用,而是在于本身内在的處理細節。