天天看點

【Java技術指南】「并發程式設計專題」Fork/Join架構基本使用和原理探究(原理及源碼篇)

ForkJoin線程池架構回顧

  • ForkJoin架構其實就是一個線程池ExecutorService的實作,通過工作竊取(work-stealing)算法,擷取其他線程中未完成的任務來執行。
  • 可以充分利用機器的多處理器優勢,利用空閑的線程去并行快速完成一個可拆分為小任務的大任務,類似于分治算法。
  • ForkJoin的目标,就是利用所有可用的處理能力來提高程式的響應和性能。本文将介紹ForkJoin架構,源碼剖析。

ForkJoinPool的類架構圖

【Java技術指南】「并發程式設計專題」Fork/Join架構基本使用和原理探究(原理及源碼篇)

ForkJoinPool核心類實作

  • ForkJoin架構的核心是ForkJoinPool類,基于AbstractExecutorService擴充。
  • ForkJoinPool中維護了一個隊列數組WorkQueue[],每個WorkQueue維護一個ForkJoinTask數組和目前工作線程。
  • ForkJoinPool實作了工作竊取(work-stealing)算法并執行ForkJoinTask。
ForkJoinPool,所有線程和WorkQueue共享,用于工作竊取、任務狀态和工作狀态同步。
【Java技術指南】「并發程式設計專題」Fork/Join架構基本使用和原理探究(原理及源碼篇)

核心屬性介紹

  • ADD_WORKER: 100000000000000000000000000000000000000000000000 -> 1000 0000 0000 0000,用來配合ctl在控制線程數量時使用
  • ctl: 控制ForkJoinPool建立線程數量,(ctl & ADD_WORKER) != 0L 時建立線程,也就是當ctl的第16位不為0時,可以繼續建立線程
  • defaultForkJoinWorkerThreadFactory: 預設線程工廠,預設實作是DefaultForkJoinWorkerThreadFactory
  • runState: 全局鎖控制,全局運作狀态
  • workQueues: 工作隊列數組WorkQueue[]
  • config: 記錄并行數量和ForkJoinPool的模式(異步或同步)

ForkJoinTask

  • status: 任務的狀态,對其他工作線程和pool可見,運作正常則status為負數,異常情況為正數

WorkQueue

  • qlock: 并發控制,put任務時的鎖控制
  • array: 任務數組ForkJoinTask<?>[]
  • pool: ForkJoinPool,所有線程和WorkQueue共享,用于工作竊取、任務狀态和工作狀态同步
  • base: array數組中取任務的下标
  • top: array數組中放置任務的下标
  • owner: 所屬線程,ForkJoin架構中,隻有一個WorkQueue是沒有owner的,其他的均有具體線程owner。
  • WorkQueue 内部就是ForkJoinTask
workQueue: 目前線程的任務隊列,與WorkQueue的owner呼應
ForkJoinTask是能夠在ForkJoinPool中執行的任務抽象類,父類是Future,具體實作類有很多,這裡主要關注RecursiveAction和RecursiveTask。
  • RecursiveAction是沒有傳回結果的任務
  • RecursiveTask是需要傳回結果的任務
【Java技術指南】「并發程式設計專題」Fork/Join架構基本使用和原理探究(原理及源碼篇)
隻需要實作其compute()方法,在compute()中做最小任務控制,任務分解(fork)和結果合并(join)。
【Java技術指南】「并發程式設計專題」Fork/Join架構基本使用和原理探究(原理及源碼篇)

ForkJoinWorkerThread

ForkJoinPool中執行的預設線程是ForkJoinWorkerThread,由預設工廠産生,可以自己重寫要實作的工作線程。同時會将ForkJoinPool引用放在每個工作線程中,供工作竊取時使用。
【Java技術指南】「并發程式設計專題」Fork/Join架構基本使用和原理探究(原理及源碼篇)
  • ForkJoinPool作為最核心的元件,維護了所有的任務隊列WorkQueues,workQueues維護着所有線程池的工作線程,工作竊取算法就是在這裡進行的。
  • 每一個WorkQueue對象中使用pool保留對ForkJoinPool的引用,用來擷取其WorkQueues來竊取其他工作線程的任務來執行。
  • 同時WorkQueue對象中的owner是ForkJoinWorkerThread工作線程,綁定ForkJoinWorkerThread和WorkQueue的一對一關系,每個工作線程會優先完成自己隊列的任務,當自己隊列中的任務為空時,才會通過工作竊取算法從其他任務隊列中擷取任務。
  • WorkQueue中的ForkJoinTask<?>[] array,是每一個具體的任務,插入array中的第一個任務是最大的任務。

