AQS是一個FIFO的雙向隊列,其内部通過head和tail記錄隊首和隊尾元素,隊列元素的類型為 Node。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISPrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdsATOfd3bkFGazxCMx8VesATMfhHLlN3XnxCMwEzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5iMkJTNmVmNiNGMzY2NzQWY1kDOlZDM0EjNkVWMmFWNh9CXyEzLchDMxIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjL0M3Lc9CX6MHc0RHaiojIsJye.png)
Node 中的 thread變量用來存放進入 AQS 隊列的線程;
Node 中的 SHARED 用來标記該線程是擷取共享資源時被阻塞挂起後放入 AQS 隊列的;EXCLUSIVE 用來标記該線程是擷取獨占資源時被阻塞挂起後放入 AQS 隊列的;
waitStatus 記錄目前線程等待狀态,可以為:CANCELLED (線程被取消了)、SIGNAL (線程需要被喚醒)、CONDITION (線程在條件隊列裡面等待〉、PROPAGATE (釋放共享資源時需要通知其他節點);
prev 記錄目前節點的前驅節點;next 記錄目前節點的後繼節點
ConditionObject
AQS 的内部類 ConditionObject 結合鎖實作線程同步。
ConditionObject 可以直接通路 AQS對象内部的變量,比如 state 和 AQS 隊列。
ConditionObject 是條件變量,每個條件變量在内部維護了一個條件隊列 (單向連結清單隊列),這個條件隊列和 AQS 隊列不是一回事。
此處的隊列是用來存放調用條件變量的 await 方法後被阻塞的線程,隊列的頭、尾元素分别為 firstWaiter 和 lastWaiter。
Note:
調用條件變量的 await()方法就相當于調用共享變量的 wait()方法,調用條件變量的 signal方法就相當于調用共享變量的 notify()方法,調用條件變量的 signa!All ( )方法就相當于調用共享變量的 notifyAll()方法。
至此,相信大家都已經知道條件變量是什麼了,它能用來做什麼。
示例
示例中(2)處使用Lock 對象的 newCondition ()方法建立了一個 ConditionObject 變量,該變量就是 Lock鎖對應的一個條件變量。
示例中(3)處擷取了獨占鎖,示例中(4)處則調用了條件變量的 await ()方法阻塞挂起了目前線程。
當其他線程調用條件變量的 signal方法時,被阻塞的線程才會從 await處傳回。
在調用條件變量的 signal 和 await方法前必須先擷取條件變量對應的鎖,如果在沒有擷取到鎖之前調用了條件變量的 await方法則會抛出 java.lang.IllegalMonitorStateException異常。
一個 Lock對象可以建立多個條件變量。
AQS 隻提供了 ConditionObject 的實作,并沒有提供 newCondition 函數,該函數用來 new 一個 ConditionObject對象;需要由 AQS 的子類來實作 newCondition 函數。
小結:
下圖反映了前面描述的關系:
詳解:
當多個線程同時調用 lock.lock()方法擷取鎖時,隻有一個線程擷取到了鎖,其他線程會被轉換為 Node 節點插入到 lock 鎖對應的 AQS 阻塞隊列裡面,并做自旋 CAS 嘗試擷取鎖。
如果擷取到鎖的線程調用了對應條件變量的 await()方法,則該線程會釋放擷取到的鎖,并被轉換為 Node 節點插入到條件變量對應的條件隊列裡面。
此時因調用 lock.lock() 方法被阻塞到 AQS 隊列裡面的一個線程會擷取到被釋放的鎖,如果該線程也調用了條件變量的 await ()方法則該線程也會被放入條件變量的條件隊列裡面。
當另外一個線程調用條件變量的 signal()或者 signa!All()方法時,會把條件隊列裡面的一個或者全部 Node節點移動到 AQS 的阻塞隊列裡面,等待時機擷取鎖。
state
在 AQS 中維持了一個狀态值 state,可以通過 getState、setState、compareAndSetState 函數修改其值,代碼如下:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
對于 AQS 來說,線程同步的關鍵是對 state 進行操作。
根據 state 是否屬于一個線程,操作 state 的方式分為獨占方式和共享方式。
獨占方式
使用獨占方式擷取的資源是與具體線程綁定的,也就是說如果一個線程擷取到了資源,則進行标記,其他線程嘗試操作 state 擷取資源時會發現目前該資源的持有者不是自己,于是在擷取失敗後被阻塞。
例子:獨占鎖 ReentrantLock 的實作
當一個線程擷取了Reer rantLock的鎖後,在 AQS 内部會使用 CAS 操作把狀态值 state 從0 變為 1,然後設定目前鎖的持有者為目前線程,當該線程再次擷取鎖時發現它就是鎖的持有者,則把狀态值從 l 變為 2,也就是設定可重入次數,而當另外一個線程擷取鎖時發現自己不是該鎖的持有者就會被放入 AQS 阻塞隊列後挂起。
在獨占方式下擷取和釋放資源的方法為:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
共享方式
使用共享方式擷取資源與具體線程不相關;當多個線程去請求資源時通過 CAS 方式競争。
當一個線程擷取到資源後,另一個線程嘗試擷取資源時,如果目前資源能滿足它的需要,則目前線程隻需要使用 CAS 方式進行擷取即可。
例子:Semaphore 信号量
當一個線程通過 acquire()方法擷取信号量時,會首先看目前信号量個數是否滿足需要,不滿足則把目前線程放入阻塞隊列,如果滿足則通過自旋 CAS 擷取信号量。
在共享方式下擷取和釋放資源的方法為:
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
AQS是鎖和同步容器的基礎架構,AQS并沒有提供可用的tryAcquire和tryRelease方法。
tryAcquire和 tryRelease需要由具體的子類來實作。
子類在實作 tryAcquire和 tryRelease時要根據具體場景使用 CAS算法嘗試修改 state狀态值, 成功則傳回 true,否則傳回 false。
子類還需要定義在調用 acquire 和 release 方法時狀态值 state 的增減代表什麼含義。
例子:
繼承自 AQS 實作的獨占鎖 ReentrantLock,定義當 status 為 0 時表示鎖空閑,為 1 時表示鎖己經被占用。
故其在重寫 tryAcquire 時,需要使用 CAS 算法檢視目前 state 是否為 0,如果為 0 則使用 CAS 設定為 1,并設定目前鎖的持有者為目前線程,而後傳回true;如果 CAS 失敗則傳回 false。
同理,tryAcquireShared 和 tryReleaseShared 也需要由具體的子類來實作。
繼承自 AQS 實作的讀寫鎖 ReentrantReadWriteLock,讀鎖在重寫 tryAcquireShared 時,首先檢視寫鎖是否被其他線程持有,如果是則直接傳回 false; 否則使用 CAS 遞增 state 的高 16 位。
(在 ReentrantReadWriteLock 中, state 的高 16 位為擷取讀鎖的次數)
基于 AQS 實作的鎖除了需要重寫上面介紹的方法外,還需要重寫 isHeldExclusively 方法,來判斷鎖是被目前線程獨占還是被共享。
問題:帶有 Interruptibly關鍵字的函數和不帶該關鍵字的函數有什麼差別?
不帶 Intenuptibly 關鍵字的方法意思是不對中斷進行響應。