1 基本設計
一種同步輔助,允許一或多個線程等待,直到在其它線程中執行的一組操作完成。
用給定的 count 初始化。由于調用countDown(),await 方法會阻塞,直到目前計數為0,之後釋放所有等待線程,并立即傳回任何後續的 await 調用。計數無法重置,若需重置計數,可使用CyclicBarrier。
CountDownLatch 是一種通用的同步工具,可用于:
count為1時初始化的CountDownLatch用作簡單的門開關:所有調用wait的線程都在門口等待,直到調用countDown()的線程打開它
初始化為N的CountDownLatch可用來讓一個線程等待,直到N個線程完成某動作或某動作已完成N次
CountDownLatch的一個有用的特性是,它不需要調用倒計時的線程等待計數達到0才繼續,它隻是防止任何線程繼續等待,直到所有線程都通過。
2 類架構
CountDownLatch并未顯式繼承什麼接口或類。
構造器
構造一個用給定計數初始化的CountDownLatch。
參數 count :線上程通過await()前,必須調用countDown()的次數。
CountDownLatch 的 state 并非 AQS 的預設值 0,而是可指派的,即在 CountDownLatch 初始化時,count 就代表 state 的初始值。
new Sync(count) 就是調用了内部類 Sync 的如下構造器
count 表示我們希望等待的線程數,可能是等待一組線程全部啟動或執行完成。
内部類
和 ReentrantLock 一樣,CountDownLatch類也存在一個内部同步器 Sync,繼承了 AbstractQueuedSynchronizer
唯一字段:
private static final class Sync extends AbstractQueuedSynchronizer {
...
// 傳回目前計數
int getCount() {
return getState();
}
// 共享模式下擷取鎖
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 共享模式下的釋放鎖
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 擷取鎖狀态
int c = getState();
// 鎖未被任何線程持有
if (c == 0)
return false;
// 對 state 進行遞減,直到 state 變成 0;當 state 遞減為 0 時,才傳回 true
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
3 await
可以叫做等待,也可以稱為加鎖。
3.1 無參
使得目前線程等待,直到鎖存器 count 為0,除非線程被中斷。
若目前計數為零,則此方法立即傳回。
若目前線程數>0,則目前線程将出于線程排程的目的而被禁用,并處于睡眠狀态,直到發生如下情況之一:
由于調用countDown()方法,count為0
其他線程中斷了目前線程
若目前線程:
在進入此方法時,已設定其中斷狀态
或在等待時被中斷
就會抛 InterruptedException,并清除目前線程的中斷狀态。
無參版 await 内部使用的是 acquireSharedInterruptibly 方法,實作在 AQS 中的 final 方法:
使用CountDownLatch 的内部類 Sync 重寫的tryAcquireShared 嘗試獲得鎖,若擷取到鎖直接傳回,擷取不到走2
擷取鎖失敗,用 Node 封裝一下目前線程,追加到同步隊列尾部,等待在合适時機去獲得鎖,本步已完全實作在 AQS 中
3.2 帶逾時參數
最終都會轉化成ms
相比于無參版本,或者指定的等待時間已過。
如果目前計數為零,則此方法立即傳回值 true。
如果目前線程數大于0,則目前線程将出于線程排程的目的而禁用,并處于休眠狀态,直到發生以下三種情況之一:
由于調用了countDown()方法,計數為零;或
其他一些線程中斷目前線程;或
指定的等待時間已經過了
如果計數為零,則該方法傳回值true。
在進入此方法時已設定其中斷狀态;或
在等待時中斷,
就會抛出InterruptedException,并清除目前線程的中斷狀态。
如果指定的等待時間過期,則傳回false值。如果時間小于或等于0,則該方法根本不會等待。
使用的是 AQS# tryAcquireSharedNanos 方法
獲得鎖時,state 的值不會發生變化,像 ReentrantLock 在獲得鎖時,會把 state + 1,但 CountDownLatch 不會
4 countDown
降低鎖存器的計數,如果計數為 0,則釋放所有等待的線程。
如果目前計數大于零,則遞減。如果新計數為零,那麼所有等待的線程都将重新啟用,以便進行線程排程。
如果目前計數等于0,則什麼也不會發生。
releaseShared 已經完全實作在 AQS
主要分成兩步:
- 嘗試釋放鎖(tryReleaseShared),鎖釋放失敗直接傳回,釋放成功走 2,本步由 Sync 實作
- 釋放目前節點的後置等待節點,該步 AQS 已經完全實作
5 最佳實踐
Kafka中的線程控制代碼大量使用CountDownLatch實作優雅的線程啟動、線程關閉等操作。
6 總結
研究完 CountDownLatch 的源碼,可知其底層結構仍然依賴了 AQS,對其線程所封裝的結點是采用共享模式,而 ReentrantLock 是采用獨占模式。