3-4、CountDownLatch:同步器
在JKD1.5+環境中,Doug Lea和他的團隊為我們提供了可以很好實作這個要求的工具類:CountDownLatch和CyclicBarrier。我們首先介紹CountDownLatch的基本使用方式:
3-4-1、CountDownLatch基本使用
CountDownLatch是一個同步計數器,能夠保證在其他線程完成某一個業務操作前,目前線程一直處于等待/阻塞狀态。具體來說,這個計數器将會從給定的某一個數值count開始,通過countDown()方法的調用進行倒數。當執行某一次countDown()操作後,計數器的count數值等于0,所有調用了await()方法的線程,就解除等待/阻塞狀态繼續執行。我們來看一段簡單的示例代碼:
package test.thread.countDownLatch;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
public class TestCountDownLatch {
/**
* 日志
*/
private static Log LOGGER = LogFactory.getLog(TestCountDownLatch.class);
static {
BasicConfigurator.configure();
}
public static void main(String[] args) throws Throwable {
// 同步計數器從5開始計數
final CountDownLatch countDownLatch = new CountDownLatch();
// 啟動子線程,處理“其他”業務
for(int index = ; index < ; index++) {
Thread childThread = new Thread() {
@Override
public void run() {
//等待,以便模型業務處理過程消耗的時間
synchronized (this) {
try {
this.wait();
} catch (InterruptedException e) {
TestCountDownLatch.LOGGER.error(e.getMessage(), e);
}
}
// 完成業務處理過程,計數器-1
long threadid = Thread.currentThread().getId();
TestCountDownLatch.LOGGER.info("子線程(" + threadid + ")執行完成!");
countDownLatch.countDown();
}
};
childThread.start();
}
// 等待所有子線程的業務都處理完成(計數器的count為0時)
countDownLatch.await();
TestCountDownLatch.LOGGER.info("所有子線程的處理都完了,主線程繼續執行...");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
以下是可能的執行結果(因為每次執行的效果可能有所差別):
[Thread-] INFO test.thread.countDownLatch.TestCountDownLatch - 子線程()執行完成!
[Thread-] INFO test.thread.countDownLatch.TestCountDownLatch - 子線程()執行完成!
[Thread-] INFO test.thread.countDownLatch.TestCountDownLatch - 子線程()執行完成!
[Thread-] INFO test.thread.countDownLatch.TestCountDownLatch - 子線程()執行完成!
[Thread-] INFO test.thread.countDownLatch.TestCountDownLatch - 子線程()執行完成!
[main] INFO test.thread.countDownLatch.TestCountDownLatch - 所有子線程的處理都完了,主線程繼續執行...
- 1
- 2
- 3
- 4
- 5
- 6
以上代碼片段和執行結果說明了CountDownLatch的最簡單使用,CountDownLatch同步計數器從5開始計數,分别對應5個子線程的業務完成情況。每當一個子線程業務完成後,CountDownLatch同步計數器就countDown一次。直到count等于0時,這時主線程上面的await()方法解除等待/阻塞狀态,繼續執行。這裡要注意一下:
- 不是說隻能有一次await方法的調用,而是同一時間可以有多個線程調用了await方法。隻要在count還不等于0時,某個線程調用了await方法,它都會進入等待/阻塞狀态。
- 在調用await時,如果CountDownLatch同步計數器的count已經等于0了,則await方法不會進入等待/阻塞狀态。
- await調用和countDown調用不是說必須處于不同線程。同一線程中,您可以先調用countDown然後再調用await進入等待/阻塞。CountDownLatch同步計數器會始終遵循上兩條工作原則。
- 在使用CountDownLatch同步計數器時,您無需考慮髒資料的問題。CountDownLatch同步計數器是線程安全的。
3-4-2、CountDownLatch在“100米賽跑”中的應用
很明顯CountDownLatch在“100米賽跑”中的使用目标是:“等待這組所有的選手全部上跑道”,然後一起開始跑步。是以,CountDownLatch的計數器,需要在選手獲得“跑道”資源後,馬上countDown一次。之後,獲得“跑道”資源的選手要立刻調用await進入等待狀态,等待其他選手也獲得跑道資源。我們給這整個處理邏輯去一個名字叫做:“發令槍”,代碼片段如下:
......
/**
* 選手所關注的發令槍
*/
private CountDownLatch startingGun;
......
public Result call() throws Exception {
......
try {
// 申請上跑道(這個沒有變化)
this.runway.acquire();
// 等待可能的發令槍
if(this.startingGun != null) {
// 執行到這裡,說明這個選手已經拿到了跑到資源;
// 向發令槍表達“我已準備好”,即計數器-1
this.startingGun.countDown();
System.out.println("選手" + name + "[" + number + "],已登上跑道,等待發令!");
// 接下來進入“等待”狀态
// 以便等這個發令槍所管理的所有選手上跑道了,再一起跑步
this.startingGun.await();
System.out.println("選手" + name + "[" + number + "],跑!");
}
// 開始正式跑步
return this.result = this.doRun();
} catch(Exception e) {
e.printStackTrace(System.out);
} finally {
// 都要進入初賽結果排序(中途退賽的成績就為0)
this.runway.release();
System.out.println("選手" + name + "[" + number + "],比賽正常完成!");
}
......
}
......
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
那麼如何進行分組呢?我們可以讓一組選手,關注同一把“發令槍”(有多少把發令槍,就有多少個組、就有多少個CountDownLatch對象)。如下圖所示:
另外,分組時一定要考慮一個問題:由于報名人數不一定是5的整數倍,是以最後一組不一定有5個人。考慮到實作的代碼片段如下:
......
// 這是發令槍
CountDownLatch startingGun = null;
......
// signupPlayers 是報名隊列
// runwayCount 是跑道數量
for (int index = ; index < this.signupPlayers.size() ; index++) {
/*
* 這是發令槍,發令槍的使用規則是:
* 1、最多5位選手聽從一把發令槍的指令(因為跑道最多就是5條)
* 2、如果剩餘的沒有比賽的選手不足5人,則這些人聽從一把發令槍的指令
* */
if(index % runwayCount == ) {
startingGun = this.signupPlayers.size() - index > runwayCount?
new CountDownLatch(runwayCount):
new CountDownLatch(this.signupPlayers.size() - index);
}
// 擷取這個選手(signupPlayers是報名隊列)
Player player = this.signupPlayers.get(index);
// 設定選手關注的發令槍
player.setStartingGun(startingGun);
// 送出給裁判組協調執行
Future<Result> future = refereeService.submit(player);
// 開始一個選手的跑步動作狀态監控
new FutureThread(future, player, this.preliminaries).start();
}
......
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
這樣我們就完成了對第一次“100米賽跑”實作代碼的優化:增加了選手分組控制功能。以下給出相對完整的代碼。注意,由于Result類沒有做任何更改,是以就不需要贅述了。
- Player選手類的的更改,主要是加入了“關注的發令槍”的關注
public class Player implements Callable<Result> , Comparable<Player>{
......
/**
* 跑道
*/
private Semaphore runway;
/**
* 選手所關注的發令槍
*/
private CountDownLatch startingGun;
......
/**
* @param startingGun the startingGun to set
*/
public void setStartingGun(CountDownLatch startingGun) {
this.startingGun = startingGun;
}
/**
* @return the startingGun
*/
public CountDownLatch getStartingGun() {
return startingGun;
}
......
/* (non-Javadoc)
* @see java.util.concurrent.Callable#call()
*/
@Override
public Result call() throws Exception {
this.result = null;
try {
// 申請上跑道
this.runway.acquire();
// 等待可能的發令槍
if(this.startingGun != null) {
// 執行到這裡,說明這個選手已經拿到了跑到資源;
// 首先向發令槍表達“我已準備好”,即計數器-1
this.startingGun.countDown();
System.out.println("選手" + name + "[" + number + "],已登上跑道,等待發令槍!");
// 接下來進入“等待”狀态
// 以便這個發令槍所管理的所有選手登上跑道了,再一起跑步
this.startingGun.await();
System.out.println("選手" + name + "[" + number + "],跑!");
}
// 開始正式跑步
return this.result = this.doRun();
} catch(Exception e) {
e.printStackTrace(System.out);
} finally {
// 都要進入初賽結果排序(中途退賽的成績就為0)
this.runway.release();
System.out.println("選手" + name + "[" + number + "],比賽正常完成!");
}
// 如果執行到這裡,說明異常發生了
this.result = new Result(Float.MAX_VALUE);
return this.result;
}
/**
* 開始跑步(跑步的處理過程沒有變化)
* @return
* @throws Exception
*/
private Result doRun() throws Exception {
/*
* 為了表現一個選手每一次跑步都有不同的狀态(但是都不會低于其最低狀态),
* 是以每一次跑步,系統都會為這個選手配置設定一個即時速度。
*
* 這個即時速度不會低于其最小速度,但是也不會高于 14米/秒(否則就是‘超人’咯)
* */
// 生成即時速度
float presentSpeed = f;
presentSpeed = this.minSpeed * (f + new Random().nextFloat());
if(presentSpeed > f) {
presentSpeed = f;
}
// 計算跑步結果(BigDecimal的使用可自行查閱資料)
BigDecimal calculation = new BigDecimal().divide(new BigDecimal(presentSpeed) , , RoundingMode.HALF_UP);
float presentTime = calculation.floatValue();
// 讓線程等待presentSpeed的時間,模拟該選手跑步的過程
synchronized (this) {
this.wait((long)(presentTime * f));
}
// 傳回跑步結果
this.result = new Result(presentTime);
return result;
}
......
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- TwoTrack比賽主要制類,主要加入了對“選手分組”的支援
/**
* 這是第二個比賽程式。
* @author yinwenjie
*
*/
public class TwoTrack {
......
public void track() {
/*
* 賽跑分為以下幾個階段進行;
*
* 1、報名
* 2、初賽,11名選手,分成兩組,每組最多5名選手。
* 因為場地隻有5條賽道,隻有拿到進場許可的才能使用賽道,進行比賽
*
* 3、決賽:初賽結果将被寫入到一個隊列中進行排序,隻有成績最好的前五名選手,可以參加決賽。
*
* 4、決賽結果的前三名将分别作為冠亞季軍被公布出來
* */
//1、================報名
// 這就是跑道,需求上說了隻有5條跑道,是以隻有5個permits。
Semaphore runway = new Semaphore();
this.signupPlayers.clear();
for(int index = ; index < TwoTrack.PLAYERNAMES.length ; ) {
Player player = new Player(TwoTrack.PLAYERNAMES[index], ++index , runway);
this.signupPlayers.add(player);
}
//2、================進行初賽
// 這是賽道的數量
int runwayCount = ;
// 這是裁判
ExecutorService refereeService = Executors.newFixedThreadPool();
// 這是發令槍
CountDownLatch startingGun = null;
for (int index = ; index < this.signupPlayers.size() ; index++) {
/*
* 這是發令槍,發令槍的使用規則是:
* 1、最多5位選手聽從一把發令槍的指令(因為跑道最多就是5條)
* 2、如果剩餘的沒有比賽的選手不足5人,則這些人聽從一把發令槍的指令
* */
if(index % runwayCount == ) {
startingGun = this.signupPlayers.size() - index > runwayCount?
new CountDownLatch(runwayCount):
new CountDownLatch(this.signupPlayers.size() - index);
}
// 擷取這個選手
Player player = this.signupPlayers.get(index);
// 設定選手關注的發令槍
player.setStartingGun(startingGun);
// 送出給裁判組準備執行
Future<Result> future = refereeService.submit(player);
// 開始一個選手的跑步動作狀态監控
new FutureThread(future, player, this.preliminaries).start();
}
//! 隻有當PLAYERNAMES.length位選手的成績都産生了,才能進入決賽,這很重要
while(this.preliminaries.size() < TwoTrack.PLAYERNAMES.length) {
try {
synchronized (this.preliminaries) {
this.preliminaries.wait();
}
} catch(InterruptedException e) {
e.printStackTrace(System.out);
}
}
// 3、============決賽(隻有初賽結果的前5名可以參見)
// 決賽的發令槍
startingGun = new CountDownLatch();
for(int index = ; index < ; index++) {
Player player = this.preliminaries.poll();
// 重新設定選手關注的發令槍
player.setStartingGun(startingGun);
// 送出給裁判組準備執行
Future<Result> future = refereeService.submit(player);
// 開始一個選手的跑步動作狀态監控
new FutureThread(future, player, this.finals).start();
}
//! 隻有當5位選手的決賽成績都産生了,才能到下一步:公布成績
while(this.finals.size() < ) {
try {
synchronized (this.finals) {
this.finals.wait();
}
} catch(InterruptedException e) {
e.printStackTrace(System.out);
}
}
// 4、============公布決賽成績(前三名)
for(int index = ; index < ; index++) {
Player player = this.finals.poll();
switch (index) {
case :
System.out.println("第一名:" + player.getName() + "[" + player.getNumber() + "],成績:" + player.getResult().getTime() + "秒");
break;
case :
System.out.println("第二名:" + player.getName() + "[" + player.getNumber() + "],成績:" + player.getResult().getTime() + "秒");
break;
case :
System.out.println("第三名:" + player.getName() + "[" + player.getNumber() + "],成績:" + player.getResult().getTime() + "秒");
break;
default:
break;
}
}
}
......
//其他諸如FutureThread、main函數的代碼都沒有變化
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
3-4-3、相似工具類CyclicBarrier
在JDK1.5+中還有一個和CountDownLatch類似的同步計數工具:CyclicBarrier。不同的是CyclicBarrier的計數是循環進行的,而且也不需要向CountDownLatch那樣顯示的調用countDown進行減一操作。
如何了解CyclicBarrier計數器的循環工作方式呢?我們先來看看一個比較簡單的示例代碼:
public class TestCyclicBarrier {
static {
BasicConfigurator.configure();
}
/**
* 日志
*/
private static Log LOGGER = LogFactory.getLog(TestCyclicBarrier.class);
public static void main(String[] args) throws Throwable {
// 同步計數器的技術周期為3
final CyclicBarrier cyclicBarrier = new CyclicBarrier();
// 啟動子線程,處理“其他”業務
for(int index = ; index < ; index++) {
Thread childThread = new Thread() {
@Override
public void run() {
// 可獲得設定的屏障數值
// int parties = cyclicBarrier.getParties();
// 可擷取目前已經進入等待狀态的任務數量
// int numberWaiting = cyclicBarrier.getNumberWaiting();
TestCyclicBarrier.LOGGER.info("本線程已準備好處理業務......");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
TestCyclicBarrier.LOGGER.error(e.getMessage() , e);
}
TestCyclicBarrier.LOGGER.info("開始處理業務......");
}
};
childThread.start();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
我們可以用下圖表示以上代碼的工作過程:
在上圖中,CyclicBarrier的parties屏障設定為3,其意義是隻要有通過CyclicBarrier的await方法進入阻塞等待的線程數量達到了3,則CyclicBarrier就解除這些線程的阻塞狀态讓他們可以繼續執行。是以可以了解為CyclicBarrier的計數功能是可重複使用的,當等待的線程數量達到了設定的屏障值就放行這些線程。
3-4-4、使用CyclicBarrier改寫“比賽”
下面我們将“比賽”中使用CountDownLatch實作的發令槍改寫成使用CyclicBarrier來實作。改寫發令槍不會使發令槍的工作職責發生任何變化,是以改寫量是比較小的。另外由于這個小節中我們已經給出了很多代碼了,為了節約篇幅這裡隻給出最小化的代碼片段。
- Player選手類中,關于發令槍的定義要修改:
......
/**
* 選手所關注的發令槍
*/
private CyclicBarrier startingGun;
/**
* @param startingGun the startingGun to set
*/
public void setStartingGun(CyclicBarrier startingGun) {
this.startingGun = startingGun;
}
/**
* @return the startingGun
*/
public CyclicBarrier getStartingGun() {
return startingGun;
}
......
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- Player選手類中的發令槍使用部分需要改寫。使用CyclicBarrier後就不需要顯示調用countDown()方法了:
......
// 申請上跑道
this.runway.acquire();
// 等待可能的發令槍
if(this.startingGun != null) {
// 執行到這裡,說明這個選手已經拿到了跑到資源;
System.out.println("選手" + name + "[" + number + "],已登上跑道,等待發令槍!");
// 接下來進入“等待”狀态
// 以便這個發令槍所管理的所有選手登上跑道了,再一起跑步
this.startingGun.await();
System.out.println("選手" + name + "[" + number + "],跑!");
}
// 開始正式跑步
return this.result = this.doRun();
......
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- TwoTrack主操作類中,關于發令槍的定義要進行變更:從CountDownLatch變成CyclicBarrier:
......
// 這是發令槍
CyclicBarrier startingGun = null;
......
- 1
- 2
- 3
- 4
- TwoTrack主操作類中,根據條件決定CyclicBarrier中parties屏障值的代碼業務要進行調整。從之前确定CountDownLatch計數初值變化而來:
......
if(index % runwayCount == 0) {
startingGun = this.signupPlayers.size() - index > runwayCount?
new CyclicBarrier(runwayCount):
new CyclicBarrier(this.signupPlayers.size() - index);
}
......
- 1
- 2
- 3
- 4
- 5
- 6
- 7
這裡我們就不再贅述代碼的工作效果了,因為工作效果不會有任何變化。
來源:http://blog.csdn.net/yinwenjie(未經允許嚴禁用于商業用途!)