天天看點

Fork/Join 架構的基礎以及示例

Fork/Join 架構主要由Fork 和 Join 兩個操作構成:

        1)Fork操作主要用于對任務和資料進行劃分,一般是将一個大問題劃分為幾個小問題,以期望能夠友善地對簡化後地問題進行處理

        2)Join操作主要作用于對各個部分地運作結果進行合并,相當于程式中的一個障栅。

        這兩個操作與MapReduce中的Map/Reduce操作類似,簡而言之,Fork/Join架構是一種設計模式,是一種思想,在其他程式設計語言中也同樣适用。

    Fork/Join 架構實作了任務的定義和任務處理功能的分離,使程式員在現實并行程式設計的同時,可以更好的集中精力實作業務邏輯相關的内容,這大大降低了并行程式設計的難度。

    在Java中,Fork/Join 架構實作了ExecutorService接口,與任何以一個ExecutorService接口的功能相同的是:Fork/Join會自動地将任務配置設定給線程池中地線程,并負責線程管理地相關工作。

    與ExecutorService不同地是:Fork/Join使用了工作竊取算法,已經完成自身任務線程可以從其他繁忙地線程中竊取任務來執行,進而保持了線程執行過程中地負載均衡。

    Fork/Join 主要用途:Fork/Join架構通常被用于解決那些可以遞歸地分解為更小地任務地問題。該架構與負載均衡、分治(Divide and Conquer)方法及工作竊取算法(Work-steaing Algorithm)有密切地聯系。

    Fork/Join地負債均衡

        負載不均衡會導緻一些線程空閑,進而造成資源浪費。常見了負載不均衡地原因是配置設定給某個線程地任務過多,其他線程都執行完畢,但該線程仍繼續執行。

        負載均衡是指在CPU處理器核上運作地線程全部以相同地繁忙程度來工作,理想情況下,所有地線程都都成同樣大小的工作量。

        負載均衡有利于加快程式的執行,減少資源浪費,是以我們在設計并行程式時,也應該保證負載均衡,但有些情況的負債均衡任務是由作業系統或JVM虛拟機來完成的。

    Frok/Join的分治方法

        分治方法是簡化問題的一種處理方法,該方法的基本思想是将一個複雜的任務分解為若幹個小任務,然後分别解決這些小任務。

        使用分治方法的步驟:

            1)分解。将複雜的任務分成小任務,直到把任務分解到容易解決為止。

            2)解決。對已完成分解的小人物進行求解。

            3)j結果合并。如果每個任務都有傳回結果,則需要對結果進行合并。

        分治法可以解決合并排序,二分搜尋和矩陣相乘問題。

        在使用Fork/Join架構的過程中,首先對ForkJoinTask進行分解,任務分解的數目可以根據問題的特征以及CPU可同時處理的線程數進行設定,然後将分解的任務交給ForkJoinPool處理,最後對處理結果進行收集。

    工作竊取算法:

        工作竊取算法是提高程式性能,保證負債均衡的一種算法。該算法的基本思想是當程式中某些線程做完自身的工作後,去檢視其他線程是否還有未處理完成的工作,如果有,則竊取一部分工作來執行,進而幫助那些未完成的

        程式盡快完成他們的工作。

        程式在使用該算法後,一方面,可以保證線程始終處于一種忙碌狀态,提高資源的使用率;另一方面,也可以減少其他繁忙線程的處理時間,有助于提高程式的性能。

        從本質上來說,工作竊取算法是一種任務排程方法,盡量使每個線程都能處于忙碌狀态。生活中"竊取"是一個編譯詞,但在這裡卻有很積極的意義。

        Fork/Join架構與Executor架構的不同之處在于Fork/Join架構采用了工作竊取算法,可以說該算法是Fork/Join架構的核心。在ForKJoinPool中,有一些線程執行任務較快,在做完自己的工作之後,這些線程将嘗試發現那些未被執行的任務,

        如果找到,則執行這些任務。

        Fork/Join架構采用雙端隊列(Deque)作為任務的存儲結構,該隊列支援後進先出的資料pop和push操作,并且支援先進先出的take操作。由某一工作線程建立的子任務仍然會被加入到線程的隊列中,一般采用push操作,工作線程從自己的

        雙端隊列中取出執行任務,一般采用pop操作,當工作線程需要從其他線程的工作隊列中竊取任務時,一般采用take操作。

        線程池中的線程每次是從隊列的頭部取出任務來執行,當使用使用fork操作産生新任務時,會把新的任務加入到隊列的頭部,而不像其他線程池加入到尾部,這樣可以保證fork出來的新的任務可以盡快得到執行。

        當某個線程執行完自己的任務,而沒有其他任務可處理時,就從隊列尾部竊取一個任務執行。

    Fork/Join 架構的程式設計技術

        java中Fok/Join架構比較适于解決那些具有遞歸性質,可以進行任務分解的程式。

        Fork/Join架構需要對任務進行分解和合并操作,在分解前,首先檢視問題的規模是否超過了預設門檻值(Threshold Value),在任務規模不大的情況下,采用串行的解決方式,由于該方式省去了分解和合并的操作有時效果會更好。

        在使用Fork/Join架構時,門檻值通常時人為地進行設定,當問題地規模小于門檻值時,說明沒必要采用并行地解決方式,是以更傾向于采用串行執行方式或者采用其他更優化地算法解決;

        而當問題地規模大于門檻值時,采用Fork/Join架構求解。

        使用Fork/Join架構程式設計模式:如下所示:

        if(問題模式<門檻值){

            //使用串行模式解決或者其他模式解決

        }else{

            //将任務Task盡心分解,分解為若幹個小任務 Task1,Task2.......

            //将任務Task1、Task2送出到線程池執行

            //如果任務有返滬結果,收集結果        

        }

    ForkJoinPool 類

        ForkJoinPool類是Fork/Join架構地核心,也是Fork/Join架構執行地入口點,它是實作了接口Executor Service。ForkJoinPool類地任務是負責管理線程,并提供線程執行狀态和任務處理地相關資訊。

        ForkJoinPool類差別于其他ExecutorService的地方在于該類實作了工作竊取算法,線上程池中的線程總是嘗試發現其他可以運作的任務,可以通過相關的方法擷取工作竊取的執行情況。

    ForkJoinPool的建立

        ForkJoinPool類從AbstractExecutorService類繼承,主要用于處理ForkJoinTask中的任務。

        以下是ForkJoinPool類構造方法的形式:

            1)ForkJoinPool(); //該構造方法将預設生成一個線程池,線程池中可以同時運作的線程數和CPU能夠同時運作的最大線程數(可以通過Runtime.getRuntim.availableProcessors()擷取)相同。

            2)ForkJoinPool(int parallelism);//使用者可以指定線程池中線程的數目。

            //parallelism 指明并行運作的線程數

            //factory 是線程工廠,用于建立新線程

            //handler 使用者處理内部出現的異常

            //asyncMode 用于指定ForkJoinPool的工作模式,當為true時,表示工作本地的先進先出模式。 預設為false。對于某些應用來講,為true比預設的更加合适。

            3)ForJoinPool(int parallelism,ForkJoinWorkThreadFactory factory,Thread.UncaughtExceptionHandler handler,boolean asyncMode);//        

        demo 示例:

            建立一個ForkJoinPool

            ForkJoinPool pool = new ForkJoinPool();

            ForkJoinPool pool = new ForkJoinPool(8);

    ForkJoinPool的使用:

        ForkJoinPool的使用大緻可以分為兩種,一種時通過invoke、execute和submit執行的任務,另一種時在程式執行過程中通過fork操作來執行的任務。

        ForkJoinPool常用的方法:

            1)invoke(ForkJoinTask<T> task);//是一個同步調用方法,用于處理給定任務,并在處理完畢後傳回處理的結果,傳回結果的類型由T指定。

            2)invokeAll(Collection<? extends Callable<T> >tasks);//是一個同步調用方法,可以将若幹個任務組成的一個集合交給ForkJoinPool執行,任務在繼續運作之前會等待子任務的結束,該方法

            傳回任務的結果,結果類型由T指定。ForkJoinTask類的該方法是Executor和Fork/Join架構的主要不同之處。

            3)execute(Runnable task);//可以把一個Runnable線程所有代表的任務收納櫃ForkJoinPool中,需要指出的是:ForkJoinPool不會對Runnable對象使用工作竊取算法,該算法隻會被應用到ForkJoinTask對象中。

            同樣,ForkJoinPool不會對Callable對象使用工作竊取算法。

            4)submit(ForkJoinTask<T> task);//用于把一個任務送出給ForkJoinPool執行,傳回ForkJoinTask<T>的結果。

        以上三類執行任務的方法,具體使用如下:

            ------------------------------------------------------------------------------------------------------------------------

                         |    在外部對Fork/Join操作的調用    |    在Fork/Join架構範圍内使用

            ------------------------------------------------------------------------------------------------------------------------

            異步執行                 |     execute(ForkJoinTask)    |      ForkJoinTask.fork()

            ------------------------------------------------------------------------------------------------------------------------

            同步執行(等待子任務完成)     |     invoke(ForkJoinTask)    |      ForkJoinTask.invoke()

            ------------------------------------------------------------------------------------------------------------------------

            執行并獲得結果         |     submit(ForkJoinTask)    |      ForkJoinTask.fork()

            ------------------------------------------------------------------------------------------------------------------------

    Fork/Join架構中的任務

        ForkJoinTask類是所有的在Fork/Join架構中執行任務的基類,提供了一系列機制來實作Fork和Join操作,該類由兩個子類,分别是RecursiveAction和RecursiveTask,從RecursiveAction類繼承類的子類方法一般沒有傳回值,

        從RecursiveTask類繼承的子類則由傳回值。

        public abstract class ForkJoinTask<?> extends Object implements Future<V>,Serializable

        ForkJoinTask 類實作了接口Serializable 和 Future<V>,是以一般在子類加入serialVersionUID變量定義。 如:

        private static final long SerialVersionUID = 1L;

    Fork/Join 任務的建立

        在建立任務時,最好不要從ForkJoinTask類直接繼承,而是從該類的子類RecursiveAction或RecursiveTask繼承。

        1)從RecursiveAction 繼承承建任務

            從RecursiveAction類繼承的子類方法一般沒有傳回值。繼承後的新類需要重寫該類的computer()方法。

            computer()方法的形式如下:

            @Override

            public void computer(){

                //方法體

            }        

        isDone()方法,用于判斷任務是否完成。

        cancel(booleanmayInterrupteIfRunning)用于取消一個任務的執行。

        當任務送出到ForkJoinPool後獲得運作的機會。

        demo 示例:    

        使用Fork/Join 架構對班級的人數進行更新。當需要更新班級的人超過10時,更新任務進行細分。

        分析:Fork/Join 架構的門檻 10

        //班級的對象類

        public class ClassInfo{

            private String name;

            private int number;

            public ClassInfo( String name,int number){

                this.name = name;

                this.number = number;

            }

            public String getName(){

                return name;

            }

            public int getNumber(){

                return number;

            }

            public void setNumber(int number){

                this.number = number;

            }    

        }

        /更新操作的任務類

        public class UpdateTask extends RecursiveAction{

            private static final long serialVersionUID =1L;

            private List<ClassInfo> classInfos;

            private int start;

            private int end;

            private int increment;

            private int nthreads;

            private int threshold;

            public UpdateTask(List<ClassInfo> classInfos,int start,int end,int increment,int nthreads,int threshold){

                this.classInfos = classInfos;

                this.start = start;

                this.end = end;

                this.increment = increment;

                this.nthreads = nthreads;

                this.threshold = threshold;

            }

            @Override

            public void compute(){

                //門檻 串行

                if(end-start<=threshold | threshold ==1){

                    updateSequential();

                }else{

                    UpdateTask []  tasks = new UpdateTask[nthreads];

                    int []  data = new int[nthreads+1];

                    int segment = (end -start+nthreads-1)/nthreads;

                    for(int i=0;i<=nthreads;i++){

                        data[i] = start+segment*i;

                        if(data[i]>end){

                            data[i] = end;

                        }

                    }

                    int mid = (end=start)/2;

                    for(int i=0;i<nthreads;i++){

                        tasks[i] = new UpdateTask(classInfos,data[i],data[i+1],increment,nthreads,threshold);

                    }

                    invokeAll(tasks);

                }

            }

            public void updateSequential(){

                for(int i = start;i<end;i++){

                    ClassInfo  classInfo = classInfos.get(i);

                    classInfo.setNumber(classInfo.getNumber()+increment);

                }

            }

        }

        //啟動測試類

        public class Index{

            public static void main(String [] args ){

                int nthreads = Runtime.getRuntime().availableProcessors();

                int threshold = 10;

                int increment = 5;

                int baseNum = 50;

                int size = 10000;

                List<ClassInfo> classInfos = new ArrayList<ClassInfo>();

                for(int i=0;i<size;i++){

                    ClassInfo classInfo = new ClassInfo("班級"+i,baseNum);

                    classInfos.add(classInfo);

                }

                ForkJoinPool pool = new ForkJoinPool();

                UpdateTask updateTask = new UpdateTask(classInfos,0,classInfos.size(),increment,nthreads,threshold);

                pool.execute(updateTask);

                do{

                    System.out.printf("類Index:并行度:%d\n",pool.getParallelism());

                    System.out.printf("類Index:活動線程數:%d\n",pool.getActiveThreadCount());

                    System.out.printf("類Index:任務數:%d\n",pool.getQueuedTaskCount());

                    System.out.printf("類Index:%d\n",pool.getStealCount());

                    try{

                        TimeUnit.SECONDS.sleep(1);

                    }catch(InterruptedException e){

                        e.printStackTrace();

                    }

                }while(!updateTask.isDone());

                if(validate(classInfos,baseNum+increment)){

                    System.out.println("所有班級更新完畢!");

                }else{

                    System.out.println("Something wrong happend.");

                }

            }

            public static boolean validate(List<ClassInfo> classInfos, int total){

                boolean pass = true;

                for(ClassInfo info:classInfos){

                    if(info.getNumber()!=total){

                        pass = false;

                    }

                }

                return pass;

            }    

        }

運作結果:

類Index:并行度:8

類Index:活動線程數:0

類Index:任務數:0

類Index:41

所有班級更新完畢!

繼續閱讀