CountDownLatch中AQS的使用
CountDownLatch 是 JAVA JUC包下提供的一個計數器.主要用于多任務進行中需要同步傳回的場景.
主要的方法
方法名 | 描述 |
---|---|
countDown() | 利用CAS操作完成計數器減一計算 |
await() | 阻塞線程 直到計數器到0為止 |
await(long timeout, TimeUnit unit) | 阻塞線程 直到計數器為0 或者執行時間到達timeout |
要學習AQS主要看下await 和 await(timeout,unit) 2個方法 和 Sync内部類
CountDownLatch 内部類 Sync(繼承了AQS)
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == ) ? : -;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
/** for循環保證了不停的嘗試cas直到成功更新 */
for (;;) {
int c = getState();
if (c == )
return false;
int nextc = c-;
/** 利用cas比較記憶體中的state值是否與期望值c一緻
*一緻就更新記憶體中的state值為nextc
*/
if (compareAndSetState(c, nextc))
return nextc == ;
}
}
}
//to do...
}
countDown方法
public void countDown() {
sync.releaseShared();
}
/** AbstractQueuedSynchronizer 父類方法 */
public final boolean releaseShared(int arg) {
/** tryReleaseShared sync子類實作 */
if (tryReleaseShared(arg)) {
/**釋放共享鎖 實際上在countdownLatch中未用到*/
doReleaseShared();
return true;
}
return false;
}
await方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly();
}
/** AbstractQueuedSynchronizer類中的方法 */
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
/** tryAcquireShared 子類實作 Sync
* 發現計數器未到0 傳回-1 執行
* doAcquireSharedInterruptibly
*/
if (tryAcquireShared(arg) < )
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
/**阻塞線程 不停的嘗試擷取state狀态 直到為0 */
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= ) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
await(timeout,unit) 方法
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(, unit.toNanos(timeout));
}
/** AbstractQueuedSynchronizer類中的方法 */
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
/** 這裡不同的是 多了個或條件
* 如果state減為0 傳回true
* 或者執行doAcquireSharedNanos
*/
return tryAcquireShared(arg) >= ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= L)
return false;
/** 計算逾時時間 */
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
/** 在未逾時情況下不停嘗試擷取state狀态 */
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= ) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
/** 逾時情況 不用管state狀态 直接從阻塞狀态跳出 */
if (nanosTimeout <= L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}