天天看點

【Java】使用BlockingQueue實作生産者-消費者模式

簡介

生産者-消費者模式是一個經典的多線程設計模式。在生産者-消費者模式中。通常有多個生産者線程和多個消費者模式,生産者線程負責送出使用者請求,消費者線程負責具體處理生産者送出的任務。兩者之間通過共享記憶體緩沖區進行通信。

生産者-消費者模式的核心元件是共享記憶體緩沖區,它的作用是生産者與消費者之間的通信橋梁,避免二者之間的直接通信,有效地降低了二者耦合性。生産者不需要知道消費者的存在,消費者也不需要知道生産者的存在。

衆所周知,緩存的出現是為了平衡不同的處理速度,比如CPU運算速度要高于硬碟讀寫速度,如果沒有緩存,CPU就會有很多的時間在等待硬碟執行完讀寫,發揮不了CPU的高性能。

同樣,這裡的共享記憶體緩沖區也有這個作用,假如生産者和消費者的速度不比對,無論是誰要快一些,都可以在這裡得到緩解。

【Java】使用BlockingQueue實作生産者-消費者模式

生産者-消費者模式的主要角色如下表所示:

角色 作用
生産者 用于送出使用者請求,提取使用者任務,并裝入記憶體緩沖區
消費者 在記憶體緩沖區中提取并處理任務
記憶體緩沖區 緩存生産者送出的任務或資料,供消費者使用
任務 生成者向記憶體緩沖區送出的資料結構
Main 使用生産者和消費者的用戶端

執行個體

這個模式的重點是在共享記憶體緩沖區為空的時候,生産者需要被喚醒,消費者需要進行等待;當共享記憶體緩沖區滿的時候,消費者需要被喚醒,生産者需要進行等待。這就需要在合适的時候對生産者和消費者休眠和喚醒,也就是notify/wait。這裡使用的BlockingQueue内部已經實作了線程的喚醒和休眠,是以在這個代碼中看不到notify/wait的出現。

這裡假設有若幹個生産者進行資料的生成,将生成的資料放到隊列中,然後有若幹個消費者進行資料的取出,生成這個數的平方。

共享記憶體緩沖區的資料類型

/**
 * Created by makersy on 2019
 */

/*
生産者和消費者之間的共享資料
 */
public class Data {

    int num;

    public Data(int num) {
        this.num = num;
    }

    public Data(String str) {
        this.num = Integer.valueOf(str);
    }

    public int getNum() {
        return num;
    }
}

           

生産者

/**
 * Created by makersy on 2019
 */

/*
生産者
 */
public class Producer implements Runnable{

    //這裡的正在運作标記不共享,但是需要用volatile保證可以實時接收到它的更新
    private volatile boolean isRunning = true;
    BlockingQueue<Data> queue;
    //生成的資料,這裡要使用static保證在多個線程之間共享
    private static AtomicInteger count = new AtomicInteger();
    private static final int SLEEPTIME = 1000;


    public Producer(BlockingQueue<Data> queue) {
        this.queue = queue;
    }


    @Override
    public void run() {
        Data data = null;

        System.out.println("Producer Thread id: " + Thread.currentThread().getId() + " started!");
        Random r = new Random();
        try {
            while (isRunning) {
                Thread.sleep(r.nextInt(SLEEPTIME));
                data = new Data(count.incrementAndGet());
                System.out.println(data.getNum() + " is put into bq");
                if (queue.offer(data, 2, TimeUnit.SECONDS)) {
                    //生産者向bq中添加資料
                    System.out.println("producer " + Thread.currentThread().getId() + " put data : " + data.getNum());
                } else {
                    System.out.println("producer put data failed!");
                }
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.interrupted();
        }
    }

    //停止線程
    public void stop() {
        isRunning = false;
    }

}

           

消費者

/**
 * Created by makersy on 2019
 */

/*
消費者
 */
public class Consumer implements Runnable{

    private BlockingQueue<Data> queue;
    private static final int SLEEPTIME = 1000;

    public Consumer(BlockingQueue<Data> queue) {
        this.queue = queue;
    }


    @Override
    public void run() {
        System.out.println("Consumer Thread id: " + Thread.currentThread().getId() + " started!");
        Random r = new Random();
        while (true) {
            try {
                Data data = queue.take();
                if (data != null) {
                    //取出資料成功,輸出計算平方值
                    System.out.println(Thread.currentThread().getId() + "計算平方:" + MessageFormat.format("{0} * {1} = {2}", data.getNum(), data.getNum(), data.getNum() * data.getNum()));
                    Thread.sleep(r.nextInt(SLEEPTIME));  //随機睡眠一定時間,模拟任務執行
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.interrupted();
            }
        }
    }
}

           

Main方法(使用生産者和消費者的用戶端)

/**
 * Created by makersy on 2019
 */

public class Main {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Data> queue = new LinkedBlockingQueue<>(10);
        //建立3個生産者線程和3個消費者線程
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);
        //線程池
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);
        Thread.sleep(10 * 1000);  //執行生産者和消費者
        producer1.stop();
        producer2.stop();
        producer3.stop();	//停止生産任務
        Thread.sleep(3 * 1000);  //等待消費者處理完隊列中的任務
        service.shutdown();
        System.out.println("結束");
    }
}