源碼分析

ForkJoinPool構造函數

ForkJoinPool有四個構造函數,其中參數最全的那個構造函數如下所示:
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode)           
  • parallelism:可并行級别,Fork/Join架構将依據這個并行級别的設定,決定架構内并行執行的線程數量。并行的每一個任務都會有一個線程進行處理,但是千萬不要将這個屬性了解成Fork/Join架構中最多存在的線程數量,也不要将這個屬性和ThreadPoolExecutor線程池中的corePoolSize、maximumPoolSize屬性進行比較,因為ForkJoinPool的組織結構和工作方式與後者完全不一樣。
  • factory:當Fork/Join架構建立一個新的線程時,同樣會用到線程建立工廠。隻不過這個線程工廠不再需要實作ThreadFactory接口,而是需要實作ForkJoinWorkerThreadFactory接口。
    • 後者是一個函數式接口,隻需要實作一個名叫newThread的方法。
    • 在Fork/Join架構中有一個預設的ForkJoinWorkerThreadFactory接口實作:DefaultForkJoinWorkerThreadFactory。
  • handler:異常捕獲處理器。當執行的任務中出現異常,并從任務中被抛出時,就會被handler捕獲。
  • asyncMode:這個參數也非常重要,從字面意思來看是指的異步模式,它并不是說Fork/Join架構是采用同步模式還是采用異步模式工作。
    • Fork/Join架構中為每一個獨立工作的線程準備了對應的待執行任務隊列,這個任務隊列是使用數組進行組合的雙向隊列。即是說存在于隊列中的待執行任務,即可以使用先進先出的工作模式,也可以使用後進先出的工作模式。
當asyncMode設定為true的時候,隊列采用先進先出方式工作;反之則是采用後進先出的方式工作,該值預設為false
......
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
......           
  • ForkJoinPool還有另外兩個構造函數,一個構造函數隻帶有parallelism參數,既是可以設定Fork/Join架構的最大并行任務數量;
    • 另一個構造函數則不帶有任何參數,對于最大并行任務數量也隻是一個預設值——目前作業系統可以使用的CPU核心數量(Runtime.getRuntime().availableProcessors())。
    • 實際上ForkJoinPool還有一個私有的、原生構造函數,之上提到的三個構造函數都是對這個私有的、原生構造函數的調用。
private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }           
使用案例
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());           
先看ForkJoinPool的建立過程,這個比較簡單,建立了一個ForkJoinPool對象,帶有預設ForkJoinWorkerThreadFactory,并行數跟機器核數一樣,同步模式。

送出任務

forkJoinPool.invoke(new CountRecursiveTask(1, 100));會先執行到ForkJoinPool#externalPush中,此時forkJoinPool.workQueues并沒有完成初始化工作,是以執行到ForkJoinPool#externalSubmit。

externalSubmit
【Java技術指南】「并發程式設計專題」Fork/Join架構基本使用和原理探究(原理及源碼篇)

這裡是一個for無限循環實作,跳出邏輯全部在内部控制,主要結合runState來控制。

  1. 建ForkJoinPool的WorkQueue[]變量workQueues,長度為大于等于2倍并行數量的且是2的n次幂的數。這裡對傳入的并行數量使用了位運算,來計算出workQueues的長度。
  2. 建立一個WorkQueue變量q,q.base=q.top=4096,q的owner為null,無工作線程,放入workQueues數組中
  3. 建立q.array對象,長度8192,将ForkJoinTask也就是代碼案例中的CountRecursiveTask放入q.array,pool為傳入的ForkJoinPool,并将q.top加1,完成後q.base=4096,q.top=4097。然後執行ForkJoinPool#signalWork方法。(base下标表示用來取資料的,top下标表示用來放資料的,當base小于top時,說明有資料可以取)
