Fork/Join 架構主要由Fork 和 Join 兩個操作構成:
1)Fork操作主要用于對任務和資料進行劃分,一般是将一個大問題劃分為幾個小問題,以期望能夠友善地對簡化後地問題進行處理
2)Join操作主要作用于對各個部分地運作結果進行合并,相當于程式中的一個障栅。
這兩個操作與MapReduce中的Map/Reduce操作類似,簡而言之,Fork/Join架構是一種設計模式,是一種思想,在其他程式設計語言中也同樣适用。
Fork/Join 架構實作了任務的定義和任務處理功能的分離,使程式員在現實并行程式設計的同時,可以更好的集中精力實作業務邏輯相關的内容,這大大降低了并行程式設計的難度。
在Java中,Fork/Join 架構實作了ExecutorService接口,與任何以一個ExecutorService接口的功能相同的是:Fork/Join會自動地将任務配置設定給線程池中地線程,并負責線程管理地相關工作。
與ExecutorService不同地是:Fork/Join使用了工作竊取算法,已經完成自身任務線程可以從其他繁忙地線程中竊取任務來執行,進而保持了線程執行過程中地負載均衡。
Fork/Join 主要用途:Fork/Join架構通常被用于解決那些可以遞歸地分解為更小地任務地問題。該架構與負載均衡、分治(Divide and Conquer)方法及工作竊取算法(Work-steaing Algorithm)有密切地聯系。
Fork/Join地負債均衡
負載不均衡會導緻一些線程空閑,進而造成資源浪費。常見了負載不均衡地原因是配置設定給某個線程地任務過多,其他線程都執行完畢,但該線程仍繼續執行。
負載均衡是指在CPU處理器核上運作地線程全部以相同地繁忙程度來工作,理想情況下,所有地線程都都成同樣大小的工作量。
負載均衡有利于加快程式的執行,減少資源浪費,是以我們在設計并行程式時,也應該保證負載均衡,但有些情況的負債均衡任務是由作業系統或JVM虛拟機來完成的。
Frok/Join的分治方法
分治方法是簡化問題的一種處理方法,該方法的基本思想是将一個複雜的任務分解為若幹個小任務,然後分别解決這些小任務。
使用分治方法的步驟:
1)分解。将複雜的任務分成小任務,直到把任務分解到容易解決為止。
2)解決。對已完成分解的小人物進行求解。
3)j結果合并。如果每個任務都有傳回結果,則需要對結果進行合并。
分治法可以解決合并排序,二分搜尋和矩陣相乘問題。
在使用Fork/Join架構的過程中,首先對ForkJoinTask進行分解,任務分解的數目可以根據問題的特征以及CPU可同時處理的線程數進行設定,然後将分解的任務交給ForkJoinPool處理,最後對處理結果進行收集。
工作竊取算法:
工作竊取算法是提高程式性能,保證負債均衡的一種算法。該算法的基本思想是當程式中某些線程做完自身的工作後,去檢視其他線程是否還有未處理完成的工作,如果有,則竊取一部分工作來執行,進而幫助那些未完成的
程式盡快完成他們的工作。
程式在使用該算法後,一方面,可以保證線程始終處于一種忙碌狀态,提高資源的使用率;另一方面,也可以減少其他繁忙線程的處理時間,有助于提高程式的性能。
從本質上來說,工作竊取算法是一種任務排程方法,盡量使每個線程都能處于忙碌狀态。生活中"竊取"是一個編譯詞,但在這裡卻有很積極的意義。
Fork/Join架構與Executor架構的不同之處在于Fork/Join架構采用了工作竊取算法,可以說該算法是Fork/Join架構的核心。在ForKJoinPool中,有一些線程執行任務較快,在做完自己的工作之後,這些線程将嘗試發現那些未被執行的任務,
如果找到,則執行這些任務。
Fork/Join架構采用雙端隊列(Deque)作為任務的存儲結構,該隊列支援後進先出的資料pop和push操作,并且支援先進先出的take操作。由某一工作線程建立的子任務仍然會被加入到線程的隊列中,一般采用push操作,工作線程從自己的
雙端隊列中取出執行任務,一般采用pop操作,當工作線程需要從其他線程的工作隊列中竊取任務時,一般采用take操作。
線程池中的線程每次是從隊列的頭部取出任務來執行,當使用使用fork操作産生新任務時,會把新的任務加入到隊列的頭部,而不像其他線程池加入到尾部,這樣可以保證fork出來的新的任務可以盡快得到執行。
當某個線程執行完自己的任務,而沒有其他任務可處理時,就從隊列尾部竊取一個任務執行。
Fork/Join 架構的程式設計技術
java中Fok/Join架構比較适于解決那些具有遞歸性質,可以進行任務分解的程式。
Fork/Join架構需要對任務進行分解和合并操作,在分解前,首先檢視問題的規模是否超過了預設門檻值(Threshold Value),在任務規模不大的情況下,采用串行的解決方式,由于該方式省去了分解和合并的操作有時效果會更好。
在使用Fork/Join架構時,門檻值通常時人為地進行設定,當問題地規模小于門檻值時,說明沒必要采用并行地解決方式,是以更傾向于采用串行執行方式或者采用其他更優化地算法解決;
而當問題地規模大于門檻值時,采用Fork/Join架構求解。
使用Fork/Join架構程式設計模式:如下所示:
if(問題模式<門檻值){
//使用串行模式解決或者其他模式解決
}else{
//将任務Task盡心分解,分解為若幹個小任務 Task1,Task2.......
//将任務Task1、Task2送出到線程池執行
//如果任務有返滬結果,收集結果
}
ForkJoinPool 類
ForkJoinPool類是Fork/Join架構地核心,也是Fork/Join架構執行地入口點,它是實作了接口Executor Service。ForkJoinPool類地任務是負責管理線程,并提供線程執行狀态和任務處理地相關資訊。
ForkJoinPool類差別于其他ExecutorService的地方在于該類實作了工作竊取算法,線上程池中的線程總是嘗試發現其他可以運作的任務,可以通過相關的方法擷取工作竊取的執行情況。
ForkJoinPool的建立
ForkJoinPool類從AbstractExecutorService類繼承,主要用于處理ForkJoinTask中的任務。
以下是ForkJoinPool類構造方法的形式:
1)ForkJoinPool(); //該構造方法将預設生成一個線程池,線程池中可以同時運作的線程數和CPU能夠同時運作的最大線程數(可以通過Runtime.getRuntim.availableProcessors()擷取)相同。
2)ForkJoinPool(int parallelism);//使用者可以指定線程池中線程的數目。
//parallelism 指明并行運作的線程數
//factory 是線程工廠,用于建立新線程
//handler 使用者處理内部出現的異常
//asyncMode 用于指定ForkJoinPool的工作模式,當為true時,表示工作本地的先進先出模式。 預設為false。對于某些應用來講,為true比預設的更加合适。
3)ForJoinPool(int parallelism,ForkJoinWorkThreadFactory factory,Thread.UncaughtExceptionHandler handler,boolean asyncMode);//
demo 示例:
建立一個ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
ForkJoinPool pool = new ForkJoinPool(8);
ForkJoinPool的使用:
ForkJoinPool的使用大緻可以分為兩種,一種時通過invoke、execute和submit執行的任務,另一種時在程式執行過程中通過fork操作來執行的任務。
ForkJoinPool常用的方法:
1)invoke(ForkJoinTask<T> task);//是一個同步調用方法,用于處理給定任務,并在處理完畢後傳回處理的結果,傳回結果的類型由T指定。
2)invokeAll(Collection<? extends Callable<T> >tasks);//是一個同步調用方法,可以将若幹個任務組成的一個集合交給ForkJoinPool執行,任務在繼續運作之前會等待子任務的結束,該方法
傳回任務的結果,結果類型由T指定。ForkJoinTask類的該方法是Executor和Fork/Join架構的主要不同之處。
3)execute(Runnable task);//可以把一個Runnable線程所有代表的任務收納櫃ForkJoinPool中,需要指出的是:ForkJoinPool不會對Runnable對象使用工作竊取算法,該算法隻會被應用到ForkJoinTask對象中。
同樣,ForkJoinPool不會對Callable對象使用工作竊取算法。
4)submit(ForkJoinTask<T> task);//用于把一個任務送出給ForkJoinPool執行,傳回ForkJoinTask<T>的結果。
以上三類執行任務的方法,具體使用如下:
------------------------------------------------------------------------------------------------------------------------
| 在外部對Fork/Join操作的調用 | 在Fork/Join架構範圍内使用
------------------------------------------------------------------------------------------------------------------------
異步執行 | execute(ForkJoinTask) | ForkJoinTask.fork()
------------------------------------------------------------------------------------------------------------------------
同步執行(等待子任務完成) | invoke(ForkJoinTask) | ForkJoinTask.invoke()
------------------------------------------------------------------------------------------------------------------------
執行并獲得結果 | submit(ForkJoinTask) | ForkJoinTask.fork()
------------------------------------------------------------------------------------------------------------------------
Fork/Join架構中的任務
ForkJoinTask類是所有的在Fork/Join架構中執行任務的基類,提供了一系列機制來實作Fork和Join操作,該類由兩個子類,分别是RecursiveAction和RecursiveTask,從RecursiveAction類繼承類的子類方法一般沒有傳回值,
從RecursiveTask類繼承的子類則由傳回值。
public abstract class ForkJoinTask<?> extends Object implements Future<V>,Serializable
ForkJoinTask 類實作了接口Serializable 和 Future<V>,是以一般在子類加入serialVersionUID變量定義。 如:
private static final long SerialVersionUID = 1L;
Fork/Join 任務的建立
在建立任務時,最好不要從ForkJoinTask類直接繼承,而是從該類的子類RecursiveAction或RecursiveTask繼承。
1)從RecursiveAction 繼承承建任務
從RecursiveAction類繼承的子類方法一般沒有傳回值。繼承後的新類需要重寫該類的computer()方法。
computer()方法的形式如下:
@Override
public void computer(){
//方法體
}
isDone()方法,用于判斷任務是否完成。
cancel(booleanmayInterrupteIfRunning)用于取消一個任務的執行。
當任務送出到ForkJoinPool後獲得運作的機會。
demo 示例:
使用Fork/Join 架構對班級的人數進行更新。當需要更新班級的人超過10時,更新任務進行細分。
分析:Fork/Join 架構的門檻 10
//班級的對象類
public class ClassInfo{
private String name;
private int number;
public ClassInfo( String name,int number){
this.name = name;
this.number = number;
}
public String getName(){
return name;
}
public int getNumber(){
return number;
}
public void setNumber(int number){
this.number = number;
}
}
/更新操作的任務類
public class UpdateTask extends RecursiveAction{
private static final long serialVersionUID =1L;
private List<ClassInfo> classInfos;
private int start;
private int end;
private int increment;
private int nthreads;
private int threshold;
public UpdateTask(List<ClassInfo> classInfos,int start,int end,int increment,int nthreads,int threshold){
this.classInfos = classInfos;
this.start = start;
this.end = end;
this.increment = increment;
this.nthreads = nthreads;
this.threshold = threshold;
}
@Override
public void compute(){
//門檻 串行
if(end-start<=threshold | threshold ==1){
updateSequential();
}else{
UpdateTask [] tasks = new UpdateTask[nthreads];
int [] data = new int[nthreads+1];
int segment = (end -start+nthreads-1)/nthreads;
for(int i=0;i<=nthreads;i++){
data[i] = start+segment*i;
if(data[i]>end){
data[i] = end;
}
}
int mid = (end=start)/2;
for(int i=0;i<nthreads;i++){
tasks[i] = new UpdateTask(classInfos,data[i],data[i+1],increment,nthreads,threshold);
}
invokeAll(tasks);
}
}
public void updateSequential(){
for(int i = start;i<end;i++){
ClassInfo classInfo = classInfos.get(i);
classInfo.setNumber(classInfo.getNumber()+increment);
}
}
}
//啟動測試類
public class Index{
public static void main(String [] args ){
int nthreads = Runtime.getRuntime().availableProcessors();
int threshold = 10;
int increment = 5;
int baseNum = 50;
int size = 10000;
List<ClassInfo> classInfos = new ArrayList<ClassInfo>();
for(int i=0;i<size;i++){
ClassInfo classInfo = new ClassInfo("班級"+i,baseNum);
classInfos.add(classInfo);
}
ForkJoinPool pool = new ForkJoinPool();
UpdateTask updateTask = new UpdateTask(classInfos,0,classInfos.size(),increment,nthreads,threshold);
pool.execute(updateTask);
do{
System.out.printf("類Index:并行度:%d\n",pool.getParallelism());
System.out.printf("類Index:活動線程數:%d\n",pool.getActiveThreadCount());
System.out.printf("類Index:任務數:%d\n",pool.getQueuedTaskCount());
System.out.printf("類Index:%d\n",pool.getStealCount());
try{
TimeUnit.SECONDS.sleep(1);
}catch(InterruptedException e){
e.printStackTrace();
}
}while(!updateTask.isDone());
if(validate(classInfos,baseNum+increment)){
System.out.println("所有班級更新完畢!");
}else{
System.out.println("Something wrong happend.");
}
}
public static boolean validate(List<ClassInfo> classInfos, int total){
boolean pass = true;
for(ClassInfo info:classInfos){
if(info.getNumber()!=total){
pass = false;
}
}
return pass;
}
}
運作結果:
類Index:并行度:8
類Index:活動線程數:0
類Index:任務數:0
類Index:41
所有班級更新完畢!