天天看點

定制并發類(七)實作ThreadFactory接口生成自定義的線程給Fork/Join架構

實作threadfactory接口生成自定義的線程給fork/join架構

fork/join架構是java7中最有趣的特征之一。它是executor和executorservice接口的一個實作,允許你執行callable和runnable任務而不用管理這些執行線程。

這個執行者面向執行能被拆分成更小部分的任務。主要元件如下:

一個特殊任務,實作forkjointask類

兩種操作,将任務劃分成子任務的fork操作和等待這些子任務結束的join操作

一個算法,優化池中線程的使用的work-stealing算法。當一個任務正在等待它的子任務(結束)時,它的執行線程将執行其他任務(等待執行的任務)。

forkjoinpool類是fork/join的主要類。在它的内部實作,有如下兩種元素:

一個存儲等待執行任務的列隊。

一個執行任務的線程池

在這個指南中,你将學習如何實作一個在forkjoinpool類中使用的自定義的工作者線程,及如何使用一個工廠來使用它。

準備工作…

這個指南的例子使用eclipse ide實作。如果你使用eclipse或其他ide,如netbeans,打開它并建立一個新的java項目。

如何做…

按以下步驟來實作的這個例子:

1.建立一個繼承forkjoinworkerthread類的myworkerthread類。

2.聲明和建立一個參數化為integer類的threadlocal屬性,名為taskcounter。

3.實作這個類的構造器。

4.重寫onstart()方法。調用父類的這個方法,寫入一條資訊到控制台。設定目前線程的taskcounter屬性值為0。

5.重寫ontermination()方法。寫入目前線程的taskcounter屬性值到控制台。

6.實作addtask()方法。遞增taskcounter屬性值。

7.建立一個實作forkjoinworkerthreadfactory接口的myworkerthreadfactory類。實作newthread()方法,建立和傳回一個myworkerthread對象。

8.建立myrecursivetask類,它繼承一個參數化為integer類的recursivetask類。

9.聲明一個私有的、int類型的屬性array。

10.聲明兩個私有的、int類型的屬性start和end。

11.實作這個類的構造器,初始化它的屬性。

12.實作compute()方法,用來合計數組中在start和end位置之間的所有元素。首先,将執行這個任務的線程轉換成一個myworkerthread對象,然後使用addtask()方法來增長這個線程的任務計數器。

13.實作addresults()方法。計算和傳回兩個任務(接收參數)的結果的總和。

14.令這個線程睡眠10毫秒,然後傳回任務的結果。

15.實作這個例子的主類,通過建立main類,并實作main()方法。

16.建立一個名為factory的myworkerthreadfactory對象。

17.建立一個名為pool的forkjoinpool對象,将前面建立的factory對象作為參數傳給它的構造器。

18.建立一個大小為100000的整數數組,将所有元素初始化為值1。

19.建立一個新的task對象,用來合計數組中的所有元素。

20.使用execute()方法,将這個任務送出給池。

21.使用join()方法,等待這個任務的結束。

22.使用shutdown()方法,關閉這個池。

23.使用awaittermination()方法,等待這個執行者的結束。

24.使用get()方法,将任務的結束寫入到控制台。

25.寫入一條資訊到控制台,表明程式的結束。

它是如何工作的…

fork/join架構使用的線程叫工作者線程。java包含繼承thread類的forkjoinworkerthread類和使用fork/join架構實作工作者線程。

在這個指南中,你已實作了繼承forkjoinworkerthread類的myworkerthread類,并重寫這個類的兩個方法。你的目标是實作每個工作者線程的任務計數器,以至于你可以知道每個工作者線程執行多少個任務。你已經通過一個threadlocal屬性實作計數器。這樣,每個線程都擁有它自己的計數器,對于來你說是透明的。

你已重寫forkjoinworkerthread類的onstart()方法來實作任務的計數器。當工作者線程開始它的執行時,這個方法将被調用。你也重寫了ontermination()方法,将任務計數器的值寫入到控制台。當工作者線程結束它的執行時,這個方法将被調用。你也在myworkerthread類中實作addtask()方法,用來增加每個線程的任務計數器。

對于forkjoinpool類,與java并發api中的所有執行者一樣,使用工廠來建立它。是以,如果你想在forkjoinpool類中使用myworkerthread線程,你必須實作自己的線程工廠。對于fork/join架構,這個工廠必須實作forkjoinpool.forkjoinworkerthreadfactory類。為此,你已實作myworkerthreadfactory類。這個類隻有一個用來建立一個新的myworkerthread對象的方法。

最後,你隻要使用已建立的工廠來初始化forkjoinpool類。你已在main類中通過使用forkjoinpool的構造器實作了。

以下截圖顯示了這個程式的部分輸出:

定制并發類(七)實作ThreadFactory接口生成自定義的線程給Fork/Join架構

你可以看出forkjoinpool對象如何執行4個工作者線程及每個工作者線程執行多少個任務。

不止這些…

考慮一下,當一個線程正常結束或抛出一個exception異常時,調用的forkjoinworkerthread提供的ontermination()方法。這個方法接收一個throwable對象作為參數。如果這個參數值為null時,表明這個工作者線程正常結束。但是,如果這個參數的值不為null,表明這個線程抛出一個異常。你必須包含必要的代碼來處理這種情況。

參見

在第1章,線程管理中的通過工廠建立線程指南