externalSubmit主要完成3個小步驟工作,每個步驟都使用了鎖的機制來處理并發事件,既有對runState使用ForkJoinPool的全局鎖,也有對WorkQueue使用局部鎖。
signalWork
signalWork方法的簽名是:void signalWork(WorkQueue[] ws, WorkQueue q)。ws為ForkJoinPool中的workQueues,q為externalSubmit方法中建立的用于存放ForkJoinTask的WorkQueue.
  • signalWork中會根據ctl的值判斷是否需要建立建立工作線程,目前暫無,是以走到tryAddWorker(),并在createWorker()來建立,使用預設工廠方法ForkJoinWorkerThread#ForkJoinWorkerThread(ForkJoinPool)來建立一個ForkJoinWorkerThread,ForkJoinPool為前面建立的pool。
  • 并建立一個WorkQueue其owner為新建立的工作線程,其array為空,被命名為ForkJoinPool-1-worker-1,且将其存放在pool.workQueues數組中。
  • 建立完線程之後,工作線程start()開始工作。
  • 這樣就建立了兩個WorkQueue存放在pool.workQueues,其中一個WorkQueue儲存了第一個大的ForkJoinTask,owner為null,其base=4096,top=4097;第二個WorkQueue的owner為建立的工作線程,array為空,暫時無資料,base=4096,top=4096。
【Java技術指南】「并發程式設計專題」Fork/Join架構基本使用和原理探究(原理及源碼篇)
ForkJoinWorkerThread#run
  • 執行ForkJoinWorkerThread線程ForkJoinPool-1-worker-1,執行點來到ForkJoinWorkerThread#run,注意這裡是在ForkJoinWorkerThread中,此時的workQueue.array還是空的,pool為文中唯一的一個,是各個線程會共享的。
  • run方法中首先是一個判斷 if (workQueue.array == null) { // only run once,這也驗證了我們前面的分析,目前線程的workQueue.array是空的。每個建立的線程,擁有的workQueue.array是沒有任務的。那麼它要執行的任務從哪裡來?
  • runWorker()方法中會執行一個死循環,去scan掃描是否有任務可以執行。全文的講到的工作竊取work-stealing算法,就在java.util.concurrent.ForkJoinPool#scan。當有了上圖的模型概念時,這個方法的實作看過就會覺得其實非常簡單。
    【Java技術指南】「并發程式設計專題」Fork/Join架構基本使用和原理探究(原理及源碼篇)
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
    int b, n; long c;
    //如果pool.workQueues即ws的k下标元素不為空
    if ((q = ws[k]) != null) {
        //如果base<top且array不為空,則說明有元素。為什麼還需要array不為空才說明有元素?
        //從下面可以知道由于擷取元素後才會設定base=base+1,是以可能出現上一個線程拿到元素了但是沒有及時更新base
        if ((n = (b = q.base) - q.top) < 0 &&
            (a = q.array) != null) {      // non-empty
            long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
            //這裡使用getObjectVolatile去擷取目前WorkQueue的元素
            //volatile是保證線程可見性的,也就是上一個線程可能已經拿掉了,可能已經将這個任務置為空了。
            if ((t = ((ForkJoinTask<?>)
                      U.getObjectVolatile(a, i))) != null &&
                q.base == b) {
                if (ss >= 0) {
                        //拿到任務之後,将array中的任務用CAS的方式置為null,并将base加1
                    if (U.compareAndSwapObject(a, i, t, null)) {
                        q.base = b + 1;
                        if (n < -1)       // signal others
                            signalWork(ws, q);
                        return t;
                    }
                }
                else if (oldSum == 0 &&   // try to activate
                         w.scanState < 0)
                    tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
            }
            if (ss < 0)                   // refresh
                ss = w.scanState;
            r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
            origin = k = r & m;           // move and rescan
            oldSum = checkSum = 0;
            continue;
        }
        checkSum += b;
    }           
CountRecursiveTask#compute
重寫compute方法一般需要遵循這個規則來寫
if(任務足夠小){
  直接執行任務;
  如果有結果,return結果;
}else{
  拆分為2個子任務;
  分别執行子任務的fork方法;
  執行子任務的join方法;
  如果有結果,return合并結果;
}           
public final ForkJoinTask<V> fork() {
        Thread t;
        //如果是工作線程,則往自己線程中的workQuerue中添加子任務;否則走首次添加邏輯
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }           
ForkJoinPool.WorkQueue#push方法會将目前子任務存放到array中,并調用ForkJoinPool#signalWork添加線程或等待其他線程去竊取任務執行。過程又回到前面講到的signalWork流程。
ForkJoinTask#externalAwaitDone
  • 主線程在把任務放置在第一個WorkQueue的array之後,啟動工作線程就退出了。如果使用的是異步的方式,則使用Future的方式來擷取結果,即送出的ForkJoinTask,通過isDone(),get()方法判斷和得到結果。
  • 異步的方式跟同步方式在防止任務的過程是一樣的,隻是主線程可以任意時刻再通過ForkJoinTask去跟蹤結果。本案例用的是同步的寫法,是以主線程最後在ForkJoinTask#externalAwaitDone等待任務完成。
  • 這裡主線程會執行Object#wait(long),使用的是Object類中的wait,在目前ForkJoinTask等待,直到被notify。而notify這個動作會在ForkJoinTask#setCompletion中進行,這裡使用的是notifyAll,因為需要通知的有主線程和工作線程,他們都共同享用這個對象,需要被喚起。
ForkJoinTask#join
來看left.join() + right.join(),在将left和right的Task放置在目前工作線程的workQueue之後,執行join()方法,join()方法最終會在ForkJoinPool.WorkQueue#tryRemoveAndExec中将剛放入的left取出,将對應workQueue中array的left任務置為空,然後執行left任務。然後執行到left的compute方法。對于right任務也是一樣,繼續子任務的fork和join工作,如此循環往複。
public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }           

