天天看點

Java多線程之CountDownLatch同步器的使用(六)

3-4、CountDownLatch:同步器

在JKD1.5+環境中,Doug Lea和他的團隊為我們提供了可以很好實作這個要求的工具類:CountDownLatch和CyclicBarrier。我們首先介紹CountDownLatch的基本使用方式:

3-4-1、CountDownLatch基本使用

CountDownLatch是一個同步計數器,能夠保證在其他線程完成某一個業務操作前,目前線程一直處于等待/阻塞狀态。具體來說,這個計數器将會從給定的某一個數值count開始,通過countDown()方法的調用進行倒數。當執行某一次countDown()操作後,計數器的count數值等于0,所有調用了await()方法的線程,就解除等待/阻塞狀态繼續執行。我們來看一段簡單的示例代碼:

package test.thread.countDownLatch;

import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class TestCountDownLatch {
    /**
     * 日志
     */
    private static Log LOGGER = LogFactory.getLog(TestCountDownLatch.class);

    static {
        BasicConfigurator.configure();
    }

    public static void main(String[] args) throws Throwable {
        // 同步計數器從5開始計數
        final CountDownLatch countDownLatch = new CountDownLatch();

        // 啟動子線程,處理“其他”業務
        for(int index =  ; index <  ; index++) {
            Thread childThread = new Thread() {
                @Override
                public void run() {
                    //等待,以便模型業務處理過程消耗的時間
                    synchronized (this) {
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            TestCountDownLatch.LOGGER.error(e.getMessage(), e);
                        }
                    }

                    // 完成業務處理過程,計數器-1
                    long threadid = Thread.currentThread().getId();
                    TestCountDownLatch.LOGGER.info("子線程(" + threadid + ")執行完成!");
                    countDownLatch.countDown();
                }
            };
            childThread.start();
        }

        // 等待所有子線程的業務都處理完成(計數器的count為0時)
        countDownLatch.await();
        TestCountDownLatch.LOGGER.info("所有子線程的處理都完了,主線程繼續執行...");
    }
}
           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

以下是可能的執行結果(因為每次執行的效果可能有所差別):

[Thread-] INFO test.thread.countDownLatch.TestCountDownLatch  - 子線程()執行完成!
 [Thread-] INFO test.thread.countDownLatch.TestCountDownLatch  - 子線程()執行完成!
 [Thread-] INFO test.thread.countDownLatch.TestCountDownLatch  - 子線程()執行完成!
 [Thread-] INFO test.thread.countDownLatch.TestCountDownLatch  - 子線程()執行完成!
 [Thread-] INFO test.thread.countDownLatch.TestCountDownLatch  - 子線程()執行完成!
 [main] INFO test.thread.countDownLatch.TestCountDownLatch  - 所有子線程的處理都完了,主線程繼續執行...           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

以上代碼片段和執行結果說明了CountDownLatch的最簡單使用,CountDownLatch同步計數器從5開始計數,分别對應5個子線程的業務完成情況。每當一個子線程業務完成後,CountDownLatch同步計數器就countDown一次。直到count等于0時,這時主線程上面的await()方法解除等待/阻塞狀态,繼續執行。這裡要注意一下:

  • 不是說隻能有一次await方法的調用,而是同一時間可以有多個線程調用了await方法。隻要在count還不等于0時,某個線程調用了await方法,它都會進入等待/阻塞狀态。
  • 在調用await時,如果CountDownLatch同步計數器的count已經等于0了,則await方法不會進入等待/阻塞狀态。
  • await調用和countDown調用不是說必須處于不同線程。同一線程中,您可以先調用countDown然後再調用await進入等待/阻塞。CountDownLatch同步計數器會始終遵循上兩條工作原則。
  • 在使用CountDownLatch同步計數器時,您無需考慮髒資料的問題。CountDownLatch同步計數器是線程安全的。

3-4-2、CountDownLatch在“100米賽跑”中的應用

很明顯CountDownLatch在“100米賽跑”中的使用目标是:“等待這組所有的選手全部上跑道”,然後一起開始跑步。是以,CountDownLatch的計數器,需要在選手獲得“跑道”資源後,馬上countDown一次。之後,獲得“跑道”資源的選手要立刻調用await進入等待狀态,等待其他選手也獲得跑道資源。我們給這整個處理邏輯去一個名字叫做:“發令槍”,代碼片段如下:

......
/**
 * 選手所關注的發令槍
 */
private CountDownLatch startingGun;
......

