AQS簡介
AQS,AbstractQueuedSynchronizer,抽象隊列同步器,是一個用來建構鎖和同步器的架構,java中常見的鎖如ReentrantLock、Semaphore底層都是基于AQS來實作的。
AQS核心思想
如果被請求的共享資源空閑,則将目前請求資源設定為有效的線程,并且将共享資源設定為鎖定狀态,如果被請求的共享資源被占用,則會将目前請求線程進行阻塞操作,并在共享資源釋放時會嘗試去喚醒。
AQS是用CLH隊列鎖實作的,将在暫時擷取不到鎖的線程加入到隊列中。
CLH(Craig,Landin,and Hagersten)隊列是一個虛拟的雙向隊列(虛拟的雙向隊列即不存在隊列執行個體,僅存在結點之間的關聯關系)。AQS是将每條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node)來實作鎖的配置設定。
共享變量
AQS中使用一個volatile修飾的state作為共享變量
private volatile int state;//共享變量,使用volatile修飾保證線程可見性
共享方式
AQS定義了兩種資源共享方式
- 獨占鎖(Exclusive):隻有一個線程可以執行。而獨占鎖又分為公平鎖和非公平鎖
- 公平鎖:按照線程申請的順序,先到者先拿到鎖
- 非公平鎖:當多個線程擷取鎖時,無視隊列中的排隊順序,誰先搶到就是誰的
- 共享鎖(Share):多個線程可同時執行,如Semaphore/CountDownLatch
AQS資料結構
AbstractQueuedSynchronizer類底層的資料結構是使用CLH(Craig,Landin,and Hagersten)隊列是一個虛拟的雙向隊列(虛拟的雙向隊列即不存在隊列執行個體,僅存在結點之間的關聯關系)。AQS是将每條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node)來實作鎖的配置設定。其中Sync queue,即同步隊列,是雙向連結清單,包括head結點和tail結點,head結點主要用作後續的排程。而Condition queue不是必須的,其是一個單向連結清單,隻有當使用Condition時,才會存在此單向連結清單。并且可能會有多個Condition queue。
AQS源碼分析
AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer抽象類
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
AbstractOwnableSynchronizer
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// 版本序列号
private static final long serialVersionUID = 3737899427754241961L;
// 構造方法
protected AbstractOwnableSynchronizer() { }
// 獨占模式下的線程
private transient Thread exclusiveOwnerThread;
// 設定獨占線程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 擷取獨占線程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractOwnableSynchronizer中可以設定獨占線程,并且可以擷取獨占線程。
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer類有兩個内部類,Node和ConditionObject類
AbstractQueuedSynchronizer -- Node
static final class Node {
// 模式,分為共享與獨占
// 共享模式
static final Node SHARED = new Node();
// 獨占模式
static final Node EXCLUSIVE = null;
// 結點狀态
// CANCELLED,值為1,表示目前的線程被取消
// SIGNAL,值為-1,表示目前節點的後繼節點包含的線程需要運作,也就是unpark
// CONDITION,值為-2,表示目前節點在等待condition,也就是在condition隊列中
// PROPAGATE,值為-3,表示目前場景下後續的acquireShared能夠得以執行
// 值為0,表示目前節點在sync隊列中,等待着擷取鎖
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 結點狀态
volatile int waitStatus;
// 前驅結點
volatile Node prev;
// 後繼結點
volatile Node next;
// 結點所對應的線程
volatile Thread thread;
// 下一個等待者
Node nextWaiter;
// 結點是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 擷取前驅結點,若前驅結點為空,抛出異常
final Node predecessor() throws NullPointerException {
// 儲存前驅結點
Node p = prev;
if (p == null) // 前驅結點為空,抛出異常
throw new NullPointerException();
else // 前驅結點不為空,傳回
return p;
}
// 無參構造方法
Node() { // Used to establish initial head or SHARED marker
}
// 構造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 構造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AbstractQueuedSynchronizer -- ConditionObject
// 内部類
public class ConditionObject implements Condition, java.io.Serializable {
// 版本号
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
// condition隊列的頭節點
private transient Node firstWaiter;
/** Last node of condition queue. */
// condition隊列的尾結點
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
// 構造方法
public ConditionObject() { }
// Internal methods
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
// 添加新的waiter到wait隊列
private Node addConditionWaiter() {
// 儲存尾結點
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) { // 尾結點不為空,并且尾結點的狀态不為CONDITION
// 清除狀态為CONDITION的結點
unlinkCancelledWaiters();
// 将最後一個結點重新指派給t
t = lastWaiter;
}
// 建立一個結點
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) // 尾結點為空
// 設定condition隊列的頭節點
firstWaiter = node;
else // 尾結點不為空
// 設定為節點的nextWaiter域為node結點
t.nextWaiter = node;
// 更新condition隊列的尾結點
lastWaiter = node;
return node;
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
// 循環
do {
if ( (firstWaiter = first.nextWaiter) == null) // 該節點的nextWaiter為空
// 設定尾結點為空
lastWaiter = null;
// 設定first結點的nextWaiter域
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); // 将結點從condition隊列轉移到sync隊列失敗并且condition隊列中的頭節點不為空,一直循環
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
// condition隊列的頭節點尾結點都設定為空
lastWaiter = firstWaiter = null;
// 循環
do {
// 擷取first結點的nextWaiter域結點
Node next = first.nextWaiter;
// 設定first結點的nextWaiter域為空
first.nextWaiter = null;
// 将first結點從condition隊列轉移到sync隊列
transferForSignal(first);
// 重新設定first
first = next;
} while (first != null);
}
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
// 從condition隊列中清除狀态為CANCEL的結點
private void unlinkCancelledWaiters() {
// 儲存condition隊列頭節點
Node t = firstWaiter;
Node trail = null;
while (t != null) { // t不為空
// 下一個結點
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) { // t結點的狀态不為CONDTION狀态
// 設定t節點的nextWaiter域為空
t.nextWaiter = null;
if (trail == null) // trail為空
// 重新設定condition隊列的頭節點
firstWaiter = next;
else // trail不為空
// 設定trail結點的nextWaiter域為next結點
trail.nextWaiter = next;
if (next == null) // next結點為空
// 設定condition隊列的尾結點
lastWaiter = trail;
}
else // t結點的狀态為CONDTION狀态
// 設定trail結點
trail = t;
// 設定t結點
t = next;
}
}
// public methods
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 喚醒一個等待線程。如果所有的線程都在等待此條件,則選擇其中的一個喚醒。在從 await 傳回之前,該線程必須重新擷取鎖。
public final void signal() {
if (!isHeldExclusively()) // 不被目前線程獨占,抛出異常
throw new IllegalMonitorStateException();
// 儲存condition隊列頭節點
Node first = firstWaiter;
if (first != null) // 頭節點不為空
// 喚醒一個等待線程
doSignal(first);
}
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 喚醒所有等待線程。如果所有的線程都在等待此條件,則喚醒所有線程。在從 await 傳回之前,每個線程都必須重新擷取鎖。
public final void signalAll() {
if (!isHeldExclusively()) // 不被目前線程獨占,抛出異常
throw new IllegalMonitorStateException();
// 儲存condition隊列頭節點
Node first = firstWaiter;
if (first != null) // 頭節點不為空
// 喚醒所有等待線程
doSignalAll(first);
}
/**
* Implements uninterruptible condition wait.
* <ol>
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
// 等待,目前線程在接到信号之前一直處于等待狀态,不響應中斷
public final void awaitUninterruptibly() {
// 添加一個結點到等待隊列
Node node = addConditionWaiter();
// 擷取釋放的狀态
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) { //
// 阻塞目前線程
LockSupport.park(this);
if (Thread.interrupted()) // 目前線程被中斷
// 設定interrupted狀态
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted) //
selfInterrupt();
}
/*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
// // 等待,目前線程在接到信号或被中斷之前一直處于等待狀态
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 目前線程被中斷,抛出異常
throw new InterruptedException();
// 在wait隊列上添加一個結點
Node node = addConditionWaiter();
//
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 阻塞目前線程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 檢查結點等待時的中斷類型
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
// 等待,目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* Implements absolute timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
// 等待,目前線程在接到信号、被中斷或到達指定最後期限之前一直處于等待狀态
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
// 等待,目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态。此方法在行為上等效于: awaitNanos(unit.toNanos(time)) > 0
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// support for instrumentation
/**
* Returns true if this condition was created by the given
* synchronization object.
*
* @return {@code true} if owned
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
/**
* Queries whether any threads are waiting on this condition.
* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 查詢是否有正在等待此條件的任何線程
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
/**
* Returns an estimate of the number of threads waiting on
* this condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 傳回正在等待此條件的線程數估計值
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
/**
* Returns a collection containing those threads that may be
* waiting on this Condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 傳回包含那些可能正在等待此條件的線程集合
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
AbstractQueuedSynchronizer
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 版本号
private static final long serialVersionUID = 7373984972572414691L;
// 頭節點
private transient volatile Node head;
// 尾結點
private transient volatile Node tail;
// 狀态
private volatile int state;
// 自旋時間
static final long spinForTimeoutThreshold = 1000L;
// Unsafe類執行個體
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state記憶體偏移位址
private static final long stateOffset;
// head記憶體偏移位址
private static final long headOffset;
// state記憶體偏移位址
private static final long tailOffset;
// tail記憶體偏移位址
private static final long waitStatusOffset;
// next記憶體偏移位址
private static final long nextOffset;
// 靜态初始化塊
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
}
AQS核心方法
acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
一個線程的執行流程
- 首先調用tryAcquire,此方法的線程會試圖在獨占模式下擷取對象狀态。此方法應該查詢是否允許它在獨占模式下擷取對象狀态,但是這個在AbstractQueuedSynchronizer會抛出一個異常,預設是在子類中去實作
- 如果tryAcquire失敗,則調用addWaiter方法,将目前線程封裝成為一個Node并放入CLH的隊尾
- 調用acquireQueued方法,這個方法會不斷的去嘗試擷取資源進行加鎖操作,如果成功傳回true,如果失敗傳回false
addWaiter
// 添加等待者
private Node addWaiter(Node mode) {
// 新生成一個結點,預設為獨占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 儲存尾結點
Node pred = tail;
if (pred != null) { // 尾結點不為空,即已經被初始化
// 将node結點的prev域連接配接到尾結點
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 比較pred是否為尾結點,是則将尾結點設定為node
// 設定尾結點的next域為node
pred.next = node;
return node; // 傳回新生成的結點
}
}
enq(node); // 尾結點為空(即還沒有被初始化過),或者是compareAndSetTail操作失敗,則入隊列
return node;
}
acquireQueue
// sync隊列中的結點在獨占且忽略中斷的模式下擷取(資源)
final boolean acquireQueued(final Node node, int arg) {
// 标志
boolean failed = true;
try {
// 中斷标志
boolean interrupted = false;
for (;;) { // 無限循環
// 擷取node節點的前驅結點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 前驅為頭節點并且成功獲得鎖
setHead(node); // 設定頭節點
p.next = null; // help GC
failed = false; // 設定标志
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先擷取目前節點的前驅節點,如果前驅節點是頭節點并且能夠擷取(資源),代表該目前節點能夠占有鎖,設定頭節點為目前節點,傳回。否則,調用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法,首先,我們看shouldParkAfterFailedAcquire方法
shouldParkAfterFailedAcquire
// 當擷取(資源)失敗後,檢查并且更新結點狀态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 擷取前驅結點的狀态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 狀态為SIGNAL,為-1
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 可以進行park操作
return true;
if (ws > 0) { // 表示狀态為CANCELLED,為1
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 找到pred結點前面最近的一個狀态不為CANCELLED的結點
// 指派pred結點的next域
pred.next = node;
} else { // 為PROPAGATE -3 或者是0 表示無狀态,(為CONDITION -2時,表示此節點在condition queue中)
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 比較并設定前驅結點的狀态為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不能進行park操作
return false;
}
隻有當該節點的前驅結點的狀态為SIGNAL時,才可以對該結點所封裝的線程進行park操作。否則,将不能進行park操作。再看parkAndCheckInterrupt方法
parkAndCheckInterrupt
// 進行park操作并且傳回該線程是否被中斷
private final boolean parkAndCheckInterrupt() {
// 在許可可用之前禁用目前線程,并且設定了blocker
LockSupport.park(this);
return Thread.interrupted(); // 目前線程是否已被中斷,并清除中斷标記位
}
parkAndCheckInterrupt方法裡的邏輯是首先執行park操作,即禁用目前線程,然後傳回該線程是否已經被中斷
cancelAcquire
// 取消繼續擷取(資源)
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// node為空,傳回
if (node == null)
return;
// 設定node結點的thread為空
node.thread = null;
// Skip cancelled predecessors
// 儲存node的前驅結點
Node pred = node.prev;
while (pred.waitStatus > 0) // 找到node前驅結點中第一個狀态小于0的結點,即不為CANCELLED狀态的結點
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 擷取pred結點的下一個結點
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 設定node結點的狀态為CANCELLED
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { // node結點為尾結點,則設定尾結點為pred結點
// 比較并設定pred結點的next節點為null
compareAndSetNext(pred, predNext, null);
} else { // node結點不為尾結點,或者比較設定不成功
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) { // (pred結點不為頭節點,并且pred結點的狀态為SIGNAL)或者
// pred結點狀态小于等于0,并且比較并設定等待狀态為SIGNAL成功,并且pred結點所封裝的線程不為空
// 儲存結點的後繼
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 後繼不為空并且後繼的狀态小于等于0
compareAndSetNext(pred, predNext, next); // 比較并設定pred.next = next;
} else {
unparkSuccessor(node); // 釋放node的前一個結點
}
node.next = node; // help GC
}
}
該方法完成的功能就是取消目前線程對資源的擷取,即設定該結點的狀态為CANCELLED
unparkSuccessor
// 釋放後繼結點
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 擷取node結點的等待狀态
int ws = node.waitStatus;
if (ws < 0) // 狀态值小于0,為SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
// 比較并且設定結點等待狀态,設定為0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 擷取node節點的下一個結點
Node s = node.next;
if (s == null || s.waitStatus > 0) { // 下一個結點為空或者下一個節點的等待狀态大于0,即為CANCELLED
// s指派為空
s = null;
// 從尾結點開始從後往前開始周遊
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) // 找到等待狀态小于等于0的結點,找到最前的狀态小于等于0的結點
// 儲存結點
s = t;
}
if (s != null) // 該結點不為為空,釋放許可
LockSupport.unpark(s.thread);
}
該方法的作用就是為了釋放node節點的後繼結點。
release方法
以獨占鎖為例
public final boolean release(int arg) {
if (tryRelease(arg)) { // 釋放成功
// 儲存頭節點
Node h = head;
if (h != null && h.waitStatus != 0) // 頭節點不為空并且頭節點狀态不為0
unparkSuccessor(h); //釋放頭節點的後繼結點
return true;
}
return false;
}
AQS中的設計模式
AQS中的設計是基于模闆方法模式的,如果需要自定義同步器可以通過繼承AbstractQueuedSynchronizer來實作