AQS
- 1.什麼是AQS??
- 2.以ReentrantLock為例,了解AQS
-
- 2.1首先來看公平與非公平是咋實作的??
-
- 2.1.1 加鎖
- 2.2 可重入鎖實作原理
- 2.3 打斷與不可打斷實作原理
-
- 2.1.1 解鎖
- 2.4 擷取鎖逾時實作
- 2.5 條件變量的實作
1.什麼是AQS??
AQS: 抽象的隊列同步器:
是JUC的基石,是建構鎖(ReentrantLock和ReentrantReadWriteLock)和其他同步器元件(Semaphore,CountDownLatch,CyclicBarriar)的基礎架構;
-
抽象: 我們要使用AQS,需要繼承AQS并重寫諸如
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
等抽象方法;
- 隊列: AQS抽象類中維護了一個CLH的變種,即雙向的先進先出的隊列,用head,tail記錄隊首和隊尾元素,元素的類型為Node,将暫時擷取不到鎖的線程封裝為Node節點加入到隊列中;
-
同步器: AQS中定義了一個int型的被volatile修飾的變量,可以通過getState,setState,compareAndSetState函數修改其值;線程同步的關鍵就是對state進行操作,根據state是否屬于一個線程,操作state的方式分為獨占式和共享式
對于ReentrantLock來說,state可以用來表示擷取鎖的可重入次數
對于ReentrantReadWriteLock,state的高16位表示擷取讀鎖的次數,低16位表示擷取到寫鎖的線程的可重入次數
對于Semaphore來說,state用來表示目前可用信号的個數
對于CountDownLatch來說,用來表示計數器目前的值
AQS的組成
(紅色線代表是内部類)
AQS中還有一個ConditionObject内部類
Node節點
AQS在
獨占方式
下擷取和釋放資源使用的方法是:
void acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
void acquireInterruptibly(int arg)
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
boolean release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
其中tryAcquire和tryRelease方法需要由具體的子類來實作,這裡是模闆方法的展現
可以發現,AQS中該方法是直接抛出異常的
在
共享方式
下擷取和釋放資源使用的方法為:
void acquireShared(int arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final boolean releaseShared(int arg)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2.以ReentrantLock為例,了解AQS
我們知道ReentrantLock具有
可重入
可打斷
公平和非公平
條件變量
可逾時等特性
2.1首先來看公平與非公平是咋實作的??
可以看到ReentrantLock中維護了Sync,FairSync,NonFairSync3個内部類
以及他們的繼承關系Sync繼承AQS,FairSync,NonFairSync繼承Sync
預設建立的是非公平鎖
當調用Reentrant的ock方法時,實際上是調用sync的lock方法,sync進而調用fairSync或者NonFairSync的lock方法
public void lock() {
sync.lock();
}
這裡可以發現公平鎖與非公平鎖的第一個差別:
非公平鎖: 調用lock時,首先就會執行一次CAS,如果失敗,才會進入acquire方法
公平鎖: 不會進行CAS搶占鎖,直接進入acquire方法
具體tryAcquire方法交給子類來實作
可以明顯看出公平鎖與非公平鎖的第二個差別在于tryAcquire方法**
就在于公平鎖在擷取同步鎖時多了一個限制條件:hasQueuedPredecessors()
hasQueuedPredecessors是判斷等待隊列中是否存在
有效節點
的方法
如果h == t,說明目前隊列為空,就直接傳回false,那麼公平鎖發現隊列中沒有人,那我就去擷取鎖
如果h != t, 也就是head != tail
------- > 如果h != t 并且 s ==null,說明有一個元素将要作為AQS的第一個節點,傳回true
--------> 如果h != t 并且 s != null 以及 s.thread != Thread.currentThread(),則說明,隊列裡面已經有節點了,但是不是目前線程,則傳回true
---------->如果h != t 并且 s != null 以及 s.thread == Thread.currentThread(),說明隊列中的節點是目前線程,那麼傳回false(這代表着鎖重入!!!)
是以總結一下: 隻有
隊列為空
或者
目前線程節點是AQS中的第一個節點
,則傳回false,代表後續可以去擷取鎖;
否則其他情況都表示,已經有前驅節點,傳回true代表後續不能擷取鎖!!
// ㈠ AQS 繼承過來的方法, 友善閱讀, 放在此處
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 時表示隊列中有 Node
return h != t &&
(
// (s = h.next) == null 表示隊列中還有沒有老二
(s = h.next) == null ||
// 或者隊列中老二線程不是此線程
s.thread != Thread.currentThread()
);
}
2.1.1 加鎖
2.2 可重入鎖實作原理
**`從上面的分析可知:
- 如果cas設定state==0 成功,即擷取鎖成功,則傳回true,
- 如果發現state != 0,但是持有鎖的線程是目前線程,則表明目前線程要重入,是以給state + 1(
)所謂的可重入的特性,就是這裡實作的
-
隻有當state的狀态不是為0(代表已經有線程持有鎖)并且持有鎖的線程不是目前線程,則傳回false,即擷取鎖失敗,
然後來到addWaiter()和acquireQueued()方法`**
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先來看看addWaiter()
private Node addWaiter(Node mode) {
// 把目前線程封裝為一個節點, mode傳入的是EXCLUSIVE代表獨占模式
Node node = new Node(Thread.currentThread(), mode);
//把tail指派給pred
Node pred = tail;
//剛開始,taii == head == pred == null
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//把目前節點入隊
enq(node);
return node;
}
enq方法
private Node enq(final Node node) {
//循環
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一次循環:
由于t == null 建立一個哨兵節點
注意: 這裡沒有隻是new Node(),建立了一個哨兵節點!!!
,是以進入
compareAndSetHead(new Node()
,把head的引用指向哨兵節點,然後tail = head,tail也指向哨兵結點
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
示意圖如下:
建立好哨兵節點後,進入第二次循環
此時才開始把node節點入隊,上面隻是設定哨兵節點
node.prev設定node節點的前驅節點為哨兵節點,然後改變tail的引用指向node節點
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
至此,addWaiter方法執行結束,傳回node節點,進入acquireQueued()
acquireQueued()方法: acquireQueued 會在一個死循環中不斷嘗試獲得鎖,失敗後進入 park 阻塞
-
如果目前節點的前驅節點是head(代表目前隊列中我是第一個要擷取鎖的節點) 那麼再次tryAcquire()擷取鎖,
1.1擷取鎖成功,則通過setHead方法把head指向node,并把node節點中線程置位null,prev置位null,然後p.next置位null,即node節點變為了此時的哨兵節點.然後傳回打斷标記
1.2如果擷取鎖失敗,進入shouldParkAfterFailedAcquire
-
如果目前節點的前驅節點不是head,那麼就不會嘗試擷取鎖了(我前面還有人等着呢!!!老實去排隊!!),直接進入shouldParkAfterFailedAcquire方法
2.1shouldParkAfterFailedAcquire傳回false,則繼續循環
2.2 如果傳回true,則執行parkAndCheckInterrupt()方法,真正阻塞自己
(1)如果線程因為interrupt被喚醒,parkAndCheckInterrupt()會傳回true則interrupted置為true,繼續循環去嘗試擷取鎖
(2)如果線程是被unpark喚醒, parkAndCheckInterrupt()會傳回false,然後繼續循環
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//得到node的前驅節點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 還是需要獲得鎖後, 才能傳回打斷狀态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//這裡個人感覺不會執行,有大佬知道請評論區指點!!
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire(p, node)方法: 該方法會嘗試讓目前的節點的前驅節點的waitStatus設定為-1,
該方法接收的參數: 是
node節點
和
node節點的前驅節點
這裡會涉及到Node節點中waitStatus的狀态值,列舉出來,共有4種狀态,預設是0
/** waitStatus value to indicate thread has cancelled */
//表示線程已經取消,就是線程在隊列中等待過程中,取消等待(老子不等了!!!)
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
//把目前線程置位-1, 表示需要喚醒(unpark)後繼節點(線程)
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
//目前線程因條件不滿足,在條件變量(ConditionObject)的等待隊列中
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//或得前驅節點的狀态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//跳過取消的前驅節點
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 如果前驅節點的狀态已經是-1 .則傳回true
- 如果前驅節點的狀态為1 (>0),則表明前驅節點被取消,則跳過前驅節點,直到找到一個狀态 <= 0的,然後把前驅的節點的next置位node,然後傳回到外層循環
- 如果前驅節點的狀态不是以上兩種.,意味着隻有waitStatus為0(預設值)和-3的能到這條分支,那麼就執行compareAndSetWaitStatus方法,
;把前驅節點的waitStatus置位-1(代表前驅節點有義務喚醒後繼節點)
- 如果CAS成功,然後進入parkAndCheckInterrupt方法,阻塞自己;
- 如果失敗,傳回false,傳回到acquireQueued中繼續循環嘗試
- 如果以上3種情況都不走,傳回false,傳回到acquireQueued中繼續循環嘗試
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);
}
2.3 打斷與不可打斷實作原理
注意: park()被打斷時,不會清除打斷标記,sleep被打斷會清除打斷标記
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//打斷時,清除打斷标記
return Thread.interrupted();
}
是以這裡使用Thread.interrupted()方法,除了判斷線程是否被打斷外,還會清除打斷标記
好,峰回路轉,來到最初的地方
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
執行完,acquireQueued方法後,如果傳回true,代表線程被打斷,則會執行selfInterrupt(),再次将自己中斷!!!
所謂的不可打斷就是在這裡實作的!!!
在此模式下,即使它被打斷,仍會駐留在 AQS 隊列中,一直要等到獲得鎖後方能得知自己被打斷了
(即如果使用lock方法加鎖,在等待過程中被打斷,那麼他還是會去争搶鎖,如果搶鎖成功,然後傳回打斷标記(打斷标記是隻有搶鎖成功才會一同傳回!!),然後會自我中斷
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
那麼可打斷是怎樣實作的呢???
當我們調用Reentrant中的lockInterruptibly()方法時,會調用AQS中的acquireInterruptibly
如果其他線程調用了目前線程的interrupt方法,則目前線程會抛出InterruptedException,然後傳回
如果沒有被打斷,那麼tryAcquire()嘗試擷取鎖,如果擷取鎖失敗,然後進入doAcquireInterruptibly(arg)方法
在 park 過程中如果被 interrupt 會進入此, 這時候抛出異常, 而不會再次進入 for (;;)
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.1.1 解鎖
public void unlock() {
sync.release(1);
}
會調用AQS中的release方法
1.首先tryRelease進行釋放鎖,由于可重入鎖的緣故,隻有當state-- 為0,才會傳回true,那麼此時進入if塊
2.判斷目前隊列是否有結點,并且該節點的waitStatus是否為-1,如果兩者都滿足,則進入unparkSuccessor喚醒後繼節點
public final boolean release(int arg) {
// 嘗試釋放鎖
if (tryRelease(arg)) {
// 隊列頭節點 unpark
Node h = head;
if (
// 隊列不為 null
h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0
) {
// unpark AQS 中等待的線程,後繼節點 進入 ㈡
unparkSuccessor(h);
}
return true;
}
return false;
}
tryRelease方法同樣需要子類實作,但是
公平和非公平鎖的釋放鎖是一樣!
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支援鎖重入, 隻有 state 減為 0, 才釋放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
假設此時狀态如下所示:釋放鎖的線程已經把state置位0,setExclusiveOwnerThread(null)為null,接下來需要喚醒阻塞隊列中的線程!
首先擷取頭結點的waitStatus,嘗試重置為0.允許失敗
該方法找到隊列中離 head
最近的一個 Node(沒取消的)
,unpark 恢複其運作,
// ㈡ AQS 繼承過來的方法, 友善閱讀, 放在此處
//參數是頭結點
private void unparkSuccessor(Node node) {
// 如果狀态為 Node.SIGNAL 嘗試重置狀态為 0
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 找到需要 unpark 的節點, 但本節點從 AQS 隊列中脫離, 是由喚醒節點完成的
Node s = node.next;
// 不考慮已取消的節點, 從 AQS 隊列從後至前找到隊列最前面需要 unpark 的節點
//找到隊列中離 head 最近的一個 Node(沒取消的),unpark 恢複其運作,
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
unpark該線程後,線程會回到當初acquireQueued中park的地方繼續去擷取鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//得到node的前驅節點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 還是需要獲得鎖後, 才能傳回打斷狀态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//這裡個人感覺不會執行,有大佬知道請評論區指點!!
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//打斷時,清除打斷标記
return Thread.interrupted();
}
2.4 擷取鎖逾時實作
先看一下tryLock方法
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
發現: tryLock不會引起目前線程阻塞,擷取不到鎖就立即傳回傳回false了
由于tryLock調用的是nonfairTryAcquire(1),是以使用的是非公平政策!!
下面是設定了逾時時間的tryLock,如果逾時時間到,沒有擷取到鎖,則傳回false
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.5 條件變量的實作
先回憶一下條件變量的使用:
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的煙");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送煙來了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐來了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
輸出
18:52:27.680 [main] c.TestCondition - 送早餐來了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送煙來了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的煙
一個鎖對應一個AQS阻塞隊列,可以對應多個條件變量,每個條件變量有自己的一個條件隊列
開篇我們提到過AQS中有一個内部類ConditionObject
我們在lock.newCondition()的時候,其實是new了一個在AQS内部聲明的ConditionObject對象;
需要注意的是,AQS隻提供了ConditionObject的實作,并沒有提供newCondition函數,該函數用來new一個ConditionObject對象,需要由AQS的子類來提供newConditionObject函數
await流程
假設此時Thread-0持有鎖,調用await,進入ConditionObject的addConditionWaiter流程,建立新的 Node 狀态為 -2(Node.CONDITION),關聯 Thread-0,加入等待隊列尾部
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//所有已經取消的Node從隊列删除
unlinkCancelledWaiters();
t = lastWaiter;
}
// 建立一個關聯目前線程的新 Node, 添加至隊列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
接下來進入AQS的fullyRelease流程,釋放同步器上的鎖(如果是可重入的,需要都釋放掉)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
unpark AQS 隊列中的下一個節點,競争鎖,假設沒有其他競争線程,那麼 Thread-1 競争成功
然後阻塞Thread-0
signal流程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
什麼情況transferForSignal(first)會不成功??
即該線程在等待過程中被打斷或逾時就會放棄對該鎖的擷取,那就沒必要再添加到隊列尾部
private void doSignal(Node first) {
do {
//已經是尾結點了
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 将等待隊列中的 Node 轉移至 AQS 隊列, 不成功且還有節點則繼續循環
} while (!transferForSignal(first) &&
// 隊列還有節點
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
如果狀态已經不是 Node.CONDITION, 說明被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
加入 AQS 隊列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (
// 上一個節點被取消
ws > 0 ||
// 上一個節點不能設定狀态為 Node.SIGNAL
!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
) {
// unpark 取消阻塞, 讓線程重新同步狀态
LockSupport.unpark(node.thread);
}
return true;
}