public Result call() throws Exception {
    ......
    try {
        // 申請上跑道(這個沒有變化)
        this.runway.acquire();
        // 等待可能的發令槍
        if(this.startingGun != null) {
            // 執行到這裡,說明這個選手已經拿到了跑到資源;
            // 向發令槍表達“我已準備好”,即計數器-1
            this.startingGun.countDown();
            System.out.println("選手" + name + "[" + number + "],已登上跑道,等待發令!");
            // 接下來進入“等待”狀态
            // 以便等這個發令槍所管理的所有選手上跑道了,再一起跑步
            this.startingGun.await();
            System.out.println("選手" + name + "[" + number + "],跑!");
        }

        // 開始正式跑步
        return this.result = this.doRun();
    } catch(Exception e) {
        e.printStackTrace(System.out);
    } finally {
        // 都要進入初賽結果排序(中途退賽的成績就為0)
        this.runway.release();
        System.out.println("選手" + name + "[" + number + "],比賽正常完成!");
    }
    ......
}
......           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

那麼如何進行分組呢?我們可以讓一組選手,關注同一把“發令槍”(有多少把發令槍,就有多少個組、就有多少個CountDownLatch對象)。如下圖所示:

Java多線程之CountDownLatch同步器的使用(六)

另外,分組時一定要考慮一個問題:由于報名人數不一定是5的整數倍,是以最後一組不一定有5個人。考慮到實作的代碼片段如下:

......
// 這是發令槍
CountDownLatch startingGun = null;
......
// signupPlayers 是報名隊列
// runwayCount 是跑道數量
for (int index =  ; index < this.signupPlayers.size() ; index++) {
    /*
     * 這是發令槍,發令槍的使用規則是:
     * 1、最多5位選手聽從一把發令槍的指令(因為跑道最多就是5條)
     * 2、如果剩餘的沒有比賽的選手不足5人,則這些人聽從一把發令槍的指令
     * */
    if(index % runwayCount == ) {
        startingGun =  this.signupPlayers.size() - index > runwayCount?
            new CountDownLatch(runwayCount):
            new CountDownLatch(this.signupPlayers.size() - index);
    }

    // 擷取這個選手(signupPlayers是報名隊列)
    Player player = this.signupPlayers.get(index);
    // 設定選手關注的發令槍
    player.setStartingGun(startingGun);
    // 送出給裁判組協調執行
    Future<Result> future = refereeService.submit(player);

    // 開始一個選手的跑步動作狀态監控
    new FutureThread(future, player, this.preliminaries).start();
}
......           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

這樣我們就完成了對第一次“100米賽跑”實作代碼的優化:增加了選手分組控制功能。以下給出相對完整的代碼。注意,由于Result類沒有做任何更改,是以就不需要贅述了。

  • Player選手類的的更改,主要是加入了“關注的發令槍”的關注
public class Player implements Callable<Result> , Comparable<Player>{
    ......
    /**
     * 跑道
     */
    private Semaphore runway;

    /**
     * 選手所關注的發令槍
     */
    private CountDownLatch startingGun;
    ......

    /**
     * @param startingGun the startingGun to set
     */
    public void setStartingGun(CountDownLatch startingGun) {
        this.startingGun = startingGun;
    }

    /**
     * @return the startingGun
     */
    public CountDownLatch getStartingGun() {
        return startingGun;
    }

    ......
    /* (non-Javadoc)
     * @see java.util.concurrent.Callable#call()
     */
    @Override
    public Result call() throws Exception {
        this.result = null;
        try {
            // 申請上跑道
            this.runway.acquire();
            // 等待可能的發令槍
            if(this.startingGun != null) {
                // 執行到這裡,說明這個選手已經拿到了跑到資源;
                // 首先向發令槍表達“我已準備好”,即計數器-1
                this.startingGun.countDown();
                System.out.println("選手" + name + "[" + number + "],已登上跑道,等待發令槍!");
                // 接下來進入“等待”狀态
                // 以便這個發令槍所管理的所有選手登上跑道了,再一起跑步
                this.startingGun.await();
                System.out.println("選手" + name + "[" + number + "],跑!");
            }

            // 開始正式跑步
            return this.result = this.doRun();
        } catch(Exception e) {
            e.printStackTrace(System.out);
        } finally {
            // 都要進入初賽結果排序(中途退賽的成績就為0)
            this.runway.release();
            System.out.println("選手" + name + "[" + number + "],比賽正常完成!");
        }

        // 如果執行到這裡,說明異常發生了
        this.result = new Result(Float.MAX_VALUE);
        return this.result;
    }