當工作線程執行結束後,會執行getRawResult,拿到結果。

Work-Steal算法

相比其他線程池實作,這個是ForkJoin架構中最大的亮點。當空閑線程在自己的WorkQueue沒有任務可做的時候,會去周遊其他的WorkQueue,并進行任務竊取和執行,提高程式響應和性能。
取2的n次幂作為長度的實作
//代碼位于java.util.concurrent.ForkJoinPool#externalSubmit
    if ((rs & STARTED) == 0) {
        U.compareAndSwapObject(this, STEALCOUNTER, null,
                               new AtomicLong());
        // create workQueues array with size a power of two
        int p = config & SMASK; // ensure at least 2 slots
        int n = (p > 1) ? p - 1 : 1;
        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
        workQueues = new WorkQueue[n];
        ns = STARTED;
    }           
這裡的p其實就是設定的并行線程數,在為ForkJoinPool建立WorkQueue[]數組時,會對傳入的p進行一系列位運算,最終得到一個大于等于2p的2的n次幂的數組長度
記憶體屏障
//代碼位于java.util.concurrent.ForkJoinPool#externalSubmit
    if ((a != null && a.length > s + 1 - q.base) ||
        (a = q.growArray()) != null) {
        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
        //通過Unsafe進行記憶體值的設定,高效,且屏蔽了處理器和Java編譯器的指令亂序問題
        U.putOrderedObject(a, j, task);
        U.putOrderedInt(q, QTOP, s + 1);
        submitted = true;
    }           
//代碼位于java.util.concurrent.ForkJoinPool#externalSubmit
    //如果qlock為0,說明目前沒有其他線程操作改WorkQueue
    //嘗試CAS操作,修改qlock為1,對這個WorkQueue進行加鎖
    if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a = q.array;
        int s = q.top;
        boolean submitted = false; // initial submission or resizing
        try {                      // locked version of push
            if ((a != null && a.length > s + 1 - q.base) ||
                (a = q.growArray()) != null) {
                int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                submitted = true;
            }
        } finally {
              //finally将qlock置為0,進行鎖的釋放,其他線程可以使用
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        if (submitted) {
            signalWork(ws, q);
            return;
        }
    }