CountDownLatch
- CountDownLatch 允許一個或多個線程等待其他線程完成操作。
- CountDownLatch 可以替代 join 的作用,并提供了更豐富的用法。
- CountDownLatch 的 countDown 方法,N 會減1;CountDownLatch 的 await 方法會阻塞目前線程,直到 N 變成零。
- CountDownLatch 不可能重新初始化或者修改 CountDownLatch 對象的内部計數器的值。
- CountDownLatch 内部由 AQS 共享鎖實作。
public class CountDownLatchTest {
private static final CountDownLatch DOWN_LATCH = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println(1);
DOWN_LATCH.countDown();
System.out.println(2);
DOWN_LATCH.countDown();
}).start();
DOWN_LATCH.await();
System.out.println("3");
}
}
CyclicBarrier
- CyclicBarrier 設定一個屏障(也可以叫同步點),攔截阻塞一組線程,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運作。
- CyclicBarrier 預設的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴 CyclicBarrier 我已經到達了屏障,然後目前線程被阻塞。
- CyclicBarrier 還提供一個更進階的構造函數 CyclicBarrier(int parties,Runnable barrierAction),用于線上程到達屏障時,優先執行 barrierAction,友善處理更複雜的業務場景。
- getNumberWaiting 方法可以獲得 CyclicBarrier 阻塞的線程數量;isBroken()方法用來了解阻塞的線程是否被中斷。
- CyclicBarrier 的計數器可以使用 reset() 方法重置(CountDownLatch 的計數器隻能使用一次)。是以 CyclicBarrier 能處理更為複雜的業務場景。例如,如果計算發生錯誤,可以重置計數器,并讓線程重新執行一次。
- CyclicBarrier 可以用于多線程計算資料,最後合并計算結果的場景。
- CyclicBarrier 内部采用重入鎖 ReentrantLock 實作。
public class BankWaterService implements Runnable {
// 建立4個屏障,處理完之後執行目前類的run方法
private CyclicBarrier barrier = new CyclicBarrier(4, this);
// 假設有4個計算任務,是以隻啟動4個線程
private Executor executor = Executors.newFixedThreadPool(4);
// 儲存每個任務的計算結果
private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
private AtomicInteger atomicInteger = new AtomicInteger(1);
private void count() {
for (int i = 0; i < 4; i++) {
Thread thread = new Thread(() -> {
// 目前任務的計算結果,計算過程忽略
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
// 計算完成,插入一個屏障
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "線程" + atomicInteger.getAndIncrement());
executor.execute(thread);
}
}
@Override
public void run() {
int result = 0;
// 彙總每個任務計算出的結果
for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
result += sheet.getValue();
}
//将結果輸出
sheetBankWaterCount.put("result", result);
System.out.println(result);
}
public static void main(String[] args) {
BankWaterService bankWaterCount = new BankWaterService();
bankWaterCount.count();
}
}
Semaphore
- Semaphore(信号量)是用來控制同時通路特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。
- Semaphore 可以用于做流量控制,特别是公用資源有限的應用場景,比如資料庫連接配接。
- Semaphore的構造方法 Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。
- 首先線程使用 Semaphore 的 acquire() 方法擷取一個許可證,使用完之後調用 release() 方法歸還許可證。還可以用 tryAcquire() 方法嘗試擷取許可證。
- intavailablePermits():傳回此信号量中目前可用的許可證數。
- intgetQueueLength():傳回正在等待擷取許可證的線程數。
- hasQueuedThreads():是否有線程正在等待擷取許可證。
- Semaphore 内部使用 AQS 共享鎖實作。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore SEMAPHORE = new Semaphore(10);
private static AtomicInteger ATOMICINTEGER = new AtomicInteger(1);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
EXECUTOR.execute(() -> {
try {
SEMAPHORE.acquire();
System.out.println("save data" + ATOMICINTEGER.getAndIncrement());
SEMAPHORE.release();
} catch (InterruptedException e) {
}
});
}
EXECUTOR.shutdown();
}
}
Exchanger
- Exchanger(交換者)是一個用于線程間協作的工具類 —— 用于線程間的資料交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的資料。這兩個線程通過 exchange 方法交換資料,如果第一個線程先執行 exchange() 方法,它會一直等待第二個線程也執行 exchange 方法。
- 可簡單地将 Exchanger 對象了解為一個包含兩個格子的容器,通過 exchanger 方法可以向兩個格子中填充資訊。當兩個格子中的均被填充時,該對象會自動将兩個格子的資訊交換,然後傳回給線程,進而實作兩個線程的資訊交換。
- Exchanger 可用于遺傳算法。(遺傳算法:需要選出兩個人作為交配對象,這時候會交換兩人的資料,并使用交叉規則得出交配結果)
- Exchanger 可用于校對工作,比如一份資料需要兩個人同時進行校對,都校對無誤後,才能進行後續處理。這時,就可以使用 Exchanger 比較兩份校對結果。
- Exchanger 内部采用無鎖 CAS 實作,Exchange 使用了内部對象 Node 的兩個屬性 — item 、match,分布存儲兩個線程的值。
public class ExchangerTest {
private static final Exchanger<String> exchange = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(() -> {
try {
String result = exchange.exchange("資料A");
System.out.println("A的exchange結果:" + result);
} catch (InterruptedException e) {
}
});
threadPool.execute(() -> {
try {
String result = exchange.exchange("資料B");
System.out.println("B的exchange結果:" + result);
} catch (InterruptedException e) {
}
});
threadPool.shutdown();
}
}
Phaser
多階段栅欄,可以在初始時設定參與線程數,也可以中途注冊/登出參與者,當到達的參與者數量滿足栅欄設定的數量後,會進行階段更新(advance)。以下是 Phaser 的一些基礎概念。
- phase(階段):在任意時間點,Phaser 隻處于某一個 phase(階段),初始階段為 0,最大達到 Integerr.MAX_VALUE,然後再次歸零。當所有 parties 參與者都到達後,phase 值會遞增。
- parties(參與者):指參與線程。Phaser 既可以在初始構造時指定參與者的數量,也可以中途通過register、bulkRegister、arriveAndDeregister等方法注冊/登出參與者。
- arrive(到達):Phaser 注冊完 parties(參與者)之後,參與者的初始狀态是 unarrived 的,當參與者到達(arrive)目前階段(phase)後,狀态就會變成 arrived。
- advance(進階):當階段的到達參與者數滿足條件後(注冊的數量等于到達的數量),階段就會發生進階(advance)—— 也就是phase值+1。
- termination(終止):代表目前 Phaser 對象達到終止狀态。
-
Tiering(分層):一種樹形結構,通過構造函數可以指定目前待構造的 Phaser對象的父結點。
6.1 之是以引入 Tiering,是因為當一個 Phaser 有大量參與者(parties)的時候,内部的同步操作會使性能急劇下降,而分層可以降低競争,進而減小因同步導緻的額外開銷。
6.2 在一個分層 Phasers 的樹結構中,注冊和撤銷子Phaser或父Phaser是自動被管理的。當一個Phaser的參與者(parties)數量變成 0 時,如果有該 Phaser 有父結點,就會将它從父結點中溢移除。
public class PhaserTest1 implements Runnable {
private final Phaser phaser;
public PhaserTest1(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": 開始執行任務,目前phase =" + phaser.getPhase() + "");
// 1.等待其他參與者線程到達
// 2.不響應中斷,也就是說即使目前線程被中斷,arriveAndAwaitAdvance方法也不會傳回或抛出異常,而是繼續等待。如果希望能夠響應中斷,可以參考 awaitAdvanceInterruptibly 方法。
int advance = phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + ": 結束執行任務,目前phase =" + advance + "");
}
public static void main(String[] args) {
Phaser phaser = new Phaser();
for (int i = 0; i < 10; i++) {
// 新增參與者數量
phaser.register();
new Thread(new PhaserTest1(phaser), "Thread-" + i).start();
}
}
}