天天看點

Java多線程——FutureTask源碼解析

一個很常見的多線程案例是,我們安排主線程作為配置設定任務和彙總的一方,然後将計算工作切分為多個子任務,安排多個線程去計算,最後所有的計算結果由主線程進行彙總。比如,歸并排序,字元頻率的統計等等。

我們知道Runnable是不傳回計算結果的,如果想利用多線程的話,隻能存儲到一個執行個體的内部變量裡面進行互動,但存在一個問題,如何判斷是否已經計算完成了。用Thread.join是一個方案,但是我們隻能依次等待一個線程結束後處理一個線程,如果線程1恰好特别慢,則後續已經完成的線程不能被及時處理。我們希望能夠獲知線程的執行狀态,發現哪個線程處理完就先統計它的計算結果。可以考慮使用Callable和FutureTask來完成。

先說Callable它是一個功能接口,它隻有一個方法V call(),計算一個結果,失敗的話抛出一個異常。和Runnable不同的是,它不能直接交給Thread來執行,是以需要一個别的類來封裝它與Runnable,這個類就是FutureTask。FutureTask是一個類,繼承了RunnableFuture,而RunnableFuture是一個多繼承接口,它繼承了Runnable和 Future,是以FutureTask是可以作為實作了Runnable的執行個體交給Thread執行。

從内部變量來看,含有一個下層Callable執行個體,一個狀态表示,一個傳回結果,以及對運作線程的記錄

/**
     * 任務的運作狀态,最初是NEW。運作狀态隻在set, setException和cancel方法中過度到最終狀态。
     * 在完成過程中,狀态可能發生轉移到COMPLETING(在設定結果時)或者INTERRUPTING(僅當中斷運作來滿足cancel(true)時)。
     * 從這些中間狀态轉移到最終狀态使用成本更低有序/懶惰寫入,因為值是唯一的且之後不能再修改。
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** 下層的callable,運作後為null */
    private Callable<V> callable;
    /** get()操作傳回的結果或者抛出的異常*/
    private Object outcome; // 不是volatile,由reads/writes狀态來保護
    /** 運作callable的線程,在run()通過CAS修改*/
    private volatile Thread runner;
    /** 等待線程的Treiber堆棧 */
    private volatile WaitNode waiters;           

構造函數

構造函數總共有兩種重載,第一種直接給出Callable執行個體

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // 確定callable的可見性
    }           

第二種,給出Runnable執行個體和期望的傳回結果,如果Runnable執行個體運作成功則傳回的是result

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);//建立一個callable
        this.state = NEW;       // 確定callable的可見性
    }           

run

run方法是Thread運作FutureTask内任務的接口。首先,根據最上方的進入條件可以看出,隻有成功競争到修改runnerOffset成功的線程才能執行後續方法,而搜尋整個類檔案,可以發現隻有在run結束後才會重置為null,是以同一時間隻能有一個線程執行run方法成功。然後要檢查state和callable的狀态,因為run會将它們修改。調用callable.call方法擷取傳回結果,成功的話設定結果,失敗的話設定傳回結果為異常。無論是否執行成功,runner會被重置為null。

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;//狀态必須是NEW且修改執行線程成功,否則直接傳回,避免被多個線程同時執行
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();//調用callable.call方法擷取傳回結果
                    ran = true;//執行成功
                } catch (Throwable ex) {
                    result = null;
                    ran = false;//執行失敗
                    setException(ex);//設定傳回異常并喚醒等待線程解除阻塞
                }
                if (ran)
                    set(result);//設定結果
            }
        } finally {
            //runner直到狀态設定完成不能為null來避免并發調用run()
            runner = null;
            //在将runner設定為null後需要重新讀取state避免漏掉中斷
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }           

set方法先修改state為COMPLETING,然後将outcome設定為剛才計算出來的結果,最後設定state為NORMAL,并調用finishCompletion。這個方法移除并通知所有等待的線程解除阻塞,調用done(),并将callable設為null。done方法預設是什麼也不做。

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終狀态
            finishCompletion();//喚醒等待線程,将callable設為null
        }
    }

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//如果線程被park阻塞,解除阻塞
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // 取消連接配接幫助gc
                    q = next;
                }
                break;
            }
        }

        done();//未重寫時什麼也不做

        callable = null;        // to reduce footprint減少覆寫區
    }           

setException跟set邏輯上基本一樣,除了設定傳回結果是Throwable對象

protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//修改狀态
            outcome = t;//結果為Throwable
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最終狀态
            finishCompletion();
        }
    }           

get

get方法如果FutureTask已經執行完成則傳回結果,否則會等待并阻止線程排程。等待時長可以輸入,機關為納秒,不輸入為不限時等待,限時等待逾時仍然沒有完成會抛出異常。

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);//等待完成
        return report(s);//檢查時傳回結果還是抛出異常
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//限時等待完成
            throw new TimeoutException();
        return report(s);
    }           

awaitDone這個方法會阻塞目前線程(get方法的調用線程)的排程并增加等待結點,阻塞時長根據輸入的時間長度決定。如果執行Callable任務的線程完成了運作或者被中斷,則會解除棧中等待結點對應線程的阻塞。然後會根據執行結果決定是否要抛出異常還是傳回執行完成的結果。

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;//是否完成入棧
        for (;;) {
            if (Thread.interrupted()) {//檢查線程是否已經被中斷
                removeWaiter(q);//移除被中斷的等待結點
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {//已經完成
                if (q != null)
                    q.thread = null;//移除等待
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet還沒有逾時
                Thread.yield();//已經在指派,是以隻需讓出時間片等待指派完成
            //下方都是還在沒有完成call方法的情況
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);//q加入到棧的最前方
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {//逾時了
                    removeWaiter(q);//移除逾時的等待結點
                    return state;
                }
                LockSupport.parkNanos(this, nanos);//阻塞目前線程nanos納秒
            }
            else
                LockSupport.park(this);//阻塞目前線程
        }
    }           

cancel

cancel輸入的參數表示如果目前還在運作中是否要中斷執行線程,如果輸入參數是false則隻有線程已經執行完成或者抛出異常或者已經被中斷時可以把狀态修改為CANCELLED,如果是true則會中斷線程并将狀态改為INTERRUPTED。是以,cancel在該任務已經結束或者已被取消,或者競争修改狀态失敗時都會失敗。如果中斷成功,會釋放所有被阻塞的等待線程。

public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;//已經完成或者被取消或者競争取消失敗傳回false
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }           

狀态檢查

非常簡單的兩個方法。因為CANCELLED是state中最大的,是以隻有cancel方法成功才會是這種狀态。而isDone隻要不是還在運作或者還沒有被執行就是傳回true。

public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }           

簡單的使用示例

public class CallableTest implements Callable<Integer>{
    private int start;
    
    public CallableTest(int start) {
        this.start = start;
    }
    
    @Override
    public Integer call() throws Exception {
        Thread.sleep(500);
        return start + 1;
    }
    
    public static void main(String args[]) throws InterruptedException, ExecutionException{
        long start = System.currentTimeMillis();
        FutureTask<Integer> task1 = new FutureTask<>(new CallableTest(2));
        new Thread(task1).start();
        FutureTask<Integer> task2 = new FutureTask<>(new CallableTest(4));
        new Thread(task2).start();
        System.out.println(task1.get() + task2.get());//8
        long end = System.currentTimeMillis();
        System.out.println(end - start);//506
    }
}