簡介
生産者-消費者模式是一個經典的多線程設計模式。在生産者-消費者模式中。通常有多個生産者線程和多個消費者模式,生産者線程負責送出使用者請求,消費者線程負責具體處理生産者送出的任務。兩者之間通過共享記憶體緩沖區進行通信。
生産者-消費者模式的核心元件是共享記憶體緩沖區,它的作用是生産者與消費者之間的通信橋梁,避免二者之間的直接通信,有效地降低了二者耦合性。生産者不需要知道消費者的存在,消費者也不需要知道生産者的存在。
衆所周知,緩存的出現是為了平衡不同的處理速度,比如CPU運算速度要高于硬碟讀寫速度,如果沒有緩存,CPU就會有很多的時間在等待硬碟執行完讀寫,發揮不了CPU的高性能。
同樣,這裡的共享記憶體緩沖區也有這個作用,假如生産者和消費者的速度不比對,無論是誰要快一些,都可以在這裡得到緩解。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0zazMWeWJTYoFjMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL5cTO0QzM1ETM0ETNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
生産者-消費者模式的主要角色如下表所示:
角色 | 作用 |
---|---|
生産者 | 用于送出使用者請求,提取使用者任務,并裝入記憶體緩沖區 |
消費者 | 在記憶體緩沖區中提取并處理任務 |
記憶體緩沖區 | 緩存生産者送出的任務或資料,供消費者使用 |
任務 | 生成者向記憶體緩沖區送出的資料結構 |
Main | 使用生産者和消費者的用戶端 |
執行個體
這個模式的重點是在共享記憶體緩沖區為空的時候,生産者需要被喚醒,消費者需要進行等待;當共享記憶體緩沖區滿的時候,消費者需要被喚醒,生産者需要進行等待。這就需要在合适的時候對生産者和消費者休眠和喚醒,也就是notify/wait。這裡使用的BlockingQueue内部已經實作了線程的喚醒和休眠,是以在這個代碼中看不到notify/wait的出現。
這裡假設有若幹個生産者進行資料的生成,将生成的資料放到隊列中,然後有若幹個消費者進行資料的取出,生成這個數的平方。
共享記憶體緩沖區的資料類型
/**
* Created by makersy on 2019
*/
/*
生産者和消費者之間的共享資料
*/
public class Data {
int num;
public Data(int num) {
this.num = num;
}
public Data(String str) {
this.num = Integer.valueOf(str);
}
public int getNum() {
return num;
}
}
生産者
/**
* Created by makersy on 2019
*/
/*
生産者
*/
public class Producer implements Runnable{
//這裡的正在運作标記不共享,但是需要用volatile保證可以實時接收到它的更新
private volatile boolean isRunning = true;
BlockingQueue<Data> queue;
//生成的資料,這裡要使用static保證在多個線程之間共享
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void run() {
Data data = null;
System.out.println("Producer Thread id: " + Thread.currentThread().getId() + " started!");
Random r = new Random();
try {
while (isRunning) {
Thread.sleep(r.nextInt(SLEEPTIME));
data = new Data(count.incrementAndGet());
System.out.println(data.getNum() + " is put into bq");
if (queue.offer(data, 2, TimeUnit.SECONDS)) {
//生産者向bq中添加資料
System.out.println("producer " + Thread.currentThread().getId() + " put data : " + data.getNum());
} else {
System.out.println("producer put data failed!");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.interrupted();
}
}
//停止線程
public void stop() {
isRunning = false;
}
}
消費者
/**
* Created by makersy on 2019
*/
/*
消費者
*/
public class Consumer implements Runnable{
private BlockingQueue<Data> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("Consumer Thread id: " + Thread.currentThread().getId() + " started!");
Random r = new Random();
while (true) {
try {
Data data = queue.take();
if (data != null) {
//取出資料成功,輸出計算平方值
System.out.println(Thread.currentThread().getId() + "計算平方:" + MessageFormat.format("{0} * {1} = {2}", data.getNum(), data.getNum(), data.getNum() * data.getNum()));
Thread.sleep(r.nextInt(SLEEPTIME)); //随機睡眠一定時間,模拟任務執行
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.interrupted();
}
}
}
}
Main方法(使用生産者和消費者的用戶端)
/**
* Created by makersy on 2019
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Data> queue = new LinkedBlockingQueue<>(10);
//建立3個生産者線程和3個消費者線程
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
//線程池
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(10 * 1000); //執行生産者和消費者
producer1.stop();
producer2.stop();
producer3.stop(); //停止生産任務
Thread.sleep(3 * 1000); //等待消費者處理完隊列中的任務
service.shutdown();
System.out.println("結束");
}
}