實作threadfactory接口生成自定義的線程給fork/join架構
fork/join架構是java7中最有趣的特征之一。它是executor和executorservice接口的一個實作,允許你執行callable和runnable任務而不用管理這些執行線程。
這個執行者面向執行能被拆分成更小部分的任務。主要元件如下:
一個特殊任務,實作forkjointask類
兩種操作,将任務劃分成子任務的fork操作和等待這些子任務結束的join操作
一個算法,優化池中線程的使用的work-stealing算法。當一個任務正在等待它的子任務(結束)時,它的執行線程将執行其他任務(等待執行的任務)。
forkjoinpool類是fork/join的主要類。在它的内部實作,有如下兩種元素:
一個存儲等待執行任務的列隊。
一個執行任務的線程池
在這個指南中,你将學習如何實作一個在forkjoinpool類中使用的自定義的工作者線程,及如何使用一個工廠來使用它。
準備工作…
這個指南的例子使用eclipse ide實作。如果你使用eclipse或其他ide,如netbeans,打開它并建立一個新的java項目。
如何做…
按以下步驟來實作的這個例子:
1.建立一個繼承forkjoinworkerthread類的myworkerthread類。
2.聲明和建立一個參數化為integer類的threadlocal屬性,名為taskcounter。
3.實作這個類的構造器。
4.重寫onstart()方法。調用父類的這個方法,寫入一條資訊到控制台。設定目前線程的taskcounter屬性值為0。
5.重寫ontermination()方法。寫入目前線程的taskcounter屬性值到控制台。
6.實作addtask()方法。遞增taskcounter屬性值。
7.建立一個實作forkjoinworkerthreadfactory接口的myworkerthreadfactory類。實作newthread()方法,建立和傳回一個myworkerthread對象。
8.建立myrecursivetask類,它繼承一個參數化為integer類的recursivetask類。
9.聲明一個私有的、int類型的屬性array。
10.聲明兩個私有的、int類型的屬性start和end。
11.實作這個類的構造器,初始化它的屬性。
12.實作compute()方法,用來合計數組中在start和end位置之間的所有元素。首先,将執行這個任務的線程轉換成一個myworkerthread對象,然後使用addtask()方法來增長這個線程的任務計數器。
13.實作addresults()方法。計算和傳回兩個任務(接收參數)的結果的總和。
14.令這個線程睡眠10毫秒,然後傳回任務的結果。
15.實作這個例子的主類,通過建立main類,并實作main()方法。
16.建立一個名為factory的myworkerthreadfactory對象。
17.建立一個名為pool的forkjoinpool對象,将前面建立的factory對象作為參數傳給它的構造器。
18.建立一個大小為100000的整數數組,将所有元素初始化為值1。
19.建立一個新的task對象,用來合計數組中的所有元素。
20.使用execute()方法,将這個任務送出給池。
21.使用join()方法,等待這個任務的結束。
22.使用shutdown()方法,關閉這個池。
23.使用awaittermination()方法,等待這個執行者的結束。
24.使用get()方法,将任務的結束寫入到控制台。
25.寫入一條資訊到控制台,表明程式的結束。
它是如何工作的…
fork/join架構使用的線程叫工作者線程。java包含繼承thread類的forkjoinworkerthread類和使用fork/join架構實作工作者線程。
在這個指南中,你已實作了繼承forkjoinworkerthread類的myworkerthread類,并重寫這個類的兩個方法。你的目标是實作每個工作者線程的任務計數器,以至于你可以知道每個工作者線程執行多少個任務。你已經通過一個threadlocal屬性實作計數器。這樣,每個線程都擁有它自己的計數器,對于來你說是透明的。
你已重寫forkjoinworkerthread類的onstart()方法來實作任務的計數器。當工作者線程開始它的執行時,這個方法将被調用。你也重寫了ontermination()方法,将任務計數器的值寫入到控制台。當工作者線程結束它的執行時,這個方法将被調用。你也在myworkerthread類中實作addtask()方法,用來增加每個線程的任務計數器。
對于forkjoinpool類,與java并發api中的所有執行者一樣,使用工廠來建立它。是以,如果你想在forkjoinpool類中使用myworkerthread線程,你必須實作自己的線程工廠。對于fork/join架構,這個工廠必須實作forkjoinpool.forkjoinworkerthreadfactory類。為此,你已實作myworkerthreadfactory類。這個類隻有一個用來建立一個新的myworkerthread對象的方法。
最後,你隻要使用已建立的工廠來初始化forkjoinpool類。你已在main類中通過使用forkjoinpool的構造器實作了。
以下截圖顯示了這個程式的部分輸出:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiInBnauEDNvwFOw8CXzEDMy8CXzRWYvxGc19CX05WZ052bj1Cc39CXt92YuUmdlZWavw1LcpDc0RHaiojIsJye.jpg)
你可以看出forkjoinpool對象如何執行4個工作者線程及每個工作者線程執行多少個任務。
不止這些…
考慮一下,當一個線程正常結束或抛出一個exception異常時,調用的forkjoinworkerthread提供的ontermination()方法。這個方法接收一個throwable對象作為參數。如果這個參數值為null時,表明這個工作者線程正常結束。但是,如果這個參數的值不為null,表明這個線程抛出一個異常。你必須包含必要的代碼來處理這種情況。
參見
在第1章,線程管理中的通過工廠建立線程指南