    /**
     * 開始跑步(跑步的處理過程沒有變化)
     * @return
     * @throws Exception
     */
    private Result doRun()  throws Exception {
        /*
         * 為了表現一個選手每一次跑步都有不同的狀态(但是都不會低于其最低狀态),
         * 是以每一次跑步,系統都會為這個選手配置設定一個即時速度。
         * 
         * 這個即時速度不會低于其最小速度,但是也不會高于 14米/秒(否則就是‘超人’咯)
         * */
        // 生成即時速度
        float presentSpeed = f;
        presentSpeed = this.minSpeed * (f + new Random().nextFloat());
        if(presentSpeed > f) {
            presentSpeed = f;
        }

        // 計算跑步結果(BigDecimal的使用可自行查閱資料)
        BigDecimal calculation =  new BigDecimal().divide(new BigDecimal(presentSpeed) , , RoundingMode.HALF_UP);
        float presentTime = calculation.floatValue();

        // 讓線程等待presentSpeed的時間,模拟該選手跑步的過程
        synchronized (this) {
            this.wait((long)(presentTime * f));
        }

        // 傳回跑步結果
        this.result = new Result(presentTime);
        return result;
    }

    ......
}           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • TwoTrack比賽主要制類,主要加入了對“選手分組”的支援
/**
 * 這是第二個比賽程式。
 * @author yinwenjie
 *
 */
public class TwoTrack {
    ......
    public void track() {
        /*
         * 賽跑分為以下幾個階段進行;
         * 
         * 1、報名
         * 2、初賽,11名選手,分成兩組,每組最多5名選手。
         * 因為場地隻有5條賽道,隻有拿到進場許可的才能使用賽道,進行比賽
         * 
         * 3、決賽:初賽結果将被寫入到一個隊列中進行排序,隻有成績最好的前五名選手,可以參加決賽。
         * 
         * 4、決賽結果的前三名将分别作為冠亞季軍被公布出來
         * */

        //1、================報名
        // 這就是跑道,需求上說了隻有5條跑道,是以隻有5個permits。
        Semaphore runway = new Semaphore();
        this.signupPlayers.clear();
        for(int index =  ; index < TwoTrack.PLAYERNAMES.length ; ) {
            Player player = new Player(TwoTrack.PLAYERNAMES[index], ++index , runway);
            this.signupPlayers.add(player);
        }

        //2、================進行初賽
        // 這是賽道的數量
        int runwayCount = ;
        // 這是裁判
        ExecutorService refereeService = Executors.newFixedThreadPool();
        // 這是發令槍
        CountDownLatch startingGun = null;
        for (int index =  ; index < this.signupPlayers.size() ; index++) {
            /*
             * 這是發令槍,發令槍的使用規則是:
             * 1、最多5位選手聽從一把發令槍的指令(因為跑道最多就是5條)
             * 2、如果剩餘的沒有比賽的選手不足5人,則這些人聽從一把發令槍的指令
             * */
            if(index % runwayCount == ) {
                startingGun =  this.signupPlayers.size() - index > runwayCount?
                    new CountDownLatch(runwayCount):
                    new CountDownLatch(this.signupPlayers.size() - index);
            }

            // 擷取這個選手
            Player player = this.signupPlayers.get(index);
            // 設定選手關注的發令槍
            player.setStartingGun(startingGun);
            // 送出給裁判組準備執行
            Future<Result> future = refereeService.submit(player);

            // 開始一個選手的跑步動作狀态監控
            new FutureThread(future, player, this.preliminaries).start();
        }
        //! 隻有當PLAYERNAMES.length位選手的成績都産生了,才能進入決賽,這很重要
        while(this.preliminaries.size() < TwoTrack.PLAYERNAMES.length) {
            try {
                synchronized (this.preliminaries) {
                    this.preliminaries.wait();
                }
            } catch(InterruptedException e) {
                e.printStackTrace(System.out);
            }
        }

        // 3、============決賽(隻有初賽結果的前5名可以參見)
        // 決賽的發令槍
        startingGun = new CountDownLatch();
        for(int index =  ; index <  ; index++) {
            Player player = this.preliminaries.poll();
            // 重新設定選手關注的發令槍
            player.setStartingGun(startingGun);
            // 送出給裁判組準備執行
            Future<Result> future = refereeService.submit(player);

            // 開始一個選手的跑步動作狀态監控
            new FutureThread(future, player, this.finals).start();
        }
        //! 隻有當5位選手的決賽成績都産生了,才能到下一步:公布成績
        while(this.finals.size() < ) {
            try {
                synchronized (this.finals) {
                    this.finals.wait();
                }
            } catch(InterruptedException e) {
                e.printStackTrace(System.out);
            }
        }

        // 4、============公布決賽成績(前三名)
        for(int index =  ; index <  ; index++) {
            Player player = this.finals.poll();
            switch (index) {
            case :
                System.out.println("第一名:"  + player.getName() + "[" + player.getNumber() + "],成績:" + player.getResult().getTime() + "秒");
                break;
            case :
                System.out.println("第二名:"  + player.getName() + "[" + player.getNumber() + "],成績:" + player.getResult().getTime() + "秒");
                break;
            case :
                System.out.println("第三名:"  + player.getName() + "[" + player.getNumber() + "],成績:" + player.getResult().getTime() + "秒");
                break;
            default:
                break;
            }
        }
    }

