天天看點

CountDownLatch 中AQS的使用

CountDownLatch中AQS的使用

CountDownLatch 是 JAVA JUC包下提供的一個計數器.主要用于多任務進行中需要同步傳回的場景.

主要的方法

方法名 描述
countDown() 利用CAS操作完成計數器減一計算
await() 阻塞線程 直到計數器到0為止
await(long timeout, TimeUnit unit) 阻塞線程 直到計數器為0 或者執行時間到達timeout

要學習AQS主要看下await 和 await(timeout,unit) 2個方法 和 Sync内部類

CountDownLatch 内部類 Sync(繼承了AQS)

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == ) ?  : -;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            /** for循環保證了不停的嘗試cas直到成功更新 */
            for (;;) {
                int c = getState();
                if (c == )
                    return false;
                int nextc = c-;
                /** 利用cas比較記憶體中的state值是否與期望值c一緻
                 *一緻就更新記憶體中的state值為nextc 
                 */
                if (compareAndSetState(c, nextc))
                    return nextc == ;
            }
        }
    }
    //to do...
    }
           

countDown方法

public void countDown() {
        sync.releaseShared();
    }
   /** AbstractQueuedSynchronizer 父類方法 */
   public final boolean releaseShared(int arg) {
           /** tryReleaseShared sync子類實作 */
        if (tryReleaseShared(arg)) {
            /**釋放共享鎖 實際上在countdownLatch中未用到*/
            doReleaseShared();
            return true;
        }
        return false;
    }
           

await方法

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly();
    }
    /** AbstractQueuedSynchronizer類中的方法 */
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            /** tryAcquireShared 子類實作 Sync 
             * 發現計數器未到0 傳回-1 執行
             * doAcquireSharedInterruptibly
             */
        if (tryAcquireShared(arg) < )
            doAcquireSharedInterruptibly(arg);
    }

        private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            /**阻塞線程 不停的嘗試擷取state狀态 直到為0 */
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= ) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           

await(timeout,unit) 方法

public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(, unit.toNanos(timeout));
    }
    /** AbstractQueuedSynchronizer類中的方法 */
     public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            /** 這裡不同的是 多了個或條件
             * 如果state減為0 傳回true
             * 或者執行doAcquireSharedNanos
             */
        return tryAcquireShared(arg) >=  ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

        private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= L)
            return false;
        /** 計算逾時時間 */
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
        /** 在未逾時情況下不停嘗試擷取state狀态 */
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= ) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                /** 逾時情況 不用管state狀态 直接從阻塞狀态跳出 */
                if (nanosTimeout <= L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }