AQS的具體實作二:
一、簡介
CountDownLatch(線程計數器)與CyclicBarrier(可重複使用的栅欄),線程計數器和循環栅欄是兩個用來進行同步的類,都是通過await方法來進行線程的阻塞,當執行線程數達到具體的數量時才會執行釋放阻塞隊列的方法,都是依賴于AbstractQueuedSynchronizer架構進行實作,存在于java.util.concurrent包下。
二、差別
1、但CountDownLatch内部Sync通過實作AbstractQueuedSynchronizer來實作然後通過state關鍵字來計算具體的線程實作數量,CyclicBarrier是通過ReentrantLock類來實作,然後通過count變量計算線程數量。
2、CyclicBarrier可重複使用,通過調用nextGeneration方法實作,CountDownLatch不能夠實作。
3、CyclicBarrier中await方法調用然後增加count的值來實作,而CountDownLatch是countDown方法實作的線程計數,增加state關鍵字的數量,不會去阻塞線程。
4、CyclicBarrier内部初始化時可以傳入runnable實作類,然後進行調用。
三、具體實作
CountDownLatch類具體實作
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//這裡是實作線程計數器的關鍵,隻有線程數達到初始化的時候才會去将挂起的線程重新運作
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//當線程占用數減至0時才會傳回成功,然後才能進行後續的線程執行操作
protected boolean tryReleaseShared(int releases) {
// 循環release知道state為0或者設定nextc成功
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
//初始化時設定state值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//等待釋放線程
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//設定等待時間的await函數
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//每次countdown将線程占用數減去一
public void countDown() {
sync.releaseShared(1);
}
//等待線程數
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
CyclicBarrier具體實作。
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
//下一個generation
private void nextGeneration() {
//釋放所有的等待線程
trip.signalAll();
//需要的線程數重新初始化
count = parties;
//新的generation
generation = new Generation();
}
//中斷目前generation
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
//等待方法具體實作
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
//這裡和後面的設定中斷狀态配合,保證同一線程隻能等待一次
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//初始化的任務
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
//若是等于false卻未調用nextGeneration,手動調用breakBarrier。保證不會重複調用
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
//加入等待隊列
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 這裡是新的genertion或者broken為true表示目前genertion結束,直接加上 //中斷标志,跟前面檢測比對進行處理
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//釋放鎖
lock.unlock();
}
}
//初始化,可以包含任務的初始化
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//僅包含最多等待線程的初始化
public CyclicBarrier(int parties) {
this(parties, null);
}
//擷取初始化的等待最大條數
public int getParties() {
return parties;
}
//線程同步等待塞方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
//帶時間的等待方法
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
//目前generation是否中斷
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
//開啟新的generation
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
//等待線程數
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}
}