目錄
-
- AQS實作線程通信原理
- 相關類源碼分析
在上一篇并發多線程之AQS源碼分析(上)中介紹了關于獨占鎖相關的方法,本篇來介紹AQS實作線程間通信Condition及ConditionObject等相關的源碼分析。當然在梳理之前我們先把原理搞懂可以起到事半功倍的效果。
AQS實作線程通信原理
關于條件隊列
在AQS中線程間通信是通過引入條件隊列來實作,條件隊列在前面已經介紹,條件隊列是一個單向的連結清單結構。如下:
AQS通過引入條件隊列,線上程不滿足運作條件時,将線程添加至條件隊列中并阻塞,當其他線程通過喚醒方法喚醒條件隊列中的線程後,線程由條件隊列添加至同步隊列中等待被喚醒。
兩個隊列的關系如下:
這裡需要說明一下幾點:
- 節點由條件隊列中添加至同步隊列時,會從條件隊列中剔除。
- 線程每次被放入條件隊列時,都是建立一個Node執行個體,是以可能會存在持有目前線程的多個Node分别位于條件隊列和同步隊列中,這點是需要注意的,源碼中也有很多相關的判斷。
線程間通信流程
以最常見的一個示例來說明。代碼如下:
public class LockDemo {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void block() {
try {
lock.lock();
while(!check()) {
condition.await();
}
// doSomething
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void nonBlock() {
try {
lock.lock();
// doSomething
if (check()) {
condition.signalAll();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
}
lock.unlock();
}
private boolean check() {
// 檢查條件是否繼續執行
return false;
}
}
那麼針對上面示例代碼,在AQS中執行的流程如下:
這裡簡要的說明一下流程:
- 首先線程需要搶占鎖,當然對于沒有搶占到的流程在上一篇中已經說明,在此不再贅述。
- 線程搶占到鎖以後,執行代碼塊,當不滿足條件時,那麼會調用await方法。
- 在調用await方法後,需要循環判斷目前線程是否在同步隊列中(為什麼要循環判斷,我的了解是線程位于同步隊列中随時可能被喚醒,及時目前線程被阻塞同時運作條件不滿足,那麼這種喚醒時無意思的,是以需要保證目前線程不再同步隊列中),目前線程不在同步隊列中則阻塞桑倩線程。
- 滿足條件的線程繼續運作,當滿足喚醒線程的條件時則喚醒條件隊列中的線程。喚醒的過程主要是将條件隊列中的節點追加至同步隊列中并且将節點的前置節點狀态修改為SIGNAL狀态,便于喚醒。
- 當同步隊列中線程執行到步驟4追加的節點後,喚醒調用await阻塞的線程。
- 被喚醒的由于調用await阻塞的線程進行鎖資源的搶占。當擷取到鎖資源後,執行後續的代碼。當然如果條件不滿足則會再次調用await方法進入條件隊列進而從步驟3開始運作。
- 以上步驟反複執行知道執行完代碼并釋放鎖,進而從同步隊列中喚醒後繼線程。
相關類源碼分析
AbstractQueuedSynchronizer類中與條件隊列相關的方法
- 判斷節點是否在同步隊列。
該方法主要是判斷參數節點是否在同步隊列中,當然提供了一些快捷判斷,如狀态為CONDITION的話那就在條件隊列中,同時如果後置節點也能證明在同步隊列中,為什麼不用前置節點不為空來判斷呢?因為同步隊列中頭節點的前置節點為空不能作為判斷依據,當這些快捷方法不起作用後,則調用findNodeFromTail來查找參數節點。final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) return true; return findNodeFromTail(node); }
- 周遊同步隊列查找節點。
該方法是從同步隊列的尾節點查找參數節點,存在傳回true,不存在傳回false。private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
- 完全釋放資源。
将目前線程持有的所有資源釋放。final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
- 将條件隊列中節點添加至同步隊列。
首先将參數節點的狀态由CONDITION狀态修改為0,當然沒有成功則直接傳回false。修改狀态後将節點加入同步隊列中,并且将同步隊列中前置節點狀态修改為SIGNAL便于後續的喚醒。final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
- 取消等待節點傳輸。
該方法是線上程取消等待後将其添加同步隊列中,如果取消操作是在被喚醒之前則傳回true。後續的循環是保證目前線程被添加至同步隊列中,這裡就是一個競态條件,目前線程可能被其他線程通過喚醒添加到同步隊列中了,或者是在目前線在阻塞前被取消了,是以會将自己添加到同步隊列,而這裡就是處理了這兩種情況下避免重複加入同步隊列。final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
以上就是AbstractQueuedSynchronizer類中關于條件隊列的相關操作。接下來來了解一下條件隊列的實作類ConditionObject的屬性和方法。
條件隊列的實作ConditionObject
- 條件隊列的頭節點和尾節點。
條件隊列中的頭節點和尾節點,兩者為空代表隊列為空。private transient Node firstWaiter; private transient Node lastWaiter;
- 無參構造。
- 向條件隊列中增加節點。
該方法首先是判斷隊列中是否可能存在取消的節點,存在則将取消的節點從同步隊列中剔除。在剔除完成後将目前線程封裝成Node添加到條件隊列中。private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
- 剔除取消隊列。
從條件隊列的頭節點向後周遊将取消的節點剔除。private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
- 從條件隊列中喚醒線程。
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
該方法從給定參數節點開始向條件隊列尾部周遊,将第一個waitStatus為CONDIITON的節點追加到同步隊列中。
注意:該方法隻能轉移一個或0個節點。
- 從條件隊列中喚醒所有的線程。
在該方法中,周遊條件隊列,将所有節點轉移至同步隊列中,同時将firstWaiter和lastWaiter均置為空。private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
doSignal和doSignalAll兩個方法均沒有判斷節點waitStatus為CONDIITON,因為在transferForSignal方法中進行了判斷,是通過compareAndSetWaitStatus(node, Node.CONDITION, 0)方法使用CAS判斷,不為CONDITOON則更新失敗,傳回false。
- 喚醒等待線程。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
1>通過AbstractQueuedSychronizeder的isHeldExclusively方法判斷獨占線程是 否是目前線程。
2>将條件隊列中的第一個節點調用doSignal方法喚醒。
- 喚醒所有等待線程。
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
1>判斷目前線程是否是獨占線程。
2>判斷條件隊列是否為空。
3>條件不為空則調用doSignalAll方法将所有節點喚醒。
- 判斷在等待時是否被中斷。
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
1>判斷目前線程的打斷狀态,沒有打斷則傳回0。
2>打斷,則通過調用transferAfterCancelledWait(Node node)方法判斷目前節點是有目前線程入隊,還是其他線程入隊。
3>如果是目前線程入隊,則傳回THROW_IE。
4>如果是其他線程入隊則傳回REINTERRUPT。
- 根據辨別是否再次中斷。
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
1>通過interruptMode來執行指定操作。
2>interruptMode為THROW_IE則抛出異常。
3>interruptMode為REINTERRUPT則代用selfInterrupt()修改中斷狀态。
- 等待(條件隊列的核心)。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
1>通過 Thread.interrupted()判斷,線程被打斷過則抛出異常。
2>調用addConditionWaiter()方法将目前節點添加至條件隊列中。
3>調用fullyRelease方法将目前資源全部釋放。
4>循環判斷目前節點是否在同步隊列中,不在則阻塞目前線程。
5>在循環中,除正常喚醒外,interrupt也會喚醒目前線程,此時局部變量interruptMode會更改為checkInterruptWhileWaiting傳回的值。
6>目前線程被喚醒後搶占資源過程中被打斷,并且目前節點不是目前線程添加至同步隊列的将interruptMode修改為REINTERRUPT。
7>從目前節點清除條件隊列中狀态不為CONDITION的節點。
8>調用reportInterruptAfterWait方法判斷是抛出異常還是修改線程中斷狀态。
以上的方法主要就是AQS的條件隊列實作的線程通信的相關源碼,當然隊列可中斷的及指定時間等待與await大同小異,await了解後其他的也沒有那麼困難。接下來就是介紹基于AQS實作的可重入鎖Lock及ReentrantLock,還有線程通信相關的Condition及ConditionObject是如何協作Lock進行的。這裡需要主要的是ConditionObject是AbstractQueuedSynchronizer的一個内部類(非靜态),是以ConditionObject的執行個體是與AQS的執行個體綁定的。同時一個AQS可以對應多個ConditionObject執行個體,每個ConditionObject對應一個條件隊列,所有AQS可以建立多個不同條件的條件隊列來存儲由于不同條件而阻塞的線程,這一點是需要意識到的,當然同步隊列對于一個AQS執行個體則隻有一個。