1.簡介
AQS(AbstractQueuedSynchronizer)本身是一個抽象類,主要的使用方法是繼承它作為一個内部類,JDK中許多并發工具類的内部實作都依賴于AQS,如ReentrantLock, Semaphore, CountDownLatch等等。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable
2.實作思路:
AQS的實作依賴内部的同步隊列(FIFO雙向隊列),線程首先會嘗試擷取鎖,如果失敗,則将目前線程以及等待狀态等資訊包成一個node節點加到同步隊列尾部,接着會不斷循環嘗試擷取鎖(條件是目前節點為head的直接後繼才會嘗試),如果失敗則會阻塞自己,直至被喚醒;而當持有鎖的線程釋放鎖時,會喚醒隊列中的後繼線程。
2.1 主要作用:
1.同步狀态的管理
2.線程的阻塞和喚醒
3.同步隊列的維護
2.2 鎖的擷取
調用内部類的sync.acquire(1)方法,該方法會調用tryacquire()方法來試着擷取獨占鎖,一般該方法需要重寫
@Override
public void lock() {
sync.acquire(1);
}
@Override
protected boolean tryAcquire(int arg){
assert arg == 1;
System.out.println("111");
//原子設定目前狀态
if(compareAndSetState(0,1)){ //原子操作
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
2.3 鎖的釋放
調用release()方法,該方法再調用tryRelease()方法。
3 API簡介
- int getState(): 擷取同步狀态
- void setState(): 設定同步狀态
-
boolean compareAndSetState(int expect, int update):基于CAS,原子設定目前狀态
自定義同步工具的話,一般都會基于覆寫以下幾個方法來實作同步狀态的管理
方法 | 描述 |
---|---|
boolean tryAcquire(int arg) | 試擷取獨占鎖 |
boolean tryRelease(int arg) | 試釋放獨占鎖 |
int tryAcquireShared(int arg) | 試擷取共享鎖 |
boolean tryReleaseShared(int arg) | 試釋放共享鎖 |
boolean isHeldExclusively() | 目前線程是否獲得了獨占鎖 |
内嵌Node的定義
static final class Node {
/**
* 用于标記一個節點在共享模式下等待
*/
static final Node SHARED = new Node();
/**
* 用于标記一個節點在獨占模式下等待
*/
static final Node EXCLUSIVE = null;
/**
* 等待狀态:取消
*/
static final int CANCELLED = 1;
/**
* 等待狀态:通知
*/
static final int SIGNAL = -1;
/**
* 等待狀态:條件等待
*/
static final int CONDITION = -2;
/**
* 等待狀态:傳播
*/
static final int PROPAGATE = -3;
/**
* 等待狀态
*/
volatile int waitStatus;
/**
* 前驅節點
*/
volatile Node prev;
/**
* 後繼節點
*/
volatile Node next;
/**
* 節點對應的線程
*/
volatile Thread thread;
/**
* 等待隊列中的後繼節點
*/
Node nextWaiter;
/**
* 目前節點是否處于共享模式等待
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 擷取前驅節點,如果為空的話抛出空指針異常
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) {
throw new NullPointerException();
} else {
return p;
}
}
Node() {
}
/**
* addWaiter會調用此構造函數
*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/**
* Condition會用到此構造函數
*/
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
3 節點等待的狀态定義
值 | 描述 |
---|---|
CANCELLED (1) | 目前線程因為逾時或者中斷被取消。這是一個終結态,也就是狀态到此為止 |
SIGNAL (-1) | 目前線程的後繼線程被阻塞或者即将被阻塞,目前線程釋放鎖或者取消後需要喚醒後繼線程。這個狀态一般都是後繼線程來設定前驅節點的 |
CONDITION (-2) | 目前線程在condition隊列中。 |
PROPAGATE (-3) | 用于将喚醒後繼線程傳遞下去,這個狀态的引入是為了完善和增強共享鎖的喚醒機制。在一個節點成為頭節點之前,是不會躍遷為此狀态的 |
表示無狀态。 |
3.1 擷取獨占鎖的代碼實作
/**
* 擷取獨占鎖,對中斷不敏感。
* 首先嘗試擷取一次鎖,如果成功,則傳回;
* 否則會把目前線程包裝成Node插入到隊列中,在隊列中會檢測是否為head的直接後繼,并嘗試擷取鎖,
* 如果擷取失敗,則會通過LockSupport阻塞目前線程,直至被釋放鎖的線程喚醒或者被中斷,随後再次嘗試擷取鎖,如此反複。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 在隊列中新增一個節點。
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 快速嘗試
if (pred != null) {
node.prev = pred;
// 通過CAS在隊尾插入目前節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 初始情況或者在快速嘗試失敗後插入節點
enq(node);
return node;
}
/**
* 通過循環+CAS在隊列中成功插入一個節點後傳回。
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 初始化head和tail
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
* AQS的精妙就是展現在很多細節的代碼,比如需要用CAS往隊尾裡增加一個元素
* 此處的else分支是先在CAS的if前設定node.prev = t,而不是在CAS成功之後再設定。
* 一方面是基于CAS的雙向連結清單插入目前沒有完美的解決方案,另一方面這樣子做的好處是:
* 保證每時每刻tail.prev都不會是一個null值,否則如果node.prev = t
* 放在下面if的裡面,會導緻一個瞬間tail.prev = null,這樣會使得隊列不完整。
*/
node.prev = t;
// CAS設定tail為node,成功後把老的tail也就是t連接配接到node。
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 在隊列中的節點通過此方法擷取鎖,對中斷不敏感。
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
* 檢測目前節點前驅是否head,這是試擷取鎖的資格。
* 如果是的話,則調用tryAcquire嘗試擷取鎖,
* 成功,則将head置為目前節點。
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
* 如果未成功擷取鎖則根據前驅節點判斷是否要阻塞。
* 如果阻塞過程中被中斷,則置interrupted标志位為true。
* shouldParkAfterFailedAcquire方法在前驅狀态不為SIGNAL的情況下都會循環重試擷取鎖。
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 根據前驅節點中的waitStatus來判斷是否需要阻塞目前線程。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驅節點設定為SIGNAL狀态,在釋放鎖的時候會喚醒後繼節點,
* 是以後繼節點(也就是目前節點)現在可以阻塞自己。
*/
return true;
if (ws > 0) {
/*
* 前驅節點狀态為取消,向前周遊,更新目前節點的前驅為往前第一個非取消節點。
* 目前線程會之後會再次回到循環并嘗試擷取鎖。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/**
* 等待狀态為0或者PROPAGATE(-3),設定前驅的等待狀态為SIGNAL,
* 并且之後會回到循環再次重試擷取鎖。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 該方法實作某個node取消擷取鎖。
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 周遊并更新節點前驅,把node的prev指向前部第一個非取消節點。
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 記錄pred節點的後繼為predNext,後續CAS會用到。
Node predNext = pred.next;
// 直接把目前節點的等待狀态置為取消,後繼節點即便也在cancel可以跨越node節點。
node.waitStatus = Node.CANCELLED;
/*
* 如果CAS将tail從node置為pred節點了
* 則剩下要做的事情就是嘗試用CAS将pred節點的next更新為null以徹底切斷pred和node的聯系。
* 這樣一來就斷開了pred與pred的所有後繼節點,這些節點由于變得不可達,最終會被回收掉。
* 由于node沒有後繼節點,是以這種情況到這裡整個cancel就算是處理完畢了。
*
* 這裡的CAS更新pred的next即使失敗了也沒關系,說明有其它新入隊線程或者其它取消線程更新掉了。
*/
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果node還有後繼節點,這種情況要做的事情是把pred和後繼非取消節點拼起來。
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
/*
* 如果node的後繼節點next非取消狀态的話,則用CAS嘗試把pred的後繼置為node的後繼節點
* 這裡if條件為false或者CAS失敗都沒關系,這說明可能有多個線程在取消,總歸會有一個能成功的。
*/
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
/*
* 這時說明pred == head或者pred狀态取消或者pred.thread == null
* 在這些情況下為了保證隊列的活躍性,需要去喚醒一次後繼線程。
* 舉例來說pred == head完全有可能實際上目前已經沒有線程持有鎖了,
* 自然就不會有釋放鎖喚醒後繼的動作。如果不喚醒後繼,隊列就挂掉了。
*
* 這種情況下看似由于沒有更新pred的next的操作,隊列中可能會留有一大把的取消節點。
* 實際上不要緊,因為後繼線程喚醒之後會走一次試擷取鎖的過程,
* 失敗的話會走到shouldParkAfterFailedAcquire的邏輯。
* 那裡面的if中有處理前驅節點如果為取消則維護pred/next,踢掉這些取消節點的邏輯。
*/
unparkSuccessor(node);
}
/*
* 取消節點的next之是以設定為自己本身而不是null,
* 是為了友善AQS中Condition部分的isOnSyncQueue方法,
* 判斷一個原先屬于條件隊列的節點是否轉移到了同步隊列。
*
* 因為同步隊列中會用到節點的next域,取消節點的next也有值的話,
* 可以斷言next域有值的節點一定在同步隊列上。
*
* 在GC層面,和設定為null具有相同的效果。
*/
node.next = node;
}
}
/**
* 喚醒後繼線程。
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 嘗試将node的等待狀态置為0,這樣的話,後繼争用線程可以有機會再嘗試擷取一次鎖。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
/*
* 這裡的邏輯就是如果node.next存在并且狀态不為取消,則直接喚醒s即可
* 否則需要從tail開始向前找到node之後最近的非取消節點。
*
* 這裡為什麼要從tail開始向前查找也是值得琢磨的:
* 如果讀到s == null,不代表node就為tail,參考addWaiter以及enq函數中的我的注釋。
* 不妨考慮到如下場景:
* 1. node某時刻為tail
* 2. 有新線程通過addWaiter中的if分支或者enq方法添加自己
* 3. compareAndSetTail成功
* 4. 此時這裡的Node s = node.next讀出來s == null,但事實上node已經不是tail,它有後繼了!
*/
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
3.2 釋放獨占鎖的實作
釋放線程需要做的事情是喚醒隊列中的後繼線程,讓它嘗試去擷取獨占鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {
/*
* 此時的head節點可能有3種情況:
* 1. null (AQS的head延遲初始化+無競争的情況)
* 2. 目前線程在擷取鎖時new出來的節點通過setHead設定的
* 3. 由于通過tryRelease已經完全釋放掉了獨占鎖,有新的節點在acquireQueued中擷取到了獨占鎖,并設定了head
* 第三種情況可以再分為兩種情況:
* (一)時刻1:線程A通過acquireQueued,持鎖成功,set了head
* 時刻2:線程B通過tryAcquire試圖擷取獨占鎖失敗失敗,進入acquiredQueued
* 時刻3:線程A通過tryRelease釋放了獨占鎖
* 時刻4:線程B通過acquireQueued中的tryAcquire擷取到了獨占鎖并調用setHead
* 時刻5:線程A讀到了此時的head實際上是線程B對應的node
* (二)時刻1:線程A通過tryAcquire直接持鎖成功,head為null
* 時刻2:線程B通過tryAcquire試圖擷取獨占鎖失敗失敗,入隊過程中初始化了head,進入acquiredQueued
* 時刻3:線程A通過tryRelease釋放了獨占鎖,此時線程B還未開始tryAcquire
* 時刻4:線程A讀到了此時的head實際上是線程B初始化出來的傀儡head
*/
Node h = head;
// head節點狀态不會是CANCELLED,是以這裡h.waitStatus != 0相當于h.waitStatus < 0
if (h != null && h.waitStatus != 0)
// 喚醒後繼線程,此函數在acquire中已經分析過,不再列舉說明
unparkSuccessor(h);
return true;
}
return false;
}
3.3 擷取共享鎖的實作
與擷取獨占鎖的實作不同的關鍵在于,共享鎖允許多個線程持有。
如果需要使用AQS中共享鎖,在實作tryAcquireShared方法時需要注意,傳回負數表示擷取失敗;傳回0表示成功,但是後繼争用線程不會成功;傳回正數表示
擷取成功,并且後繼争用線程也可能成功。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 一旦共享擷取成功,設定新的頭結點,并且喚醒後繼線程
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 這個函數做的事情有兩件:
* 1. 在擷取共享鎖成功後,設定head節點
* 2. 根據調用tryAcquireShared傳回的狀态以及節點本身的等待狀态來判斷是否要需要喚醒後繼線程。
*/
private void setHeadAndPropagate(Node node, int propagate) {
// 把目前的head封閉在方法棧上,用以下面的條件檢查。
Node h = head;
setHead(node);
/*
* propagate是tryAcquireShared的傳回值,這是決定是否傳播喚醒的依據之一。
* h.waitStatus為SIGNAL或者PROPAGATE時也根據node的下一個節點共享來決定是否傳播喚醒,
* 這裡為什麼不能隻用propagate > 0來決定是否可以傳播在本文下面的思考問題中有相關講述。
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 這是共享鎖中的核心喚醒函數,主要做的事情就是喚醒下一個線程或者設定傳播狀态。
* 後繼線程被喚醒後,會嘗試擷取共享鎖,如果成功之後,則又會調用setHeadAndPropagate,将喚醒傳播下去。
* 這個函數的作用是保障在acquire和release存在競争的情況下,保證隊列中處于等待狀态的節點能夠有辦法被喚醒。
*/
private void doReleaseShared() {
/*
* 以下的循環做的事情就是,在隊列存在後繼線程的情況下,喚醒後繼線程;
* 或者由于多線程同時釋放共享鎖由于處在中間過程,讀到head節點等待狀态為0的情況下,
* 雖然不能unparkSuccessor,但為了保證喚醒能夠正确穩固傳遞下去,設定節點狀态為PROPAGATE。
* 這樣的話擷取鎖的線程在執行setHeadAndPropagate時可以讀到PROPAGATE,進而由擷取鎖的線程去釋放後繼等待線程。
*/
for (;;) {
Node h = head;
// 如果隊列中存在後繼線程。
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 如果h節點的狀态為0,需要設定為PROPAGATE用以保證喚醒的傳播。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 檢查h是否仍然是head,如果不是的話需要再進行循環。
if (h == head)
break;
}
}
3.4 釋放共享鎖
釋放共享鎖與擷取共享鎖的代碼共享了doReleaseShared,用于實作喚醒的傳播。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// doReleaseShared的實作上面擷取共享鎖已經介紹
doReleaseShared();
return true;
}
return false;
}
參考部落格:https://www.cnblogs.com/micrari/p/6937995.html