天天看點

java fork-join架構應用和分析問題來源fork-join pool的引入示例應用總結參考材料

問題來源

    記得很早以前自己學習算法的時候,聽說過一種divide and conquer的政策,從某種角度來說,它和遞歸是有着很緊密的聯系。比如說我們經常想到的一些排序的算法像快速排序、歸并排序等,他們都是本質上将原有的問題集合拆分成兩個子問題,然後再針對這些子問題進行進一步的處理,直到子問題已經得到解決。在這些子問題解決後上面的部分再将這些問題合并起來就得到了我們想要的答案。這個問題主要是針對單線程的運作場景。在把并行的一些思想考慮進來的時候,我發現他們可以碰撞出美麗的火花。我們仔細再來看一下前面的這一類型的問題,他們的本質上是将一個問題劃分成多個子問題,然後再逐個的去解決子問題。在很多情況下,他們這些子問題是互不相幹的。也就是說,我們針對他們每個執行的子問題,可以讓他們采用獨立的線程來運作。這樣的話我們可以充分的發揮現在并行處理器的優勢。

    在進一步的讨論之前,我們先看一下divide and conquer政策的處理問題方式,如下圖:

java fork-join架構應用和分析問題來源fork-join pool的引入示例應用總結參考材料

    這裡,我們每一個小的task表示我們劃分出來的一個子問題。當一個問題處理完畢後,他們的結果可以傳回給原來調用他們的部分。如果我們用僞代碼來描述一下divide and conquer政策的算法,則其基本的形式應該如下:

// PSEUDOCODE
Result solve(Problem problem) { 
    if (problem.size < SEQUENTIAL_THRESHOLD)
        return solveSequentially(problem);
    else {
        Result left, right;
        left = solve(extractLeftHalf(problem));
        right = solve(extractRightHalf(problem));
        return combine(left, right);
    }
}
           

     結合我們具體的一些算法,如歸并排序或者快速排序,他們的實作代碼結構是不是和這個很像呢?實際上,我們期望程式能夠帶有某種并行性的效果,其執行的過程更應該如以下的過程:

// PSEUDOCODE
Result solve(Problem problem) { 
    if (problem.size < SEQUENTIAL_THRESHOLD)
        return solveSequentially(problem);
    else {
        Result left, right;
        INVOKE-IN-PARALLEL { 
            left = solve(extractLeftHalf(problem));
            right = solve(extractRightHalf(problem));
        }
        return combine(left, right);
    }
}
           

    這種我們所期望的理想方式,在java 7的fork-join pool裡已經得到了解決。

fork-join pool的引入

    在正式使用fork-join pool之前,我們可能會有點好奇。我們已經有了一些現有的線程池了,如ThreadPoolExecutor,在大部分的情況下他們已經能用的很好。我們引入fork-join架構的意義在哪裡呢?他有哪些優點呢?

    我們先針對fork-join pool本身要解決的問題本身來看。如果我們用傳統的threadpool方式來解決這些問題,該采取什麼樣的手段呢?我們假設一個問題就是一個執行的線程,當一個問題被拆分成兩個或者多個子問題的時候,我們需要啟動多個子線程去執行,在必要的情況下會疊代的依次啟動下去。這裡就産生了一些線程之間的以來,我們這個大的問題需要等待它的子問題線程傳回,是以我們需要某些機制來保證他們的同步。這樣,當我們手工來實作這個過程的時候會有些麻煩。我們預設使用的線程池是期望他們所有執行的任務都是不相關的,可以盡可能的并行執行。

    另外,fork-join pool還有一個特點就是work stealing。每個工作線程都有自己的工作隊列,這是使用deque來實作的。當一個任務劃分一個新線程時,它将自己推到 deque 的頭部。當一個任務執行與另一個未完成任務的合并操作時,它會将另一個任務推到隊列頭部并執行,而不會休眠以等待另一任務完成(像 Thread.join() 的操作一樣)。當線程的任務隊列為空,它将嘗試從另一個線程的 deque 的尾部 竊取另一個任務。如果我們用傳統的ThreadPoolExecutor則比較難用上work stealing的技術。關于work stealing的細節可以參考文章後面的參考材料。

   fork-join pool和ThreadPoolExecutor之間也是有很緊密的關系的,下圖是他們相關的一個類圖:

java fork-join架構應用和分析問題來源fork-join pool的引入示例應用總結參考材料

    我們可以看到,他們共同的繼承了AbstractExecutorService,在一定的程度上,他們是可以互相替換使用的。在圖中我們還可以看到,ForkjoinPool使用到了RecursiveAction和RecursiveTask。他們兩個中RecursiveAction應用于執行的任務不需要傳回結果的場景,而RecursiveTask應用于需要傳回執行結果的場景。這點類似于ThreadPoolExecutor使用Runnable和Callable的參數來分别表示不需要傳回值和需要傳回值的線程執行對象。

