天天看點

Java線程池架構(二)多線程排程器

我們如果要用java預設的線程池來做排程器,一種選擇就是timer和timertask的結合,在以前的文章:《timer與timertask的真正原理&使用介紹》中有明确的說明:一個timer為一個單獨的線程,雖然一個timer可以排程多個timertask,但是對于一個timer來講是串行的,至于細節請參看對應的那篇文章的内容,本文介紹的多線程排程器,也就是定時任務,基于多線程排程完成,當然你可以為了完成多線程使用多個timer,隻是這些timer的管理需要你來完成,不是一個架構體系,而schedulethreadpoolexecutor提供了這個功能,是以我們第一要搞清楚是如何使用排程器的,其次是需要知道它的内部原理是什麼,也就是知其然,再知其是以然!

首先如果我們要建立一個基于java本身的排程池通常的方法是:

executors.newscheduledthreadpool(int);

當有重載方法,我們最常用的是這個就從這個,看下定義:

<code>1</code>

<code>public</code> <code>static</code> <code>scheduledexecutorservice newscheduledthreadpool(</code><code>int</code> <code>corepoolsize) {</code>

<code>2</code>

<code>    </code><code>return</code> <code>new</code> <code>scheduledthreadpoolexecutor(corepoolsize);</code>

<code>3</code>

<code>}</code>

其實内部是new了一個執行個體化對象出來,并傳入大小,此時就跟蹤到scheduledthreadpoolexecutor的構造方法中:

