@[toc]
概述
CountDownLatch也是利用的AQS隊列,關于AQS隊列的講述請參考前面兩篇文章:
AQS類是一個模闆類,我們可以根據根據具體的需求通過重寫以下幾個方法來自定義實作同步器
- tryAcquire (排它鎖擷取)
- tryRelease (排它鎖釋放)
- tryAcquireShared (共享鎖擷取)
- tryReleasedShared (共享鎖釋放)
我們看一下官方文檔中的代碼案例:
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
class Driver {
public static int N = 5;
public static void main(String[] args) throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
System.out.println(System.currentTimeMillis() + "done");
}
private static void doSomethingElse() throws InterruptedException {
TimeUnit.SECONDS.sleep(2);
System.out.println(System.currentTimeMillis() + Thread.currentThread().getName() + " 執行任務");
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
}
void doWork() throws InterruptedException {
TimeUnit.SECONDS.sleep(new Random().nextInt(10) + 2);
System.out.println(System.currentTimeMillis() + Thread.currentThread().getName() + " 執行任務");
}
}
輸出結果:
1561552734980main 執行任務
1561552736980main 執行任務
1561552738980Thread-0 執行任務
1561552739980Thread-1 執行任務
1561552740981Thread-2 執行任務
1561552742981Thread-4 執行任務
1561552743981Thread-3 執行任務
1561552743981done
可以看到,CountDownLatch不僅可以實作一個倒計時器,計數器,還可以實作線程間的通信協調。
CountDownLatch初始化
public CountDownLatch(int count) {
if (count < 0) thrownew IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch初始化時會初始化state的值,這裡代表線程數。
private static final class Sync extends AbstractQueuedSynchronizer {
privatestatic final long serialVersionUID = 4982264981922014374L;
Sync(int count) {//初始化state的值
setState(count);
}
int getCount() {//countDownLatch可以擷取目前剩餘的state的數量
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// 遞減state的值
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
從代碼中可以看到,嘗試擷取資源(tryAcquireShared),state等于0時,傳回1,表示可以擷取鎖。其他同步器也是這樣的,隻要tryAcquireShared傳回>=0的值,表示可以擷取資源,CountDownLatch的使用具體後面會講。
釋放資源時(tryReleaseShared),每次都是遞減1,當state等于0時,傳回true。
await
接下來看看主線程await做了什麼事情
//目前線程會一直等到直到計數器減到0,或者目前線程中斷
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();//如果中斷,抛出異常
if (tryAcquireShared(arg) < 0)//tryAcquireShared小于0 說明state的值還沒有遞減到0
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly在父類AQS中:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//把目前等待的線程,以共享模式加入到AQS隊列中
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {//前面已經降到state減到0時tryAcquireShared傳回1,表示目前線程可以擷取鎖了,也就是目前被CountDownLatch阻塞的線程擷取鎖成功可以繼續執行了。
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
整體代碼邏輯與之前文章中講述的基本一緻。這裡隻說明一下CountDownLatch的特殊之處:
在設定了CountDownLatch以後,AQS中的state會有初始值,表示線程數。由于主線程(或者目前線程)會等待state的值減到0,否則一直等待。由于是使用的AQS隊列,是以主線程的等待過程展現在将主線程以共享模式添加到AQS隊列中,挂起,等待被喚醒。喚醒以後檢視state的值為0,說明擷取到了共享鎖,可以進入臨界區執行任務了。
countDown
countDown方法會調用AQS類的releaseShared方法
sync.releaseShared(1);
:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
其中doReleaseShared與
分析的一樣,就不在贅述。
CountDownLatch中tryReleaseShared的實作,其實上面已經講到了,tryReleaseShared隻是将state的值遞減1而已。
當減到0時傳回true,就會執行doReleaseShared方法了,doReleaseShared回去喚醒後繼節點。如此一來,被阻塞的主線程就會被喚醒了。
end.