CountDownLatch 和 Semaphore 是Java中常用的兩個同步器。他們的用法百度一下一大堆,我就不多做介紹了。下面我将從源碼的角度分析一下這兩個類的實作原理。
閱讀本篇文章之前,建議先了解AQS同步器的原理。可以看我之前寫的一篇文章:
Java同步器架構-AQS原理&源碼解析
了解了AQS的底層原理,再來看CountDownLatch 和 Semaphore 的實作原理就會發現很簡單。
CountDownLatch 實作原理
CountDownLatch 内部定義了一個AQS的實作類
private final Sync sync;
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
//判斷state是否為0了,如果為0就傳回1,表示允許通過。傳回-1的話線程就要挂起等待state為0
return (getState() == ) ? : -;
}
protected boolean tryReleaseShared(int releases) {
//通過自旋+CAS的操作将state-1。當state減到0的時候AQS就會去喚醒挂起中的線程
for (;;) {
int c = getState();
if (c == )
return false;
int nextc = c-;
if (compareAndSetState(c, nextc))
return nextc == ;
}
}
}
public CountDownLatch(int count) {
if (count < ) throw new IllegalArgumentException("count < 0");
//初始化AQS同步器
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
//嘗試擷取資源,等待的時候支援線程響應中斷
sync.acquireSharedInterruptibly();
}
public void countDown() {
//釋放資源
sync.releaseShared();
}
可以看出,countDown()和await()方法的底層其實都是依賴于Sync。
Sync是一個AQS同步器,實作了 tryAcquireShared()和tryReleaseShared(),這是一個共享模式下的同步器。
CountDownLatch在初始化的時候傳入一個同步線程數量count表示AQS的state。然後執行await()的時候,AQS會先判斷state是否為0,state不為0的話就會阻塞目前線程,并讓目前線程進入AQS的CLH隊列中排隊。
當有線程執行countDown的時候,Sync會通過cas+自旋的方式将state減一,然後判斷state是否等于0。等于0的時候傳回true,AQS發現tryReleaseShared()傳回true,就會去喚醒正在CLH隊列中排隊等待的線程,先喚醒排在最前面的那個線程。由于是共享模式,那個線程被喚醒後,檢查state=0了,就結束阻塞,并且會通知下一個排隊線程,下一個線程醒來後,一樣判斷state是否等于0了,然後結束阻塞,通知下一個,一直循環下去,直到所有阻塞中的線程全部被喚醒。
Semaphore 實作原理
也是基于AQS實作的。内部也定義了一個Sync ,AQS的實作類
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = L;
//定義初始的許可證的數量
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
//非公平的政策
final int nonfairTryAcquireShared(int acquires) {
//用CAS+自旋的方式擷取資源
for (;;) {
//直接去擷取資源
//假設現在CLH隊列中有人在排隊了,然後剛好有線程釋放了一個資源并且喚醒隊列中等待的線程了
//然後喚醒後還沒來得及擷取到資源,就被人插隊擷取了資源
//下面的代碼就是嘗試不排隊直接擷取資源,擷取不到資源再去排隊,是以這是一種不公平的算法
int available = getState();
int remaining = available - acquires;
if (remaining < ||
compareAndSetState(available, remaining))
return remaining;
}
}
//釋放資源
protected final boolean tryReleaseShared(int releases) {
//用CAS+自旋的方式釋放資源
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == || compareAndSetState(current, ))
return current;
}
}
}
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
//采用非公平的方式擷取資源
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
//用CAS+自旋的方式嘗試擷取資源
for (;;) {
//判斷CLH隊列中是否有人在排隊等待,如果是的話直接傳回-1,表示也進入隊列等待
if (hasQueuedPredecessors())
return -;
//如果隊列中沒有人在排隊,就嘗試擷取資源了
int available = getState();
int remaining = available - acquires;
if (remaining < ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
//預設選擇非公平的AQS同步器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//可以選擇公平的還是不公平的同步器
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly();
}
public void release() {
sync.releaseShared();
}
在Semaphore 的例子中,AQS的state就可以直接看成是一種資源(許可證)。
acquire()
方法就是嘗試擷取資源,當資源不足的的時候,就進入CLH隊列排隊等待,也就是阻塞目前線程。
release()
方法就是釋放一些資源,釋放完後就會通知CLH隊列中的線程。CLH隊列中的線程被喚醒後就會再次檢查是否有資源了,有的話就消耗資源,也就是state-n,然後就退出阻塞。由于是共享模式的AQS,線程被喚醒後還會繼續通知下一個排在自己後面的線程,那個線程也醒來檢查資源是否足夠,以此類推下去。
比較有意思的是,Semaphore 内部提供了兩個AQS同步器的實作類,一種是公平模式的,一種是非公平模式的。這兩種同步器其實隻是在擷取資源的行為上有所不同,也就是
tryAcquireShared(int acquires)
的實作不同,而在釋放資源的行為上是一樣的。
在公平模式下,擷取資源前會先去檢查CLH隊列是否有人在排隊了,如果發現有人在排隊了,就會自覺也去排隊。而在非公平模式下,它會直接嘗試先擷取資源,擷取不到再去排隊。
我們可以了解為一間店在賣面包,面包的數量會不斷産生,如果數量不夠了,排隊的人就要在那等着。在公平模式下,有一個人來買面包,發現大家都在排隊,就自覺進入隊列中也排隊等待。在非公平的模式下,來買面包的人就直接先去店主那看有沒有剩餘的面包可以買,這時候可以店主剛生産出一個面包來,還沒來得及交給排在隊伍的第一個人,就被插隊者買走了。是以,這就是不公平的模式,違反了先到先得的規則。
總結
CountDownLatch 和 Semaphore這兩個類的原理其實都很好了解。看這兩個類的源碼也很快,結合AQS的原理一起了解,會對Java線程之間的同步有更深的了解。
最後,如果哪裡有寫的不對的地方,煩請指出,感激不盡!