示例應用

    OK,前面花了很多時間讨論了fork/join pool的特點,這裡我們就來看幾個具體的應用示例。

求最大值

    假定我們有一組數字,我們需要求裡面的最大值。用我們傳統的方法來求的話,其代碼實作如下:

public int solveSequentially() {
        int max = Integer.MIN_VALUE;
        for (int i=start; i<end; i++) {
            int n = numbers[i];
            if (n > max)
                max = n;
        }
        return max;
    }
           

    這裡,我們假定numbers這個數組儲存着所有需要比較的數字。

    如果我們應用ForkJoinPool的方式,則其實作如下:

public class MaxWithFJ extends RecursiveAction {
    private final int threshold;
    private final SelectMaxProblem problem;
    public int result;

    public MaxWithFJ(SelectMaxProblem problem, int threshold) {
        this.problem = problem;
        this.threshold = threshold;
    }

    protected void compute() {
        if (problem.size < threshold)
            result = problem.solveSequentially();
        else {
            int midpoint = problem.size / 2;
            MaxWithFJ left = new MaxWithFJ(problem.subproblem(0, midpoint), threshold);
            MaxWithFJ right = new MaxWithFJ(problem.subproblem(midpoint + 
              1, problem.size), threshold);
            left.fork();
            right.fork();
            result = Math.max(left.join(), right.join());
        }
    }
}
           

     我們可以看到,如果當我們在将任務拆分成更小的任務時,我們可以通過ForkJoinTask的fork()方法讓子問題異步的執行。然後我們再使用join方法得到異步方法執行的結果。

計算檔案目錄大小

    我們再來看一個示例。這裡是我們假定要周遊一個檔案目錄。因為檔案的目錄它可以包含嵌套若幹層的目錄或者檔案。從某種角度來說它構成了一個樹形結構。 我們再周遊到每個檔案的時候,可以将檔案目錄作為一個子task來處理,這裡就可以形成一個完整的fork/join pool應用。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.List;
import java.util.ArrayList;
import java.io.File;

public class FileSize
{
    private final static ForkJoinPool forkJoinPool = new ForkJoinPool();

    private static class FileSizeFinder extends RecursiveTask<Long>
    {
        final File file;

        public FileSizeFinder(final File theFile)
        {
            file = theFile;
        }

        @Override
        public Long compute()
        {
            long size = 0;
            if(file.isFile())
            {
                size = file.length();
            }
            else
            {
                final File[] children = file.listFiles();
                if(children != null)
                {
                    List<ForkJoinTask<Long>> tasks =
                        new ArrayList<ForkJoinTask<Long>>();
                    for(final File child : children)
                    {
                        if(child.isFile())
                        {
                            size += child.length();
                        }
                        else
                        {
                            tasks.add(new FileSizeFinder(child));
                        }
                    }

                    for(final ForkJoinTask<Long> task : invokeAll(tasks))
                    {
                        size += task.join();
                    }
                }
            }

            return size;
        }
    }
}
           

    這裡代碼看起來比較長,最關鍵的部分在compute方法裡。我們用了一個ArrayList tasks來儲存所有出現目錄的情形。當周遊出來的元素是檔案時,我們直接取檔案的長度size += child.length();而當為目錄時則tasks.add(new FileSizeFinder(child));這樣當我們周遊某個目錄的時候,它下面一級的子目錄就全部被封裝到tasks裡了。然後我們再通過invokeAll(tasks)這個方法去并行的執行所有周遊子目錄的線程。

    調用這部分代碼的程式如下:

public static void main(String[] args)
    {
        final long start = System.nanoTime();
        final long total = forkJoinPool.invoke(
            new FileSizeFinder(new File(args[0])));
        final long end = System.nanoTime();
        System.out.println("Total Size: " + total);
        System.out.println("Time taken: " + (end - start)/1.0e9);
    }
           

    我們可以運作一下比較具體的執行結果。

總結

    從分治的算法思想到fork/join架構,這種并行性的的融入可以更加高效率的解決一大批的問題。和我們一些傳統的多線程應用方式如ThreadPoolExecutor比起來,它有一些自己的特點。一個典型的地方就是work-stealing,它的一個優點是在傳統的線程池應用裡,我們配置設定的每個線程執行的任務并不能夠保證他們執行時間或者任務量是同樣的多,這樣就可能出現有的線程完成的早,有的完成的晚。在這裡,一個先完成的線程可以從其他正在執行任務的線程那裡拿一些任務過來執行。我們可以說這是人家學習雷鋒好榜樣。這樣發揮主觀能動性的線程架構肯定辦起事來就效率高了。

參考材料

A Java Fork/Join framework

Programming concurrency on the jvm

http://www.javaworld.com/javaworld/jw-10-2011/111004-jtip-recursion-in-java-7.html?page=1

http://www.ibm.com/developerworks/cn/java/j-jtp11137.html 

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html