天天看點

多線程程式設計學習九(并發工具類).

CountDownLatch

  1. CountDownLatch 允許一個或多個線程等待其他線程完成操作。
  2. CountDownLatch 可以替代 join 的作用,并提供了更豐富的用法。
  3. CountDownLatch 的 countDown 方法,N 會減1;CountDownLatch 的 await 方法會阻塞目前線程,直到 N 變成零。
  4. CountDownLatch 不可能重新初始化或者修改 CountDownLatch 對象的内部計數器的值。
  5. CountDownLatch 内部由 AQS 共享鎖實作。
public class CountDownLatchTest {

    private static final CountDownLatch DOWN_LATCH = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            System.out.println(1);
            DOWN_LATCH.countDown();
            System.out.println(2);
            DOWN_LATCH.countDown();

        }).start();
        DOWN_LATCH.await();
        System.out.println("3");
    }
}
           

CyclicBarrier

  1. CyclicBarrier 設定一個屏障(也可以叫同步點),攔截阻塞一組線程,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運作。
  2. CyclicBarrier 預設的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴 CyclicBarrier 我已經到達了屏障,然後目前線程被阻塞。
  3. CyclicBarrier 還提供一個更進階的構造函數 CyclicBarrier(int parties,Runnable barrierAction),用于線上程到達屏障時,優先執行 barrierAction,友善處理更複雜的業務場景。
  4. getNumberWaiting 方法可以獲得 CyclicBarrier 阻塞的線程數量;isBroken()方法用來了解阻塞的線程是否被中斷。
  5. CyclicBarrier 的計數器可以使用 reset() 方法重置(CountDownLatch 的計數器隻能使用一次)。是以 CyclicBarrier 能處理更為複雜的業務場景。例如,如果計算發生錯誤,可以重置計數器,并讓線程重新執行一次。
  6. CyclicBarrier 可以用于多線程計算資料,最後合并計算結果的場景。
  7. CyclicBarrier 内部采用重入鎖 ReentrantLock 實作。
public class BankWaterService implements Runnable {
 
    // 建立4個屏障,處理完之後執行目前類的run方法
    private CyclicBarrier barrier = new CyclicBarrier(4, this);
    // 假設有4個計算任務,是以隻啟動4個線程
    private Executor executor = Executors.newFixedThreadPool(4);
    // 儲存每個任務的計算結果
    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    private AtomicInteger atomicInteger = new AtomicInteger(1);

    private void count() {
        for (int i = 0; i < 4; i++) {
            Thread thread = new Thread(() -> {
                // 目前任務的計算結果,計算過程忽略
                sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                // 計算完成,插入一個屏障
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

            }, "線程" + atomicInteger.getAndIncrement());
            executor.execute(thread);
        }
    }

    @Override
    public void run() {
        int result = 0;
        // 彙總每個任務計算出的結果
        for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result += sheet.getValue();
        }
        //将結果輸出
        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        BankWaterService bankWaterCount = new BankWaterService();
        bankWaterCount.count();
    }
}
           

Semaphore

  1. Semaphore(信号量)是用來控制同時通路特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。
  2. Semaphore 可以用于做流量控制,特别是公用資源有限的應用場景,比如資料庫連接配接。
  3. Semaphore的構造方法 Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。
  4. 首先線程使用 Semaphore 的 acquire() 方法擷取一個許可證,使用完之後調用 release() 方法歸還許可證。還可以用 tryAcquire() 方法嘗試擷取許可證。
  5. intavailablePermits():傳回此信号量中目前可用的許可證數。
  6. intgetQueueLength():傳回正在等待擷取許可證的線程數。
  7. hasQueuedThreads():是否有線程正在等待擷取許可證。
  8. Semaphore 内部使用 AQS 共享鎖實作。
public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;
    private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore SEMAPHORE = new Semaphore(10);
    private static AtomicInteger ATOMICINTEGER = new AtomicInteger(1);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            EXECUTOR.execute(() -> {
                try {
                    SEMAPHORE.acquire();
                    System.out.println("save data" + ATOMICINTEGER.getAndIncrement());
                    SEMAPHORE.release();
                } catch (InterruptedException e) {
                }

            });
        }
        EXECUTOR.shutdown();
    }
}
           

