

AQS是一個FIFO的雙向隊列,其内部通過head和tail記錄隊首和隊尾元素,隊列元素的類型為 Node。


Node 中的 thread變量用來存放進入 AQS 隊列的線程;

Node 中的 SHARED 用來标記該線程是擷取共享資源時被阻塞挂起後放入 AQS 隊列的;EXCLUSIVE 用來标記該線程是擷取獨占資源時被阻塞挂起後放入 AQS 隊列的;

waitStatus 記錄目前線程等待狀态,可以為:CANCELLED (線程被取消了)、SIGNAL (線程需要被喚醒)、CONDITION (線程在條件隊列裡面等待〉、PROPAGATE (釋放共享資源時需要通知其他節點);

prev 記錄目前節點的前驅節點;next 記錄目前節點的後繼節點


AQS 的内部類 ConditionObject 結合鎖實作線程同步。


ConditionObject 可以直接通路 AQS對象内部的變量,比如 state 和 AQS 隊列。

ConditionObject 是條件變量,每個條件變量在内部維護了一個條件隊列 (單向連結清單隊列),這個條件隊列和 AQS 隊列不是一回事。

此處的隊列是用來存放調用條件變量的 await 方法後被阻塞的線程,隊列的頭、尾元素分别為 firstWaiter 和 lastWaiter。


調用條件變量的 await()方法就相當于調用共享變量的 wait()方法,調用條件變量的 signal方法就相當于調用共享變量的 notify()方法,調用條件變量的 signa!All ( )方法就相當于調用共享變量的 notifyAll()方法。




示例中(2)處使用Lock 對象的 newCondition ()方法建立了一個 ConditionObject 變量,該變量就是 Lock鎖對應的一個條件變量。

示例中(3)處擷取了獨占鎖,示例中(4)處則調用了條件變量的 await ()方法阻塞挂起了目前線程。

當其他線程調用條件變量的 signal方法時,被阻塞的線程才會從 await處傳回。

在調用條件變量的 signal 和 await方法前必須先擷取條件變量對應的鎖,如果在沒有擷取到鎖之前調用了條件變量的 await方法則會抛出 java.lang.IllegalMonitorStateException異常。

一個 Lock對象可以建立多個條件變量。

AQS 隻提供了 ConditionObject 的實作,并沒有提供 newCondition 函數,該函數用來 new 一個 ConditionObject對象;需要由 AQS 的子類來實作 newCondition 函數。 




當多個線程同時調用 lock.lock()方法擷取鎖時,隻有一個線程擷取到了鎖,其他線程會被轉換為 Node 節點插入到 lock 鎖對應的 AQS 阻塞隊列裡面,并做自旋 CAS 嘗試擷取鎖。

如果擷取到鎖的線程調用了對應條件變量的 await()方法,則該線程會釋放擷取到的鎖,并被轉換為 Node 節點插入到條件變量對應的條件隊列裡面。

此時因調用 lock.lock() 方法被阻塞到 AQS 隊列裡面的一個線程會擷取到被釋放的鎖,如果該線程也調用了條件變量的 await ()方法則該線程也會被放入條件變量的條件隊列裡面。

當另外一個線程調用條件變量的 signal()或者 signa!All()方法時,會把條件隊列裡面的一個或者全部 Node節點移動到 AQS 的阻塞隊列裡面,等待時機擷取鎖。


在 AQS 中維持了一個狀态值 state,可以通過 getState、setState、compareAndSetState 函數修改其值,代碼如下:

     * The synchronization state.
    private volatile int state;

     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
    protected final int getState() {
        return state;

     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
    protected final void setState(int newState) {
        state = newState;

     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);

對于 AQS 來說,線程同步的關鍵是對 state 進行操作。

根據 state 是否屬于一個線程,操作 state 的方式分為獨占方式和共享方式。


使用獨占方式擷取的資源是與具體線程綁定的,也就是說如果一個線程擷取到了資源,則進行标記,其他線程嘗試操作 state 擷取資源時會發現目前該資源的持有者不是自己,于是在擷取失敗後被阻塞。

例子:獨占鎖 ReentrantLock 的實作

當一個線程擷取了Reer rantLock的鎖後,在 AQS 内部會使用 CAS 操作把狀态值 state 從0 變為 1,然後設定目前鎖的持有者為目前線程,當該線程再次擷取鎖時發現它就是鎖的持有者,則把狀态值從 l 變為 2,也就是設定可重入次數,而當另外一個線程擷取鎖時發現自己不是該鎖的持有者就會被放入 AQS 阻塞隊列後挂起。


     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success.  Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted.  This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            return true;
        return false;


使用共享方式擷取資源與具體線程不相關;當多個線程去請求資源時通過 CAS 方式競争。

當一個線程擷取到資源後,另一個線程嘗試擷取資源時,如果目前資源能滿足它的需要,則目前線程隻需要使用 CAS 方式進行擷取即可。

例子:Semaphore 信号量

當一個線程通過 acquire()方法擷取信号量時,會首先看目前信号量個數是否滿足需要,不滿足則把目前線程放入阻塞隊列,如果滿足則通過自旋 CAS 擷取信号量。


     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)

     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            return true;
        return false;


tryAcquire和 tryRelease需要由具體的子類來實作。

子類在實作 tryAcquire和 tryRelease時要根據具體場景使用 CAS算法嘗試修改 state狀态值, 成功則傳回 true,否則傳回 false。

子類還需要定義在調用 acquire 和 release 方法時狀态值 state 的增減代表什麼含義。 


繼承自 AQS 實作的獨占鎖 ReentrantLock,定義當 status 為 0 時表示鎖空閑,為 1 時表示鎖己經被占用。

故其在重寫 tryAcquire 時,需要使用 CAS 算法檢視目前 state 是否為 0,如果為 0 則使用 CAS 設定為 1,并設定目前鎖的持有者為目前線程,而後傳回true;如果 CAS 失敗則傳回 false。

同理,tryAcquireShared 和 tryReleaseShared 也需要由具體的子類來實作。

繼承自 AQS 實作的讀寫鎖 ReentrantReadWriteLock,讀鎖在重寫 tryAcquireShared 時,首先檢視寫鎖是否被其他線程持有,如果是則直接傳回 false; 否則使用 CAS 遞增 state 的高 16 位。

(在 ReentrantReadWriteLock 中, state 的高 16 位為擷取讀鎖的次數) 

基于 AQS 實作的鎖除了需要重寫上面介紹的方法外,還需要重寫 isHeldExclusively 方法,來判斷鎖是被目前線程獨占還是被共享。

問題:帶有 Interruptibly關鍵字的函數和不帶該關鍵字的函數有什麼差別? 

不帶 Intenuptibly 關鍵字的方法意思是不對中斷進行響應。