天天看點

java并發:AQS

AQS是一個FIFO的雙向隊列,其内部通過head和tail記錄隊首和隊尾元素,隊列元素的類型為 Node。

java并發:AQS
Node
java并發:AQS

Node 中的 thread變量用來存放進入 AQS 隊列的線程;

Node 中的 SHARED 用來标記該線程是擷取共享資源時被阻塞挂起後放入 AQS 隊列的;EXCLUSIVE 用來标記該線程是擷取獨占資源時被阻塞挂起後放入 AQS 隊列的;

waitStatus 記錄目前線程等待狀态,可以為:CANCELLED (線程被取消了)、SIGNAL (線程需要被喚醒)、CONDITION (線程在條件隊列裡面等待〉、PROPAGATE (釋放共享資源時需要通知其他節點);

prev 記錄目前節點的前驅節點;next 記錄目前節點的後繼節點

ConditionObject

AQS 的内部類 ConditionObject 結合鎖實作線程同步。

java并發:AQS

ConditionObject 可以直接通路 AQS對象内部的變量,比如 state 和 AQS 隊列。

ConditionObject 是條件變量,每個條件變量在内部維護了一個條件隊列 (單向連結清單隊列),這個條件隊列和 AQS 隊列不是一回事。

此處的隊列是用來存放調用條件變量的 await 方法後被阻塞的線程,隊列的頭、尾元素分别為 firstWaiter 和 lastWaiter。

Note:

調用條件變量的 await()方法就相當于調用共享變量的 wait()方法,調用條件變量的 signal方法就相當于調用共享變量的 notify()方法,調用條件變量的 signa!All ( )方法就相當于調用共享變量的 notifyAll()方法。

至此,相信大家都已經知道條件變量是什麼了,它能用來做什麼。 

示例

java并發:AQS

示例中(2)處使用Lock 對象的 newCondition ()方法建立了一個 ConditionObject 變量,該變量就是 Lock鎖對應的一個條件變量。

示例中(3)處擷取了獨占鎖,示例中(4)處則調用了條件變量的 await ()方法阻塞挂起了目前線程。

當其他線程調用條件變量的 signal方法時,被阻塞的線程才會從 await處傳回。

在調用條件變量的 signal 和 await方法前必須先擷取條件變量對應的鎖,如果在沒有擷取到鎖之前調用了條件變量的 await方法則會抛出 java.lang.IllegalMonitorStateException異常。

一個 Lock對象可以建立多個條件變量。

AQS 隻提供了 ConditionObject 的實作,并沒有提供 newCondition 函數,該函數用來 new 一個 ConditionObject對象;需要由 AQS 的子類來實作 newCondition 函數。 

小結:

下圖反映了前面描述的關系:

詳解:

當多個線程同時調用 lock.lock()方法擷取鎖時,隻有一個線程擷取到了鎖,其他線程會被轉換為 Node 節點插入到 lock 鎖對應的 AQS 阻塞隊列裡面,并做自旋 CAS 嘗試擷取鎖。

如果擷取到鎖的線程調用了對應條件變量的 await()方法,則該線程會釋放擷取到的鎖,并被轉換為 Node 節點插入到條件變量對應的條件隊列裡面。

此時因調用 lock.lock() 方法被阻塞到 AQS 隊列裡面的一個線程會擷取到被釋放的鎖,如果該線程也調用了條件變量的 await ()方法則該線程也會被放入條件變量的條件隊列裡面。

當另外一個線程調用條件變量的 signal()或者 signa!All()方法時,會把條件隊列裡面的一個或者全部 Node節點移動到 AQS 的阻塞隊列裡面,等待時機擷取鎖。

state

在 AQS 中維持了一個狀态值 state,可以通過 getState、setState、compareAndSetState 函數修改其值,代碼如下:

/**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }      

對于 AQS 來說,線程同步的關鍵是對 state 進行操作。

根據 state 是否屬于一個線程,操作 state 的方式分為獨占方式和共享方式。

獨占方式

使用獨占方式擷取的資源是與具體線程綁定的,也就是說如果一個線程擷取到了資源,則進行标記,其他線程嘗試操作 state 擷取資源時會發現目前該資源的持有者不是自己,于是在擷取失敗後被阻塞。

例子:獨占鎖 ReentrantLock 的實作

當一個線程擷取了Reer rantLock的鎖後,在 AQS 内部會使用 CAS 操作把狀态值 state 從0 變為 1,然後設定目前鎖的持有者為目前線程,當該線程再次擷取鎖時發現它就是鎖的持有者,則把狀态值從 l 變為 2,也就是設定可重入次數,而當另外一個線程擷取鎖時發現自己不是該鎖的持有者就會被放入 AQS 阻塞隊列後挂起。

在獨占方式下擷取和釋放資源的方法為:

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success.  Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted.  This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }      
/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }      

共享方式

使用共享方式擷取資源與具體線程不相關;當多個線程去請求資源時通過 CAS 方式競争。

當一個線程擷取到資源後,另一個線程嘗試擷取資源時,如果目前資源能滿足它的需要,則目前線程隻需要使用 CAS 方式進行擷取即可。

例子:Semaphore 信号量

當一個線程通過 acquire()方法擷取信号量時,會首先看目前信号量個數是否滿足需要,不滿足則把目前線程放入阻塞隊列,如果滿足則通過自旋 CAS 擷取信号量。

在共享方式下擷取和釋放資源的方法為:

/**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }      
/**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }      

AQS是鎖和同步容器的基礎架構,AQS并沒有提供可用的tryAcquire和tryRelease方法。

tryAcquire和 tryRelease需要由具體的子類來實作。

子類在實作 tryAcquire和 tryRelease時要根據具體場景使用 CAS算法嘗試修改 state狀态值, 成功則傳回 true,否則傳回 false。

子類還需要定義在調用 acquire 和 release 方法時狀态值 state 的增減代表什麼含義。 

例子:

繼承自 AQS 實作的獨占鎖 ReentrantLock,定義當 status 為 0 時表示鎖空閑,為 1 時表示鎖己經被占用。

故其在重寫 tryAcquire 時,需要使用 CAS 算法檢視目前 state 是否為 0,如果為 0 則使用 CAS 設定為 1,并設定目前鎖的持有者為目前線程,而後傳回true;如果 CAS 失敗則傳回 false。

同理,tryAcquireShared 和 tryReleaseShared 也需要由具體的子類來實作。

繼承自 AQS 實作的讀寫鎖 ReentrantReadWriteLock,讀鎖在重寫 tryAcquireShared 時,首先檢視寫鎖是否被其他線程持有,如果是則直接傳回 false; 否則使用 CAS 遞增 state 的高 16 位。

(在 ReentrantReadWriteLock 中, state 的高 16 位為擷取讀鎖的次數) 

基于 AQS 實作的鎖除了需要重寫上面介紹的方法外,還需要重寫 isHeldExclusively 方法,來判斷鎖是被目前線程獨占還是被共享。

問題:帶有 Interruptibly關鍵字的函數和不帶該關鍵字的函數有什麼差別? 

不帶 Intenuptibly 關鍵字的方法意思是不對中斷進行響應。