前面我們提到過,當多個線程在争奪同一個資源的時候,為了能夠讓這些線程能夠更加高效的協作,來提升CPU的使用效率,我們可以采取線程之間通信的方式來實作,下面我們就來看看具體的操作。
使用線程通信來解決多線程協作問題
多線程之間的協作,可以通過Java提供的一些函數包括wait()、notifyAll()、notify()等實作。
wait()
使得目前線程處于等待狀态,直到其他線程調用了notify()方法或者是notifyAll()方法。
public final native void wait(long timeout) throws InterruptedException;
notify()
這個方法用來喚醒處于等待狀态的單個線程,如果有多個線程等待,則随機喚醒一個。
public final native void notify();
notifyAll()
喚醒處于螢幕上的所有等待線程。
public final native void notifyAll();
簡單來講,wait()方法用來阻塞線程,而notify()和notifyAll()則是用來喚醒線程,成為就緒狀态。這三個方法都是定義在Object類中的native方法,并不是由Thread類提供,這個是因為Java對于鎖的使用是對象級的,這個在之前的介紹中提到過。并且這三個方法必須都在以Synchroinzed關鍵字修飾的方法或者是代碼塊中進行使用,否則就會抛出IllegalMonitorStateException 異常。
下面,我們就來通過一個小例子來看看,線程之間是如何實作通信管理的。模拟生産者不斷得生産消息,而消費者不斷的消費消息,并且消息緩沖區的容量是20。
1、當緩沖區的容量達到20的時候,生産者将無法在生産消息,生産者就會通過wait()方法進入到阻塞狀态,一直到容量減小到20以下,生産者則開始繼續生産消息,這個時候就通過notify()方法或者是notifyAll()方法進行喚醒然後再去生産消息。
2、當容量為0的時候,消費者将無法在消費消息,這個時候消費者調用wati()方法進入阻塞狀态,等待生産者生産消息之後,再調用notify()或者是notifyAll()方法來消費消息。
代碼如下
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Message {
//生産消息總量
int count = 20;
//生産者
public synchronized void productMessage() {
try {
if (count<20) {
System.out.println("生産 消息"+count);
Thread.currentThread().sleep(100);
count++;
notifyAll();
} else {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消費者
public synchronized void resumeMessage() {
try {
if (count>0) {
System.out.println("消費 消息"+ count);
Thread.currentThread().sleep(100);
count--;
notifyAll();
} else {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消息生産者
public class MessageProducter implements Runnable {
//共享緩存區
private Message message;
//多線程的執行狀态,用于控制線程的啟停
private volatile boolean isRunning = true;
public MessageProducter(Message message) {
this.message = message;
}
@Override
public void run() {
while (isRunning) {
message.productMessage();
}
}
//停止目前線程
public void stop() {
this.isRunning = false;
}
}
消息消費者
//消費者
public class MessageConsumer implements Runnable {
//共享緩存區
private Message message;
public MessageConsumer(Message message) {
this.message = message;
}
@Override
public void run() {
while (true) {
message.resumeMessage();
}
}
}
測試主方法
public class TestProducerAndConsumer {
public static void main(String[] args) throws Exception {
Message message = new Message();
//生産者
MessageProducter messageProducer1 = new MessageProducter(message);
MessageProducter messageProducer2 = new MessageProducter(message);
//消費者
MessageConsumer messageConsumer1 = new MessageConsumer(message);
MessageConsumer messageConsumer2 = new MessageConsumer(message);
messageProducer1.start();
messageProducer2.start();
messageConsumer1.start();
messageConsumer2.start();
}
}
上面的例子是一個非常簡單的生産者消費者的例子,但是在實際的操作中,我們知道,并不能保證生産多少就消費多少,并且JVM中頻繁的建立和銷毀線程也是對資源的浪費非常大。是以就出現了隊列、線程池等實作方案。下面我們通過一個例子來看看隊列和線程池來實作的線程協作
使用隊列和線程池來實作線程通信
首先來建立一個消息實體類
public class Message {
private int id ;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
}
建立一個消息存儲類
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MessageStock {
//統計一共生産了多少消息
private static int count = 0;
//存放Message對象的共享緩沖區
private BlockingQueue<Message> queue;
public MessageStock(BlockingQueue<Message> queue) {
this.queue = queue;
}
//生産者
public synchronized void productMessage() {
try {
Message message = new Message();
//向Message隊列增加一個Message對象
boolean success = this.queue.offer(message, 2, TimeUnit.SECONDS);
if (success) {
int id = ++count;
message.setId(id);
System.out.println("生産消息,編号:" + id + ",資料庫存:" + queue.size());
Thread.sleep((int) (1000 * Math.random()));
notifyAll();
} else {
System.out.println("生産失敗....");
}
if (queue.size() < 100) {
} else {
System.out.println("資料庫存已滿,等待消費...");
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消費車
public synchronized void resumeMessage() {
try {
// 從Message隊列中,拿走一個Message對象
Message message = this.queue.poll(2, TimeUnit.SECONDS);
if (message != null) {
Thread.sleep((int) (1000 * Math.random()));
notifyAll();
System.out.println("消費Message,編号:" + message.getId() + ",資料庫存: " + queue.size());
} else {
System.out.println("消費Message失敗....");
}
if (queue.size() > 0) {
} else {
System.out.println("資料為空,等待生産...");
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消息生産者
public class MessageProducter implements Runnable {
//共享緩存區
private MessageStock messagePool;
//多線程的執行狀态,用于控制線程的啟停
private volatile boolean isRunning = true;
public MessageProducter(MessageStock messagePool) {
this.messagePool = messagePool;
}
@Override
public void run() {
while (isRunning) {
messagePool.productMessage();
}
}
//停止目前線程
public void stop() {
this.isRunning = false;
}
}
消息消費者
//消費者
public class MessageConsumer implements Runnable {
//共享緩存區:Message隊列
private MessageStock carPool;
public MessageConsumer(MessageStock messagePool) {
this.messagePool = messagePool;
}
@Override
public void run() {
while (true) {
messagePool.resumeMessage();
}
}
}
測試類
public class TestProducerAndConsumer {
public static void main(String[] args) throws Exception {
//共享緩存區:Message隊列
BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>(100);
//Message庫存,包含了queue隊列
MessageStock messageStock = new MessageStock(queue);
//生産者
MessageProducter messageProducter1 = new MessageProducter(messageStock);
MessageProducter messageProducter2 = new MessageProducter(messageStock);
//消費者
MessageConsumer messageConsumer1 = new MessageConsumer(messageStock);
MessageConsumer messageConsumer2 = new MessageConsumer(messageStock);
//将生産者和消費者加入線程池運作
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(messageProducter1);
cachePool.execute(messageProducter2);
cachePool.execute(messageConsumer1);
cachePool.execute(messageConsumer2);
}
}