天天看點

【JDK】:Fork-Join架構概述工作竊取算法核心步驟部分源碼ForkJoinTask的join()參考和推薦閱讀

概述

前面的文章【JDK】:Executor架構提到Executor架構,而Fork-Join架構也與Executor架構相關的多線程并行運算的架構。

Fork-Join架構有自己的适用範圍。如果一個應用能被分解成多個子任務,并且組合多個子任務的結果就能夠獲得最終的答案,那麼這個應用就适合用 Fork-Join架構模式來解決。下圖給出了一個 Fork-Join架構模式的示意圖,位于圖上部的 Task 依賴于位于其下的 Task 的執行,隻有當所有的子任務都完成之後,調用者才能獲得 Task 0 的傳回結果。

【JDK】:Fork-Join架構概述工作竊取算法核心步驟部分源碼ForkJoinTask的join()參考和推薦閱讀

Fork-Join架構能夠解決很多種類的并行問題。軟體開發人員隻需要關注任務的劃分和中間結果的組合就能充分利用并行平台的優良性能。其他和并行相關的諸多難于處理的問題,例如負載平衡、同步等,都可以由架構采用統一的方式解決。

Fork-Join架構是ExecutorService接口的一種具體實作,目的是為了更好地利用多處理器。它是為那些能夠被遞歸地拆解成子任務的工作類型量身設計的。類似于ExecutorService接口的其他實作,Fork-Join架構會将任務分發給線程池中的工作線程。

Fork-Join架構的主要類圖和繼承關系如下:

【JDK】:Fork-Join架構概述工作竊取算法核心步驟部分源碼ForkJoinTask的join()參考和推薦閱讀

Fork-Join架構的核心是ForkJoinPool類,它是對AbstractExecutorService類的擴充。ForkJoinPool實作了工作竊取算法,并可以執行ForkJoinTask任務。

工作竊取算法

Fork-Join架構通過一種稱作工作竊取(work stealing) 的技術減少了工作隊列的争用情況。每個工作線程都有自己的工作隊列,這是使用雙端隊列(或者叫做 deque)來實作的(Java 6 在類庫中添加了幾種 deque 實作,包括 ArrayDeque 和 LinkedBlockingDeque)。當一個任務劃分一個新線程時,它将自己推到 deque 的頭部。當一個任務執行與另一個未完成任務的合并操作時,它會将另一個任務推到隊列頭部并執行,而不會休眠以等待另一任務完成(像 Thread.join() 的操作一樣)。當線程的任務隊列為空,它将嘗試從另一個線程的 deque 的尾部 竊取另一個任務。

【JDK】:Fork-Join架構概述工作竊取算法核心步驟部分源碼ForkJoinTask的join()參考和推薦閱讀

可以使用标準隊列實作工作竊取,但是與标準隊列相比,deque 具有兩方面的優勢:減少争用和竊取。因為隻有工作線程會通路自身的 deque 的頭部,deque 頭部永遠不會發生争用;因為隻有當一個線程空閑時才會通路 deque 的尾部,是以也很少存線上程的 deque 尾部的争用(在 fork-join 架構中結合 deque 實作會使這些通路模式進一步減少協調成本)。跟傳統的基于線程池的方法相比,減少争用會大大降低同步成本。此外,這種方法暗含的後進先出(last-in-first-out,LIFO)任務排隊機制意味着最大的任務排在隊列的尾部,當另一個線程需要竊取任務時,它将得到一個能夠分解成多個小任務的任務,進而避免了在未來竊取任務。是以,工作竊取實作了合理的負載平衡,無需進行協調并且将同步成本降到了最小。

核心步驟

Fork-Join架構的核心主要包括分割任務與合并任務結果兩個步驟。

第一步分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,是以還需要不停的分割,直到分割出的子任務足夠小。

第二步執行任務并合并結果。分割的子任務分别放在雙端隊列裡,然後幾個啟動線程分别從雙端隊列裡擷取任務執行。子任務執行完的結果都統一放在一個隊列裡,啟動一個線程從隊列裡拿資料,然後合并這些資料。

Fork-Join使用兩個類來完成以上兩件事情(參考上面的類圖):

  • ForkJoinTask:我們要使用ForkJoin架構,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制,通常情況下我們不需要直接繼承ForkJoinTask類,而隻需要繼承它的子類,Fork/Join架構提供了以下兩個子類:
    • RecursiveAction:用于沒有傳回結果的任務。
    • RecursiveTask :用于有傳回結果的任務。
  • ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來驅動執行,任務分割出的子任務會添加到目前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列裡暫時沒有任務時,它會随機從其他工作線程的隊列的尾部擷取一個任務。

部分源碼

ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責存放程式送出給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。

ForkJoinTask的fork()

當我們調用ForkJoinTask的fork方法時,程式會調用ForkJoinWorkerThread的pushTask方法異步的執行這個任務,然後立即傳回結果。代碼如下:

public final ForkJoinTask fork() {          
    ((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);         
    return this; 
} 
           

pushTask方法把目前任務存放在ForkJoinTask 數組queue裡。然後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工作線程來執行任務。代碼如下:

final void pushTask(ForkJoinTask t) {
        ForkJoinTask[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - )) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + ;         // or use putOrderedInt
            if ((s -= queueBase) <= )
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }
           

ForkJoinTask的join()

Join方法的主要作用是阻塞目前線程并等待擷取結果。代碼如下:

public final V join() {
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
}

private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
                    UNSAFE.throwException(ex);
                return getRawResult();
        }
           

首先,它調用了doJoin()方法,通過doJoin()方法得到目前任務的狀态來判斷傳回什麼結果,任務狀态有四種:

已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出現異常(EXCEPTIONAL)。

  • 如果任務狀态是已完成,則直接傳回任務結果。
  • 如果任務狀态是被取消,則直接抛出CancellationException。
  • 如果任務狀态是抛出異常,則直接抛出對應的異常。

doJoin()方法的實作代碼:

private int doJoin() {
        Thread t; 
        ForkJoinWorkerThread w; 
        int s; 
        boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < )
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }
           

在doJoin()方法裡,首先通過檢視任務的狀态,看任務是否已經執行完了,如果執行完了,則直接傳回任務狀态,如果沒有執行完,則從任務數組裡取出任務并執行。如果任務順利執行完成了,則設定任務狀态為NORMAL,如果出現異常,則紀錄異常,并将任務狀态設定為EXCEPTIONAL。

參考和推薦閱讀

以下的文章中提到了如何使用Fork-Join的算例,例如擷取最大值、數字累加、Fibonacci數列、合并排序等等。

  1. 方騰飛,聊聊并發(八)——Fork/Join架構介紹
  2. JDK 7 中的 Fork/Join 模式
  3. Java 理論與實踐: 應用 fork-join 架構