前面我们提到过,当多个线程在争夺同一个资源的时候,为了能够让这些线程能够更加高效的协作,来提升CPU的使用效率,我们可以采取线程之间通信的方式来实现,下面我们就来看看具体的操作。
使用线程通信来解决多线程协作问题
多线程之间的协作,可以通过Java提供的一些函数包括wait()、notifyAll()、notify()等实现。
wait()
使得当前线程处于等待状态,直到其他线程调用了notify()方法或者是notifyAll()方法。
public final native void wait(long timeout) throws InterruptedException;
notify()
这个方法用来唤醒处于等待状态的单个线程,如果有多个线程等待,则随机唤醒一个。
public final native void notify();
notifyAll()
唤醒处于监视器上的所有等待线程。
public final native void notifyAll();
简单来讲,wait()方法用来阻塞线程,而notify()和notifyAll()则是用来唤醒线程,成为就绪状态。这三个方法都是定义在Object类中的native方法,并不是由Thread类提供,这个是因为Java对于锁的使用是对象级的,这个在之前的介绍中提到过。并且这三个方法必须都在以Synchroinzed关键字修饰的方法或者是代码块中进行使用,否则就会抛出IllegalMonitorStateException 异常。
下面,我们就来通过一个小例子来看看,线程之间是如何实现通信管理的。模拟生产者不断得生产消息,而消费者不断的消费消息,并且消息缓冲区的容量是20。
1、当缓冲区的容量达到20的时候,生产者将无法在生产消息,生产者就会通过wait()方法进入到阻塞状态,一直到容量减小到20以下,生产者则开始继续生产消息,这个时候就通过notify()方法或者是notifyAll()方法进行唤醒然后再去生产消息。
2、当容量为0的时候,消费者将无法在消费消息,这个时候消费者调用wati()方法进入阻塞状态,等待生产者生产消息之后,再调用notify()或者是notifyAll()方法来消费消息。
代码如下
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Message {
//生产消息总量
int count = 20;
//生产者
public synchronized void productMessage() {
try {
if (count<20) {
System.out.println("生产 消息"+count);
Thread.currentThread().sleep(100);
count++;
notifyAll();
} else {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消费者
public synchronized void resumeMessage() {
try {
if (count>0) {
System.out.println("消费 消息"+ count);
Thread.currentThread().sleep(100);
count--;
notifyAll();
} else {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消息生产者
public class MessageProducter implements Runnable {
//共享缓存区
private Message message;
//多线程的执行状态,用于控制线程的启停
private volatile boolean isRunning = true;
public MessageProducter(Message message) {
this.message = message;
}
@Override
public void run() {
while (isRunning) {
message.productMessage();
}
}
//停止当前线程
public void stop() {
this.isRunning = false;
}
}
消息消费者
//消费者
public class MessageConsumer implements Runnable {
//共享缓存区
private Message message;
public MessageConsumer(Message message) {
this.message = message;
}
@Override
public void run() {
while (true) {
message.resumeMessage();
}
}
}
测试主方法
public class TestProducerAndConsumer {
public static void main(String[] args) throws Exception {
Message message = new Message();
//生产者
MessageProducter messageProducer1 = new MessageProducter(message);
MessageProducter messageProducer2 = new MessageProducter(message);
//消费者
MessageConsumer messageConsumer1 = new MessageConsumer(message);
MessageConsumer messageConsumer2 = new MessageConsumer(message);
messageProducer1.start();
messageProducer2.start();
messageConsumer1.start();
messageConsumer2.start();
}
}
上面的例子是一个非常简单的生产者消费者的例子,但是在实际的操作中,我们知道,并不能保证生产多少就消费多少,并且JVM中频繁的创建和销毁线程也是对资源的浪费非常大。所以就出现了队列、线程池等实现方案。下面我们通过一个例子来看看队列和线程池来实现的线程协作
使用队列和线程池来实现线程通信
首先来创建一个消息实体类
public class Message {
private int id ;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
}
创建一个消息存储类
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MessageStock {
//统计一共生产了多少消息
private static int count = 0;
//存放Message对象的共享缓冲区
private BlockingQueue<Message> queue;
public MessageStock(BlockingQueue<Message> queue) {
this.queue = queue;
}
//生产者
public synchronized void productMessage() {
try {
Message message = new Message();
//向Message队列增加一个Message对象
boolean success = this.queue.offer(message, 2, TimeUnit.SECONDS);
if (success) {
int id = ++count;
message.setId(id);
System.out.println("生产消息,编号:" + id + ",数据库存:" + queue.size());
Thread.sleep((int) (1000 * Math.random()));
notifyAll();
} else {
System.out.println("生产失败....");
}
if (queue.size() < 100) {
} else {
System.out.println("数据库存已满,等待消费...");
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消费车
public synchronized void resumeMessage() {
try {
// 从Message队列中,拿走一个Message对象
Message message = this.queue.poll(2, TimeUnit.SECONDS);
if (message != null) {
Thread.sleep((int) (1000 * Math.random()));
notifyAll();
System.out.println("消费Message,编号:" + message.getId() + ",数据库存: " + queue.size());
} else {
System.out.println("消费Message失败....");
}
if (queue.size() > 0) {
} else {
System.out.println("数据为空,等待生产...");
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消息生产者
public class MessageProducter implements Runnable {
//共享缓存区
private MessageStock messagePool;
//多线程的执行状态,用于控制线程的启停
private volatile boolean isRunning = true;
public MessageProducter(MessageStock messagePool) {
this.messagePool = messagePool;
}
@Override
public void run() {
while (isRunning) {
messagePool.productMessage();
}
}
//停止当前线程
public void stop() {
this.isRunning = false;
}
}
消息消费者
//消费者
public class MessageConsumer implements Runnable {
//共享缓存区:Message队列
private MessageStock carPool;
public MessageConsumer(MessageStock messagePool) {
this.messagePool = messagePool;
}
@Override
public void run() {
while (true) {
messagePool.resumeMessage();
}
}
}
测试类
public class TestProducerAndConsumer {
public static void main(String[] args) throws Exception {
//共享缓存区:Message队列
BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>(100);
//Message库存,包含了queue队列
MessageStock messageStock = new MessageStock(queue);
//生产者
MessageProducter messageProducter1 = new MessageProducter(messageStock);
MessageProducter messageProducter2 = new MessageProducter(messageStock);
//消费者
MessageConsumer messageConsumer1 = new MessageConsumer(messageStock);
MessageConsumer messageConsumer2 = new MessageConsumer(messageStock);
//将生产者和消费者加入线程池运行
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(messageProducter1);
cachePool.execute(messageProducter2);
cachePool.execute(messageConsumer1);
cachePool.execute(messageConsumer2);
}
}