天天看點

Java中的并發工具類CountDownLatch同步屏障CyclicBarrier控制線程并發線程數的Semaphore線程間交換資料的Exchanger

文章目錄

  • CountDownLatch
    • 代碼示例
  • 同步屏障CyclicBarrier
    • 代碼示例
    • CyclicBarrier和CountDownLatch的差別
  • 控制線程并發線程數的Semaphore
    • 代碼示例
  • 線程間交換資料的Exchanger
    • 代碼示例

CountDownLatch

它允許一個或多個線程等待其他線程完成操作,相當于join()的功能,但比join()的功能更多。

代碼示例

import java.util.concurrent.CountDownLatch;

/**
 * @author 南風
 * @date 2020/2/6-12:53
 */
public class CountDownLatchTest {

    /**
     * 1、構造函數接收一個int類型的參數作為計數器,想等待N個點完成,就傳入N
     * 2、這裡的N可以是N個線程,也可以是一個線程裡面的N個執行步驟
     * 3、如果用在多個線程,隻需要把這個CountDownLatch引用傳遞到線程裡即可
     * 4、N必須大于0,如果等于0,則調用await()方法不會阻塞目前線程
     */
    
    static CountDownLatch downLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                //調用countDown()方法時,N就會減1
                downLatch.countDown();
                System.out.println(2);
                downLatch.countDown();
            }
        }).start();
        //調用await()方法就會阻塞目前線程,直到N變成0
        //可以使用await(long time, TimeUnit unit)方法來等待指定時間後就不再等待
        downLatch.await();
        System.out.println(3);
    }
}

           

執行結果

1
2
3
           

同步屏障CyclicBarrier

讓一組線程達到一個屏障(也叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續執行。

代碼示例

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @author 南風
 * @date 2020/2/6-13:07
 */
public class CyclicBarrierTest {

    /**
     * 1、預設參數是int類型的,表示屏障攔截的線程數量
     * 2、還有一個構造函數 CyclicBarrier(int parties, Runnable barrierAction),用于線上程到達屏障時,優先執行barrierAction線程
     * 3、如果這裡參數為3,則主線程和子線程會永遠等待,因為沒有第三個線程執行await()方法
     */
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //調用此方法告訴CyclicBarrier我已經到達了屏障,然後目前線程被阻塞
                    //還有個方法await(long timeout, TimeUnit unit),等待指定時間後不再等待
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }).start();
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

           

運作結果

1
2
或
2
1
           

應用場景

用于多線程計算資料,最後合并計算結果的場景。

CyclicBarrier和CountDownLatch的差別

  1. CountDownLatch

    的計數器隻能使用一次,而

    CyclicBarrier

    的計數器可以使用reset()方法重置
  2. CyclicBarrier

    還提供其他有用方法,比如

    getNumberWaiting()

    方法可以獲得

    CyclicBarrier

    阻塞的線程數量;

    isBroken()

    方法用來了解阻塞的線程是否被中斷。

控制線程并發線程數的Semaphore

Semaphore(信号量)是用來控制同時通路特定資源的線程數量,通過協調各個線程,以保證合理的使用公共資源。

代碼示例

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
* @author 南風
* @date 2020/2/6-13:28
*/
public class SemaphoreTest {

   /**
    * 建立容量為30的線程池
    */
   static ExecutorService service = Executors.newFixedThreadPool(30);

   /**
    * 建立允許同時執行10個線程的信号量,接收整型參數,表示可用的許可證數量
    */
   static Semaphore s = new Semaphore(10);

   public static void main(String[] args) {
       //建立30個線程
       for(int i=0; i<30; i++){
           service.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       //使用該方法擷取一個許可證
                       //可以使用acquire(int permits)方法擷取指定個許可證
                       //可以使用tryAcquire()方法嘗試擷取許可證
                       s.acquire();
                       System.out.println("save data");
                       //使用該方法歸還許可證
                       //可以使用release(int permits)方法歸還指定個許可證
                       s.release();
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }
           });
       }
       //關閉線程池
       service.shutdown();
   }
}

           

應用場景: 用作流量控制,比如資料庫連接配接

其他方法

  • int availablePermits()

    :傳回此信号量中目前可用的許可證數
  • int getQueueLength()

    :傳回正在等待擷取許可證的線程數
  • boolean hasQueuedThreads()

    :是否有線程正在等待擷取許可證

線程間交換資料的Exchanger

進行線程間的資料交換,提供一個同步點,兩個線程可以在同步點交換彼此的資料。

代碼示例

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author 南風
 * @date 2020/2/6-13:47
 */
public class ExchangerTest {

    /**
     * 建立交換資料為String類型的Exchanger
     */
    static final Exchanger<String> ex = new Exchanger<String>();

    static ExecutorService service = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        service.execute(new Runnable() {
            @Override
            public void run() {
                String A = "銀行流水A";
                try {
                    // 第一個線程執行exchange(String str)方法,它會一直等待第二個線程也執行exchange(String str)方法
                    // 當第二個線程執行交換資料的方法後,将從第二個線程交換的資料傳回,同時将自己的資料交換給第二個線程
                    // 參數表示要交換的值
                    // 可以使用exchange(V x, long timeout, TimeUnit unit)方法,設定最大等待時長
                    String str = ex.exchange(A);
                    System.out.println(Thread.currentThread().getName()+":"+"從第二個線程交換回來的值:"+str);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        service.execute(new Runnable() {
            @Override
            public void run() {
                String B = "銀行流水B";
                try {
                    //第二個線程執行完exchange方法後,即到達同步點時,這兩個線程交換資料,将本線程的資料傳遞給對方
                    String str = ex.exchange(B);
                    System.out.println(Thread.currentThread().getName()+":"+"從第一個線程交換回來的值:"+str);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //關閉線程池
        service.shutdown();
    }
}

           

運作結果

pool-1-thread-1:從第二個線程交換回來的值:銀行流水B
pool-1-thread-2:從第一個線程交換回來的值:銀行流水A
           

應用場景

  • 遺傳算法,使用交叉規則進行遺傳
  • 校對工作,比如銀行的AB崗電子銀行流水錄入