天天看點

CountDownLatch & Semaphore 實作原理詳解

CountDownLatch 和 Semaphore 是Java中常用的兩個同步器。他們的用法百度一下一大堆,我就不多做介紹了。下面我将從源碼的角度分析一下這兩個類的實作原理。

閱讀本篇文章之前,建議先了解AQS同步器的原理。可以看我之前寫的一篇文章:

Java同步器架構-AQS原理&源碼解析

了解了AQS的底層原理,再來看CountDownLatch 和 Semaphore 的實作原理就會發現很簡單。

CountDownLatch 實作原理

CountDownLatch 内部定義了一個AQS的實作類

private final Sync sync;
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            //判斷state是否為0了,如果為0就傳回1,表示允許通過。傳回-1的話線程就要挂起等待state為0
            return (getState() == ) ?  : -;
        }

        protected boolean tryReleaseShared(int releases) {
            //通過自旋+CAS的操作将state-1。當state減到0的時候AQS就會去喚醒挂起中的線程
            for (;;) {
                int c = getState();
                if (c == )
                    return false;
                int nextc = c-;
                if (compareAndSetState(c, nextc))
                    return nextc == ;
            }
        }
}
public CountDownLatch(int count) {
        if (count < ) throw new IllegalArgumentException("count < 0");
        //初始化AQS同步器
        this.sync = new Sync(count);
}
public void await() throws InterruptedException {
        //嘗試擷取資源,等待的時候支援線程響應中斷
        sync.acquireSharedInterruptibly();
}
public void countDown() {
        //釋放資源
        sync.releaseShared();
}
           

可以看出,countDown()和await()方法的底層其實都是依賴于Sync。

Sync是一個AQS同步器,實作了 tryAcquireShared()和tryReleaseShared(),這是一個共享模式下的同步器。

CountDownLatch在初始化的時候傳入一個同步線程數量count表示AQS的state。然後執行await()的時候,AQS會先判斷state是否為0,state不為0的話就會阻塞目前線程,并讓目前線程進入AQS的CLH隊列中排隊。

當有線程執行countDown的時候,Sync會通過cas+自旋的方式将state減一,然後判斷state是否等于0。等于0的時候傳回true,AQS發現tryReleaseShared()傳回true,就會去喚醒正在CLH隊列中排隊等待的線程,先喚醒排在最前面的那個線程。由于是共享模式,那個線程被喚醒後,檢查state=0了,就結束阻塞,并且會通知下一個排隊線程,下一個線程醒來後,一樣判斷state是否等于0了,然後結束阻塞,通知下一個,一直循環下去,直到所有阻塞中的線程全部被喚醒。

Semaphore 實作原理

也是基于AQS實作的。内部也定義了一個Sync ,AQS的實作類

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = L;
        //定義初始的許可證的數量
        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        //非公平的政策
        final int nonfairTryAcquireShared(int acquires) {
            //用CAS+自旋的方式擷取資源
            for (;;) {
                //直接去擷取資源
                //假設現在CLH隊列中有人在排隊了,然後剛好有線程釋放了一個資源并且喚醒隊列中等待的線程了
                //然後喚醒後還沒來得及擷取到資源,就被人插隊擷取了資源
                //下面的代碼就是嘗試不排隊直接擷取資源,擷取不到資源再去排隊,是以這是一種不公平的算法
                int available = getState();
                int remaining = available - acquires;
                if (remaining <  ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        //釋放資源
        protected final boolean tryReleaseShared(int releases) {
            //用CAS+自旋的方式釋放資源
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current ==  || compareAndSetState(current, ))
                    return current;
            }
        }
    }

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            //采用非公平的方式擷取資源
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            //用CAS+自旋的方式嘗試擷取資源
            for (;;) {
                //判斷CLH隊列中是否有人在排隊等待,如果是的話直接傳回-1,表示也進入隊列等待
                if (hasQueuedPredecessors())
                    return -;
                //如果隊列中沒有人在排隊,就嘗試擷取資源了
                int available = getState();
                int remaining = available - acquires;
                if (remaining <  ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
}
//預設選擇非公平的AQS同步器
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
      //可以選擇公平的還是不公平的同步器
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly();
}

public void release() {
        sync.releaseShared();
}
           

在Semaphore 的例子中,AQS的state就可以直接看成是一種資源(許可證)。

acquire()

方法就是嘗試擷取資源,當資源不足的的時候,就進入CLH隊列排隊等待,也就是阻塞目前線程。

release()

方法就是釋放一些資源,釋放完後就會通知CLH隊列中的線程。CLH隊列中的線程被喚醒後就會再次檢查是否有資源了,有的話就消耗資源,也就是state-n,然後就退出阻塞。由于是共享模式的AQS,線程被喚醒後還會繼續通知下一個排在自己後面的線程,那個線程也醒來檢查資源是否足夠,以此類推下去。

比較有意思的是,Semaphore 内部提供了兩個AQS同步器的實作類,一種是公平模式的,一種是非公平模式的。這兩種同步器其實隻是在擷取資源的行為上有所不同,也就是

tryAcquireShared(int acquires)

的實作不同,而在釋放資源的行為上是一樣的。

在公平模式下,擷取資源前會先去檢查CLH隊列是否有人在排隊了,如果發現有人在排隊了,就會自覺也去排隊。而在非公平模式下,它會直接嘗試先擷取資源,擷取不到再去排隊。

我們可以了解為一間店在賣面包,面包的數量會不斷産生,如果數量不夠了,排隊的人就要在那等着。在公平模式下,有一個人來買面包,發現大家都在排隊,就自覺進入隊列中也排隊等待。在非公平的模式下,來買面包的人就直接先去店主那看有沒有剩餘的面包可以買,這時候可以店主剛生産出一個面包來,還沒來得及交給排在隊伍的第一個人,就被插隊者買走了。是以,這就是不公平的模式,違反了先到先得的規則。

總結

CountDownLatch 和 Semaphore這兩個類的原理其實都很好了解。看這兩個類的源碼也很快,結合AQS的原理一起了解,會對Java線程之間的同步有更深的了解。

最後,如果哪裡有寫的不對的地方,煩請指出,感激不盡!

繼續閱讀