Exchanger

  1. Exchanger(交換者)是一個用于線程間協作的工具類 —— 用于線程間的資料交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的資料。這兩個線程通過 exchange 方法交換資料,如果第一個線程先執行 exchange() 方法,它會一直等待第二個線程也執行 exchange 方法。
  2. 可簡單地将 Exchanger 對象了解為一個包含兩個格子的容器,通過 exchanger 方法可以向兩個格子中填充資訊。當兩個格子中的均被填充時,該對象會自動将兩個格子的資訊交換,然後傳回給線程,進而實作兩個線程的資訊交換。
  3. Exchanger 可用于遺傳算法。(遺傳算法:需要選出兩個人作為交配對象,這時候會交換兩人的資料,并使用交叉規則得出交配結果)
  4. Exchanger 可用于校對工作,比如一份資料需要兩個人同時進行校對,都校對無誤後,才能進行後續處理。這時,就可以使用 Exchanger 比較兩份校對結果。
  5. Exchanger 内部采用無鎖 CAS 實作,Exchange 使用了内部對象 Node 的兩個屬性 — item 、match,分布存儲兩個線程的值。
public class ExchangerTest {

    private static final Exchanger<String> exchange = new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(() -> {
            try {
                String result = exchange.exchange("資料A");
                System.out.println("A的exchange結果:" + result);
            } catch (InterruptedException e) {
            }

        });
        threadPool.execute(() -> {
            try {
                String result = exchange.exchange("資料B");
                System.out.println("B的exchange結果:" + result);
            } catch (InterruptedException e) {
            }
        });
        threadPool.shutdown();
    }
}
           

Phaser

多階段栅欄,可以在初始時設定參與線程數,也可以中途注冊/登出參與者,當到達的參與者數量滿足栅欄設定的數量後,會進行階段更新(advance)。以下是 Phaser 的一些基礎概念。

  1. phase(階段):在任意時間點,Phaser 隻處于某一個 phase(階段),初始階段為 0,最大達到 Integerr.MAX_VALUE,然後再次歸零。當所有 parties 參與者都到達後,phase 值會遞增。
  2. parties(參與者):指參與線程。Phaser 既可以在初始構造時指定參與者的數量,也可以中途通過register、bulkRegister、arriveAndDeregister等方法注冊/登出參與者。
  3. arrive(到達):Phaser 注冊完 parties(參與者)之後,參與者的初始狀态是 unarrived 的,當參與者到達(arrive)目前階段(phase)後,狀态就會變成 arrived。
  4. advance(進階):當階段的到達參與者數滿足條件後(注冊的數量等于到達的數量),階段就會發生進階(advance)—— 也就是phase值+1。
  5. termination(終止):代表目前 Phaser 對象達到終止狀态。
  6. Tiering(分層):一種樹形結構,通過構造函數可以指定目前待構造的 Phaser對象的父結點。

    6.1 之是以引入 Tiering,是因為當一個 Phaser 有大量參與者(parties)的時候,内部的同步操作會使性能急劇下降,而分層可以降低競争,進而減小因同步導緻的額外開銷。

    6.2 在一個分層 Phasers 的樹結構中,注冊和撤銷子Phaser或父Phaser是自動被管理的。當一個Phaser的參與者(parties)數量變成 0 時,如果有該 Phaser 有父結點,就會将它從父結點中溢移除。

public class PhaserTest1 implements Runnable {

    private final Phaser phaser;

    public PhaserTest1(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ": 開始執行任務,目前phase =" + phaser.getPhase() + "");
        // 1.等待其他參與者線程到達
        // 2.不響應中斷,也就是說即使目前線程被中斷,arriveAndAwaitAdvance方法也不會傳回或抛出異常,而是繼續等待。如果希望能夠響應中斷,可以參考 awaitAdvanceInterruptibly 方法。
        int advance = phaser.arriveAndAwaitAdvance();
        System.out.println(Thread.currentThread().getName() + ": 結束執行任務,目前phase =" + advance + "");
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser();
        for (int i = 0; i < 10; i++) {
            // 新增參與者數量
            phaser.register();
            new Thread(new PhaserTest1(phaser), "Thread-" + i).start();
        }
    }
}