<code>public</code> <code>scheduledthreadpoolexecutor(</code><code>int</code> <code>corepoolsize) {</code>

<code>        </code><code>super</code><code>(corepoolsize, integer.max_value,</code><code>0</code><code>,timeunit.nanoseconds,</code>

<code>              </code><code>new</code> <code>delayedworkqueue());</code>

<code>4</code>

<code>5</code>

你會發現調用了super,而super你跟蹤進去會發現,是threadpoolexecutor中,那麼scheduledthreadpoolexecutor和threadpoolexecutor有何差別,就是本文要說得重點了,首先我們留下個引子,你發現在定義隊列的時候,不再是上文中提到的linkedblockingqueue,而是delayedworkqueue,那麼細節上我們接下來就是要講解的重點,既然他們又繼承關系,其實搞懂了不同點,就搞懂了共同點,而且有這樣的關系大多數應當是共同點,不同點的猜測:這個是要實作任務排程,任務排程不是立即的,需要延遲和定期做等情況,那麼是如何實作的呢?

這就是我們需要思考的了,通過源碼考察,我們發現,他們都有execute方法,隻是scheduledthreadpoolexecutor将源碼進行了重寫,并且還有以下四個排程器的方法:

<code>01</code>

<code>public</code> <code>scheduledfuture&lt;?&gt; schedule(runnable command,</code>

<code>02</code>

<code>                       </code><code>long</code> <code>delay, timeunit unit);</code>

<code>03</code>

<code>04</code>

<code>public</code>  <code>scheduledfuture schedule(callable callable,</code>

<code>05</code>

<code>06</code>

<code>07</code>

<code>public</code> <code>scheduledfuture&lt;?&gt; scheduleatfixedrate(runnable command,</code>

<code>08</code>

<code>                          </code><code>long</code> <code>initialdelay,</code>

<code>09</code>

<code>                          </code><code>long</code> <code>period,</code>

<code>10</code>

<code>                          </code><code>timeunit unit);</code>

<code>11</code>

<code>12</code>

<code>public</code> <code>scheduledfuture&lt;?&gt; schedulewithfixeddelay(runnable command,</code>

<code>13</code>

<code>                             </code><code>long</code> <code>initialdelay,</code>

<code>14</code>

<code>                             </code><code>long</code> <code>delay,</code>

<code>15</code>

<code>                             </code><code>timeunit unit);</code>

那麼這四個方法有什麼差別呢?其實第一個和第二個差別不大,一個是runnable、一個是callable,内部包裝後是一樣的效果;是以把頭兩個方法幾乎當成一種排程,那麼三種情況分别是:

1、 進行一次延遲排程:延遲delay這麼長時間,機關為:timeunit傳入的的一個基本機關,例如:timeunit.seconds屬于提供好的枚舉資訊;(适合于方法1和方法2)。

2、 多次排程,每次依照上一次預計排程時間進行排程,例如:延遲2s開始,5s一次,那麼就是2、7、12、17,如果中間由于某種原因導緻線程不夠用,沒有得到排程機會,那麼接下來計算的時間會優先計算進去,因為他的排序會被排在前面,有點類似timer中的:scheduleatfixedrate方法,隻是這裡是多線程的,它的方法名也叫:scheduleatfixedrate,是以這個是比較好記憶的(适合方法3)

3、 多次排程,每次按照上一次實際執行的時間進行計算下一次時間,同上,如果在第7秒沒有被得到排程,而是第9s才得到排程,那麼計算下一次排程時間就不是12秒,而是9+5=14s,如果再次延遲,就會延遲一個周期以上,也就會出現少調用的情況(适合于方法3);

4、 最後補充execute方法是一次排程,期望被立即排程,時間為空:

public void execute(runnable command) {

if (command == null)

throw new nullpointerexception();

schedule(command, 0, timeunit.nanoseconds);

}

我們簡單看看scheduleatfixedrate、schedulewithfixeddelay對下面的分析會更加有用途:

<code>                                                  </code><code>long</code> <code>initialdelay,</code>

<code>                                                  </code><code>long</code> <code>period,</code>

<code>                                                  </code><code>timeunit unit) {</code>

<code>        </code><code>if</code> <code>(command ==</code><code>null</code> <code>|| unit ==</code><code>null</code><code>)</code>

<code>            </code><code>throw</code> <code>new</code> <code>nullpointerexception();</code>

<code>        </code><code>if</code> <code>(period &lt;=</code><code>0</code><code>)</code>

<code>            </code><code>throw</code> <code>new</code> <code>illegalargumentexception();</code>

<code>        </code><code>runnablescheduledfuture&lt;?&gt; t = decoratetask(command,</code>

<code>            </code><code>new</code> <code>scheduledfuturetask&lt;object&gt;(command,</code>

<code>                                            </code><code>null</code><code>,</code>

<code>                                            </code><code>triggertime(initialdelay, unit),</code>

<code>                                            </code><code>unit.tonanos(period)));</code>

<code>        </code><code>delayedexecute(t);</code>

<code>        </code><code>return</code> <code>t;</code>

<code>16</code>

<code>    </code><code>}</code>

<code>17</code>

<code>18</code>

<code>    </code><code>public</code> <code>scheduledfuture&lt;?&gt; schedulewithfixeddelay(runnable command,</code>

<code>19</code>

<code>                                                     </code><code>long</code> <code>initialdelay,</code>

<code>20</code>

<code>                                                     </code><code>long</code> <code>delay,</code>

<code>21</code>

<code>                                                     </code><code>timeunit unit) {</code>

<code>22</code>

<code>23</code>

<code>24</code>

<code>        </code><code>if</code> <code>(delay &lt;=</code><code>0</code><code>)</code>

<code>25</code>

<code>26</code>

<code>27</code>

<code>            </code><code>new</code> <code>scheduledfuturetask&lt;boolean&gt;(command,</code>

<code>28</code>

<code>                                             </code><code>null</code><code>,</code>

<code>29</code>

<code>                                             </code><code>triggertime(initialdelay, unit),</code>

<code>30</code>

<code>                                             </code><code>unit.tonanos(-delay)));</code>

<code>31</code>

<code>32</code>

<code>33</code>

兩段源碼唯一的差別就是在unit.tonanos(int)這唯一一個地方,scheduleatfixedrate裡面是直接傳入值,而schedulewithfixeddelay裡面是取了相反數,也就是假如我們都傳入正數,schedulewithfixeddelay其實就取反了,沒有任何差別,你是否聯想到前面文章介紹timer中類似的處理手段通過正負數區分時間間隔方法,為0代表僅僅排程一次,其實在這裡同樣是這樣的,他們也同樣有一個問題就是,如果你傳遞負數,方法的功能正好是相反的。

而你會發現,不論是那個schedule方法裡頭,都會建立一個scheduledfuturetask類的執行個體,此類究竟是何方神聖呢,我們來看看。

scheduledfuturetask的類(schedulethreadpoolexecutor的私有的内部類)來進行排程,那麼可以看看内部做了什麼操作,如下:

<code>scheduledfuturetask(runnable r, v result,</code><code>long</code> <code>ns) {</code>

<code>    </code><code>super</code><code>(r, result);</code>

<code>    </code><code>this</code><code>.time = ns;</code>

<code>    </code><code>this</code><code>.period =</code><code>0</code><code>;</code>

<code>    </code><code>this</code><code>.sequencenumber = sequencer.getandincrement();</code>

<code>/**</code>

<code> </code><code>* creates a periodic action with given nano time and period.</code>

<code> </code><code>*/</code>

<code>scheduledfuturetask(runnable r, v result,</code><code>long</code> <code>ns,</code><code>long</code> <code>period) {</code>

<code>    </code><code>this</code><code>.period = period;</code>

<code> </code><code>* creates a one-shot action with given nanotime-based trigger.</code>

<code>scheduledfuturetask(callable callable,</code><code>long</code> <code>ns) {</code>

<code>    </code><code>super</code><code>(callable);</code>

最核心的幾個參數正好對應了排程的延遲的構造方法,這些參數如何用起來的?那麼它還提供了什麼方法呢?

<code>public</code> <code>long</code> <code>getdelay(timeunit unit) {</code>

<code>     </code><code>return</code> <code>unit.convert(time - now(), timeunit.nanoseconds);</code>

<code> </code><code>}</code>

<code> </code><code>public</code> <code>int</code> <code>compareto(delayed other) {</code>

<code>     </code><code>if</code> <code>(other ==</code><code>this</code><code>)</code><code>// compare zero only if same object</code>

<code>         </code><code>return</code> <code>0</code><code>;</code>

<code>     </code><code>if</code> <code>(other</code><code>instanceof</code> <code>scheduledfuturetask) {</code>

<code>         </code><code>scheduledfuturetask&lt;?&gt; x = (scheduledfuturetask&lt;?&gt;)other;</code>

<code>         </code><code>long</code> <code>diff = time - x.time;</code>

<code>         </code><code>if</code> <code>(diff &lt;</code><code>0</code><code>)</code>

<code>             </code><code>return</code> <code>-</code><code>1</code><code>;</code>

<code>         </code><code>else</code> <code>if</code> <code>(diff &gt;</code><code>0</code><code>)</code>

<code>             </code><code>return</code> <code>1</code><code>;</code>

<code>         </code><code>else</code> <code>if</code> <code>(sequencenumber &lt; x.sequencenumber)</code>

<code>         </code><code>else</code>

<code>     </code><code>}</code>

<code>     </code><code>long</code> <code>d = (getdelay(timeunit.nanoseconds) -</code>

<code>               </code><code>other.getdelay(timeunit.nanoseconds));</code>

<code>     </code><code>return</code> <code>(d ==</code><code>0</code><code>)?</code><code>0</code> <code>: ((d &lt;</code><code>0</code><code>)? -</code><code>1</code> <code>:</code><code>1</code><code>);</code>

這裡發現了,他們可以運作,且判定時間的方法是getdelay方法我們知道了。 對比時間的方法是:compareto,傳入了參數類型為:delayed類型,不難猜測出,scheduledfuturetask和delayed有某種繼承關系,沒錯,scheduledfuturetask實作了delayed的接口,隻是它是間接實作的;并且delayed接口繼承了comparable接口,這個接口可用來幹什麼?看過我前面寫的一篇文章關于中文和對象排序的應該知道,這個是用來自定義對比和排序的,我們的排程任務是一個對象,是以需要排序才行,接下來我們回溯到開始定義的代碼中,找一個實際調用的代碼來看看它是如何啟動到run方法的?如何排序的?如何調用延遲的?就是我們下文中會提到的,而這裡我們先提出問題,後文我們再來說明這些問題。 我們先來看下run方法的一些定義。

<code>/**            * 時間片類型任務執行            */</code>

<code>          </code><code>private</code> <code>void</code> <code>runperiodic() {</code>

<code>             </code><code>//運作對應的程式,這個是具體的程式</code>

<code>             </code><code>boolean</code> <code>ok = scheduledfuturetask.</code><code>super</code><code>.runandreset();</code>

<code>             </code><code>boolean</code> <code>down = isshutdown();</code>

<code>             </code><code>// reschedule if not cancelled and not shutdown or policy allows</code>

<code>             </code><code>if</code> <code>(ok &amp;&amp; (!down ||                        (getcontinueexistingperiodictasksaftershutdownpolicy() &amp;&amp;</code>

<code>!isstopped()))) {</code>

<code>                 </code><code>long</code> <code>p = period;</code>

<code>                 </code><code>if</code> <code>(p &gt;</code><code>0</code><code>)</code><code>//規定時間間隔算出下一次時間</code>

<code>                    </code><code>time += p;</code>

<code>                </code><code>else</code><code>//用目前時間算出下一次時間,負負得正</code>

<code>                    </code><code>time = triggertime(-p);</code>

<code>                </code><code>//計算下一次時間,并資深再次放入等待隊列中</code>

<code>                </code><code>scheduledthreadpoolexecutor.</code><code>super</code><code>.getqueue().add(</code><code>this</code><code>);</code>

<code>            </code><code>}</code>

<code>            </code><code>else</code> <code>if</code> <code>(down)</code>

<code>                </code><code>interruptidleworkers();</code>

<code>        </code><code>}</code>

<code>        </code><code>/**</code>

<code>         </code><code>* 是否為逐片段執行,如果不是,則調用父親類的run方法</code>

<code>         </code><code>*/</code>

<code>        </code><code>public</code> <code>void</code> <code>run() {</code>

<code>            </code><code>if</code> <code>(isperiodic())</code><code>//周期任務</code>

<code>                </code><code>runperiodic();</code>

<code>            </code><code>else</code><code>//隻執行一次的任務</code>

<code>                </code><code>scheduledfuturetask.</code><code>super</code><code>.run();</code>

可以看到run方法首先通過isperiod()判定是否為時間片,判定的依據就是我們說的時間片是否“不為零”,如果不是周期任務,就直接運作一次,如果是周期任務,則除了運作還會計算下一次執行的時間,并将其再次放入等待隊列,這裡對應到scheduleatfixedrate、schedulewithfixeddelay這兩個方法一正一負,在這裡得到判定,并且将為負數的取反回來,負負得正,java就是這麼幹的,呵呵,是以不要認為什麼是不可能的,隻要好用什麼都是可以的,然後計算的時間一個是基于标準的time加上一個時間片,一個是根據目前時間計算一個時間片,在上文中我們已經明确說明了兩者的差別。

以:schedule方法為例:

<code>                                           </code><code>long</code> <code>delay,</code>

<code>                                           </code><code>timeunit unit) {</code>

<code>        </code><code>if</code> <code>(callable ==</code><code>null</code> <code>|| unit ==</code><code>null</code><code>)</code>

<code>        </code><code>runnablescheduledfuture t = decoratetask(callable,</code>

<code>            </code><code>new</code> <code>scheduledfuturetask(callable,</code>

<code>                       </code><code>triggertime(delay, unit)));</code>

其實這個方法内部建立的就是一個我們剛才提到的:scheduledfuturetask,外面又包裝了下叫做runnablescheduledfuture,也就是适配了下而已,呵呵,代碼裡面就是一個return操作,java這樣做的目的是友善子類去擴充。

關鍵是delayedexecute(t)方法中做了什麼?看名稱是延遲執行的意思,難道java的線程可以延遲執行,那所有的任務線程都在運作狀态?

它的源碼是這樣的:

<code>private</code> <code>void</code> <code>delayedexecute(runnable command) {</code>

<code>    </code><code>if</code> <code>(isshutdown()) {</code>

<code>        </code><code>reject(command);</code>

<code>        </code><code>return</code><code>;</code>

<code>    </code><code>if</code> <code>(getpoolsize() &lt; getcorepoolsize())</code>

<code>        </code><code>prestartcorethread();</code>

<code>    </code><code>super</code><code>.getqueue().add(command);</code>

我們主要關心prestartcorethread()和super.getqueue().add(command),因為如果系統關閉,這些讨論都沒有意義的,我們分别叫他們第二小段代碼和第三小段代碼。

第二個部分如果線程數小于核心線程數設定,那麼就調用一個prestartcorethread(),看方法名應該是:預先啟動一個核心線程的意思,先看完第三個部分,再跟蹤進去看源碼。

第三個部分很明了,就是調用super.getqueue().add(command);也就是說直接将任務放入一個隊列中,其實super是什麼?super就是我們上一篇文章所提到的threadpoolexecutor,那麼這個queue就是上一篇文章中提到的等待隊列,也就是任何schedule任務首先放入等待隊列,然後等待被排程的。

<code>public</code> <code>boolean</code> <code>prestartcorethread() {</code>

<code>    </code><code>return</code> <code>addifundercorepoolsize(</code><code>null</code><code>);</code>

<code>private</code> <code>boolean</code> <code>addifundercorepoolsize(runnable firsttask) {</code>

<code>    </code><code>thread t =</code><code>null</code><code>;</code>

<code>6</code>

<code>    </code><code>final</code> <code>reentrantlock mainlock =</code><code>this</code><code>.mainlock;</code>

<code>7</code>

<code>    </code><code>mainlock.lock();</code>

<code>8</code>

<code>    </code><code>try</code> <code>{</code>

<code>9</code>

<code>        </code><code>if</code> <code>(poolsize &lt; corepoolsize &amp;&amp; runstate == running)                 t = addthread(firsttask);         }</code><code>finally</code> <code>{             mainlock.unlock();         }        </code><code>if</code> <code>(t ==</code><code>null</code><code>)            </code><code>return</code> <code>false</code><code>;         t.start();        </code><code>return</code> <code>true</code><code>; }</code>

這個代碼是否似曾相似,沒錯,這個你在上一篇文章介紹threadpoolexecutor的時候就見到過,說明不論是threadpoolexecutor還是schedulethreadpoolexecutor他們的thread都是由一個worker來處理的(上一篇文章有介紹),而這個worker處理的基本機制就是将目前任務執行後,不斷從線程等待隊列中擷取資料,然後用以執行,直到隊列為空為止。 那麼他們的差別在哪裡呢?延遲是如何實作的呢?和我們上面介紹的scheduledfuturetask又有何關系呢? 那麼我們回過頭來看看schedulethreadpool的定義是如何的。

發現它和threadpoolexecutor有個定義上很大的差別就是,threadpoolexecutor用的是linkedblockingqueue(當然可以修改),它用的是delayedweorkqueue,而這個delayedworkqueue裡面你會發現它僅僅是對java.util.concurrent.delayedqueue類一個簡單通路包裝,這個隊列就是等待隊列,可以看到任務是被直接放到等待隊列中的,是以取資料必然從這裡擷取,而這個延遲的隊列有何神奇之處呢,它又是如何實作的呢,我們從什麼地方下手去看這個delayworkqueue? 我們還是回頭看看worker裡面的run方法(上一篇文章中已經講過):

<code>public</code> <code>void</code> <code>run() {</code>

<code>     </code><code>try</code> <code>{</code>

<code>         </code><code>runnable task = firsttask;</code>

<code>         </code><code>firsttask =</code><code>null</code><code>;</code>

<code>         </code><code>while</code> <code>(task !=</code><code>null</code> <code>|| (task = gettask()) !=</code><code>null</code><code>) {</code>

<code>             </code><code>runtask(task);</code>

<code>             </code><code>task =</code><code>null</code><code>;</code>

<code>         </code><code>}</code>

<code>     </code><code>}</code><code>finally</code> <code>{</code>

<code>         </code><code>workerdone(</code><code>this</code><code>);</code>

這裡面要調用等待隊列就是gettask()方法:

<code>runnable gettask() {</code>

<code>         </code><code>for</code> <code>(;;) {</code>

<code>             </code><code>try</code> <code>{</code>

<code>                 </code><code>int</code> <code>state = runstate;</code>

<code>                 </code><code>if</code> <code>(state &gt; shutdown)</code>

<code>                    </code><code>return</code> <code>null</code><code>;</code>

<code>                </code><code>runnable r;</code>

<code>                </code><code>if</code> <code>(state == shutdown) </code><code>// help drain queue</code>

<code>                    </code><code>r = workqueue.poll();</code>

<code>                </code><code>else</code> <code>if</code> <code>(poolsize &gt; corepoolsize || allowcorethreadtimeout)</code>

<code>                    </code><code>r = workqueue.poll(keepalivetime, timeunit.nanoseconds);</code>

<code>                </code><code>else</code>

<code>                    </code><code>r = workqueue.take();</code>

<code>                </code><code>if</code> <code>(r !=</code><code>null</code><code>)</code>

<code>                    </code><code>return</code> <code>r;</code>

<code>                </code><code>if</code> <code>(workercanexit()) {</code>

<code>                    </code><code>if</code> <code>(runstate &gt;= shutdown)</code><code>// wake up others</code>

<code>                        </code><code>interruptidleworkers();</code>

<code>                </code><code>}</code>

<code>            </code><code>}</code><code>catch</code> <code>(interruptedexception ie) {</code>

發現沒有,如果沒有設定逾時,預設隻會通過workqueue.take()方法擷取資料,那麼我們就看take方法,而增加到隊列裡面的方法自然看offer相關的方法。接下來我們來看下delayqueue這個隊列的take方法:

<code>public</code> <code>e take()</code><code>throws</code> <code>interruptedexception {</code>

<code>        </code><code>final</code> <code>reentrantlock lock =</code><code>this</code><code>.lock;</code>

<code>        </code><code>lock.lockinterruptibly();</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>for</code> <code>(;;) {</code>

<code>                </code><code>e first = q.peek();</code>

<code>                </code><code>if</code> <code>(first ==</code><code>null</code><code>) {</code>

<code>                    </code><code>available.await();</code><code>//等待信号,線程一直挂在哪裡</code>

<code>                </code><code>}</code><code>else</code> <code>{</code>

<code>                    </code><code>long</code> <code>delay =  first.getdelay(timeunit.nanoseconds);</code>

<code>                    </code><code>if</code> <code>(delay &gt;</code><code>0</code><code>) {</code>

<code>                        </code><code>long</code> <code>tl = available.awaitnanos(delay);</code><code>//最左等delay的時間段</code>

<code>                    </code><code>}</code><code>else</code> <code>{</code>

<code>                        </code><code>e x = q.poll();</code><code>//可以運作,取出一個</code>

<code>                        </code><code>assert</code> <code>x !=</code><code>null</code><code>;</code>

<code>                        </code><code>if</code> <code>(q.size() !=</code><code>0</code><code>)</code>

<code>                            </code><code>available.signalall();</code>

<code>                        </code><code>return</code> <code>x;</code>

<code>                    </code><code>}</code>

<code>        </code><code>}</code><code>finally</code> <code>{</code>

<code>            </code><code>lock.unlock();</code>

這裡的for就是要找到資料為止,否則就等着,而這個“q”和“available”是什麼呢?

private transient final condition available = lock.newcondition();

private final priorityqueue q = new priorityqueue();

怎麼裡面還有一層隊列,不用怕,從這裡你貌似看出點名稱意味了,就是它是優先級隊列,而對于任務排程來講,優先級的方式就是時間,我們用這中猜測來繼續深入源碼。

上面首先擷取這個隊列的第一個元素,若為空,就等待一個“available”發出的信号,我們可以猜測到這個offer的時候會發出的信号,一會來驗證即可;若不為空,則通過getdelay方法來擷取時間資訊,這個getdelay方法就用上了我們開始說的scheduledfuturetask了,如果是時間大于0,則也進入等待,因為還沒開始執行,等待也是“available”發出信号,但是有一個最長時間,為什麼還要等這個信号,是因為有可能進來一個新的任務,比這個等待的任務還要先執行,是以要等這個信号;而最多等這麼長時間,就是因為如果這段時間沒任務進來肯定就是它執行了。然後就傳回的這個值,被worker(上面有提到)拿到後調用其run()方法進行運作。

那麼寫入隊列在那裡?他們是如何排序的?

我們看看隊列的寫入方法是這樣的:

<code>public</code> <code>boolean</code> <code>offer(e e) {</code>

<code>        </code><code>lock.lock();</code>

<code>            </code><code>e first = q.peek();</code>

<code>            </code><code>q.offer(e);</code>

<code>            </code><code>if</code> <code>(first ==</code><code>null</code> <code>|| e.compareto(first) &lt;</code><code>0</code><code>)</code>

<code>                 </code><code>available.signalall();</code>

<code>             </code><code>return</code> <code>true</code><code>;</code>

<code>         </code><code>}</code><code>finally</code> <code>{</code>

<code>             </code><code>lock.unlock();</code>

隊列也是首先取出第一個(後面會用來和目前任務做比較),而這裡“q”是上面提到的“priorityqueue”,看來offer的關鍵還在它的裡面,我們看看調用過程:

<code>         </code><code>if</code> <code>(e ==</code><code>null</code><code>)</code>

<code>             </code><code>throw</code> <code>new</code> <code>nullpointerexception();</code>

<code>         </code><code>modcount++;</code>

<code>         </code><code>int</code> <code>i = size;</code>

<code>         </code><code>if</code> <code>(i &gt;= queue.length)</code>

<code>            </code><code>grow(i +</code><code>1</code><code>);</code>

<code>        </code><code>size = i +</code><code>1</code><code>;</code>

<code>        </code><code>if</code> <code>(i ==</code><code>0</code><code>)</code>

<code>            </code><code>queue[</code><code>0</code><code>] = e;</code>

<code>        </code><code>else</code>

<code>            </code><code>siftup(i, e);</code><code>//主要是這條代碼很關鍵</code>

<code>        </code><code>return</code> <code>true</code><code>;</code>

<code>private</code> <code>void</code> <code>siftup(</code><code>int</code> <code>k, e x) {</code>

<code>        </code><code>if</code> <code>(comparator !=</code><code>null</code><code>)</code>

<code>            </code><code>siftupusingcomparator(k, x);</code>

<code>        </code><code>//我們預設走這裡,因為delayqueue定義它的時候預設沒有給定義comparator</code>

<code>            </code><code>siftupcomparable(k, x);</code>

<code>/*</code>

<code>可以發現這個方法是将任務按照compareto對比後,放在隊列的合适位置,但是它肯定不是絕對順序的,這一點和timer的内部排序機制類似。</code>

<code>*/</code>

<code>private</code> <code>void</code> <code>siftupcomparable(</code><code>int</code> <code>k, e x) {</code>

<code>        </code><code>comparable&lt;?</code><code>super</code> <code>e&gt; key = (comparable&lt;?</code><code>super</code> <code>e&gt;) x;</code>

<code>        </code><code>while</code> <code>(k &gt;</code><code>0</code><code>) {</code>

<code>            </code><code>int</code> <code>parent = (k -</code><code>1</code><code>) &gt;&gt;&gt;</code><code>1</code><code>;</code>

<code>            </code><code>object e = queue[parent];</code>

<code>            </code><code>if</code> <code>(key.compareto((e) e) &gt;=</code><code>0</code><code>)</code>

<code>                </code><code>break</code><code>;</code>

<code>            </code><code>queue[k] = e;</code>

<code>            </code><code>k = parent;</code>

<code>34</code>

<code>35</code>

<code>        </code><code>queue[k] = key;</code>

<code>36</code>

你是否發現,compareto也用上了,就是我們前面描述一大堆的:scheduledfuturetask類中的一個方法,那麼run方法也用上了,這個過程貌似完整了。

我們再來理一下思路:

1、調用的thread的包裝,由在threadpoolexecutor中的worker調用你傳入的runnable的run方法,變成了worker調用runnable的run方法,由它來處理時間片的資訊調用你傳入的線程。

2、scheduledfuturetask類在整個過程中提供了基礎參考的方法,其中最為關鍵的就是實作了接口comparable,實作内部的compareto方法,也實作了delayed接口中的getdelay方法用以判定時間(當然delayed接口本身也是繼承于comparable,我們不要糾結于細節概念就好)。

3、等待隊列由在threadpoolexecutor中預設使用的linkedblockingqueue換成了delayqueue(它是被delayworkqueue包裝了一下子,沒多大差別),而delayqueue主要提供了一個信号量“available”來作為寫入和讀取的信号控制開關,通過另一個優先級隊列“priorityqueue”來控制實際的隊列順序,他們的順序就是基于上面提到的scheduledfuturetask類中的compareto方法,而是否運作也是基于getdelay方法來實作的。

4、scheduledfuturetask類的run方法會判定是否為時間片資訊,如果為時間片,在執行完對應的方法後,開始計算下一次執行時間(注意判定時間片大于0,小于0,分别代表的是以目前執行完的時間為準計算下一次時間還是以目前時間為準),這個在前面有提到。

5、它是支援多線程的,和timer的機制最大的差別就在于多個線程會最征用這個隊列,隊裡的排序方式和timer有很多相似之處,并非完全有序,而是通過位移動來盡量找到合适的位置,有點類似貪心的算法,呵呵。