    ......
    //其他諸如FutureThread、main函數的代碼都沒有變化
}           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115

3-4-3、相似工具類CyclicBarrier

在JDK1.5+中還有一個和CountDownLatch類似的同步計數工具:CyclicBarrier。不同的是CyclicBarrier的計數是循環進行的,而且也不需要向CountDownLatch那樣顯示的調用countDown進行減一操作。

如何了解CyclicBarrier計數器的循環工作方式呢?我們先來看看一個比較簡單的示例代碼:

public class TestCyclicBarrier {

    static {
        BasicConfigurator.configure();
    }

    /**
     * 日志
     */
    private static Log LOGGER = LogFactory.getLog(TestCyclicBarrier.class);

    public static void main(String[] args) throws Throwable {
        // 同步計數器的技術周期為3
        final CyclicBarrier cyclicBarrier = new CyclicBarrier();

        // 啟動子線程,處理“其他”業務
        for(int index =  ; index <  ; index++) {
            Thread childThread = new Thread() {
                @Override
                public void run() {
                    // 可獲得設定的屏障數值
                    // int parties = cyclicBarrier.getParties();
                    // 可擷取目前已經進入等待狀态的任務數量
                    // int numberWaiting = cyclicBarrier.getNumberWaiting();
                    TestCyclicBarrier.LOGGER.info("本線程已準備好處理業務......");
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        TestCyclicBarrier.LOGGER.error(e.getMessage() , e);
                    } 
                    TestCyclicBarrier.LOGGER.info("開始處理業務......");
                }
            };
            childThread.start();
        }
    }
}           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

我們可以用下圖表示以上代碼的工作過程:

Java多線程之CountDownLatch同步器的使用(六)

在上圖中,CyclicBarrier的parties屏障設定為3,其意義是隻要有通過CyclicBarrier的await方法進入阻塞等待的線程數量達到了3,則CyclicBarrier就解除這些線程的阻塞狀态讓他們可以繼續執行。是以可以了解為CyclicBarrier的計數功能是可重複使用的,當等待的線程數量達到了設定的屏障值就放行這些線程。

3-4-4、使用CyclicBarrier改寫“比賽”

下面我們将“比賽”中使用CountDownLatch實作的發令槍改寫成使用CyclicBarrier來實作。改寫發令槍不會使發令槍的工作職責發生任何變化,是以改寫量是比較小的。另外由于這個小節中我們已經給出了很多代碼了,為了節約篇幅這裡隻給出最小化的代碼片段。

  • Player選手類中,關于發令槍的定義要修改:
......
/**
 * 選手所關注的發令槍
 */
private CyclicBarrier startingGun;

/**
 * @param startingGun the startingGun to set
 */
public void setStartingGun(CyclicBarrier startingGun) {
    this.startingGun = startingGun;
}

/**
 * @return the startingGun
 */
public CyclicBarrier getStartingGun() {
    return startingGun;
}
......           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • Player選手類中的發令槍使用部分需要改寫。使用CyclicBarrier後就不需要顯示調用countDown()方法了:
......
// 申請上跑道
this.runway.acquire();
// 等待可能的發令槍
if(this.startingGun != null) {
    // 執行到這裡,說明這個選手已經拿到了跑到資源;
    System.out.println("選手" + name + "[" + number + "],已登上跑道,等待發令槍!");
    // 接下來進入“等待”狀态
    // 以便這個發令槍所管理的所有選手登上跑道了,再一起跑步
    this.startingGun.await();
    System.out.println("選手" + name + "[" + number + "],跑!");
}

// 開始正式跑步
return this.result = this.doRun();
......           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • TwoTrack主操作類中,關于發令槍的定義要進行變更:從CountDownLatch變成CyclicBarrier:
......
// 這是發令槍
CyclicBarrier startingGun = null;
......           
  • 1
  • 2
  • 3
  • 4
  • TwoTrack主操作類中,根據條件決定CyclicBarrier中parties屏障值的代碼業務要進行調整。從之前确定CountDownLatch計數初值變化而來:
......
if(index % runwayCount == 0) {
    startingGun =  this.signupPlayers.size() - index > runwayCount?
        new CyclicBarrier(runwayCount):
        new CyclicBarrier(this.signupPlayers.size() - index);
}
......           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

這裡我們就不再贅述代碼的工作效果了,因為工作效果不會有任何變化。

來源:http://blog.csdn.net/yinwenjie(未經允許嚴禁用于商業用途!)

繼續閱讀