天天看点

互联网面试-多线程之间如何实现线程通信?

互联网面试-多线程之间如何实现线程通信?

前面我们提到过,当多个线程在争夺同一个资源的时候,为了能够让这些线程能够更加高效的协作,来提升CPU的使用效率,我们可以采取线程之间通信的方式来实现,下面我们就来看看具体的操作。

使用线程通信来解决多线程协作问题

多线程之间的协作,可以通过Java提供的一些函数包括wait()、notifyAll()、notify()等实现。

wait()

使得当前线程处于等待状态,直到其他线程调用了notify()方法或者是notifyAll()方法。

public final native void wait(long timeout) throws InterruptedException;           

notify()

这个方法用来唤醒处于等待状态的单个线程,如果有多个线程等待,则随机唤醒一个。

public final native void notify();           

notifyAll()

唤醒处于监视器上的所有等待线程。

public final native void notifyAll();           

简单来讲,wait()方法用来阻塞线程,而notify()和notifyAll()则是用来唤醒线程,成为就绪状态。这三个方法都是定义在Object类中的native方法,并不是由Thread类提供,这个是因为Java对于锁的使用是对象级的,这个在之前的介绍中提到过。并且这三个方法必须都在以Synchroinzed关键字修饰的方法或者是代码块中进行使用,否则就会抛出IllegalMonitorStateException 异常。

下面,我们就来通过一个小例子来看看,线程之间是如何实现通信管理的。模拟生产者不断得生产消息,而消费者不断的消费消息,并且消息缓冲区的容量是20。

1、当缓冲区的容量达到20的时候,生产者将无法在生产消息,生产者就会通过wait()方法进入到阻塞状态,一直到容量减小到20以下,生产者则开始继续生产消息,这个时候就通过notify()方法或者是notifyAll()方法进行唤醒然后再去生产消息。

2、当容量为0的时候,消费者将无法在消费消息,这个时候消费者调用wati()方法进入阻塞状态,等待生产者生产消息之后,再调用notify()或者是notifyAll()方法来消费消息。

代码如下

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Message {
    //生产消息总量
    int count = 20;

    //生产者
    public synchronized void productMessage() {
        try {
            if (count<20) {
                System.out.println("生产 消息"+count);
                Thread.currentThread().sleep(100);
              	count++;
                notifyAll();
            } else {
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //消费者
    public synchronized void resumeMessage() {
        try {
            if (count>0) {
              	System.out.println("消费 消息"+ count);
                Thread.currentThread().sleep(100);
                count--;
                notifyAll();
                
            } else {
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

消息生产者

public class MessageProducter implements Runnable {
    //共享缓存区
    private Message message;
    //多线程的执行状态,用于控制线程的启停
    private volatile boolean isRunning = true;

    public MessageProducter(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        while (isRunning) {
            message.productMessage();
        }
    }

    //停止当前线程
    public void stop() {
        this.isRunning = false;
    }
}
           

消息消费者

//消费者
public class MessageConsumer implements Runnable {
    //共享缓存区
    private Message message;

    public MessageConsumer(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        while (true) {
            message.resumeMessage();
        }
    }
}
           

测试主方法

public class TestProducerAndConsumer {

    public static void main(String[] args) throws Exception {
        Message message = new Message();
        //生产者
        MessageProducter messageProducer1 = new MessageProducter(message);
       	MessageProducter messageProducer2 = new MessageProducter(message);
        //消费者
        MessageConsumer messageConsumer1 = new MessageConsumer(message);
        MessageConsumer messageConsumer2 = new MessageConsumer(message);
        
      	messageProducer1.start();
      	messageProducer2.start();
      
        messageConsumer1.start();
        messageConsumer2.start();
    }
}           

上面的例子是一个非常简单的生产者消费者的例子,但是在实际的操作中,我们知道,并不能保证生产多少就消费多少,并且JVM中频繁的创建和销毁线程也是对资源的浪费非常大。所以就出现了队列、线程池等实现方案。下面我们通过一个例子来看看队列和线程池来实现的线程协作

使用队列和线程池来实现线程通信

首先来创建一个消息实体类

public class Message {
	private int id ; 
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
}
           

创建一个消息存储类

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MessageStock {
    //统计一共生产了多少消息
    private static int count = 0;
    //存放Message对象的共享缓冲区
    private BlockingQueue<Message> queue;

    public MessageStock(BlockingQueue<Message> queue) {
        this.queue = queue;
    }

    //生产者
    public synchronized void productMessage() {
        try {
            Message message = new Message();
            //向Message队列增加一个Message对象
            boolean success = this.queue.offer(message, 2, TimeUnit.SECONDS);
            if (success) {
                int id = ++count;
                message.setId(id);
                System.out.println("生产消息,编号:" + id + ",数据库存:" + queue.size());
                Thread.sleep((int) (1000 * Math.random()));
                notifyAll();
            } else {
                System.out.println("生产失败....");
            }
            if (queue.size() < 100) {
            } else {

                System.out.println("数据库存已满,等待消费...");
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //消费车
    public synchronized void resumeMessage() {
        try {
            // 从Message队列中,拿走一个Message对象
            Message message = this.queue.poll(2, TimeUnit.SECONDS);
            if (message != null) {
                Thread.sleep((int) (1000 * Math.random()));
                notifyAll();
                System.out.println("消费Message,编号:" + message.getId() + ",数据库存: " + queue.size());
            } else {
                System.out.println("消费Message失败....");
            }
            if (queue.size() > 0) {

            } else {
                System.out.println("数据为空,等待生产...");
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

消息生产者

public class MessageProducter implements Runnable {
    //共享缓存区
    private MessageStock messagePool;
    //多线程的执行状态,用于控制线程的启停
    private volatile boolean isRunning = true;

    public MessageProducter(MessageStock messagePool) {
        this.messagePool = messagePool;
    }

    @Override
    public void run() {
        while (isRunning) {
            messagePool.productMessage();
        }
    }

    //停止当前线程
    public void stop() {
        this.isRunning = false;
    }
}
           

消息消费者

//消费者
public class MessageConsumer implements Runnable {
    //共享缓存区:Message队列
    private MessageStock carPool;

    public MessageConsumer(MessageStock messagePool) {
        this.messagePool = messagePool;
    }

    @Override
    public void run() {
        while (true) {
            messagePool.resumeMessage();
        }
    }
}
           

测试类

public class TestProducerAndConsumer {

    public static void main(String[] args) throws Exception {
        //共享缓存区:Message队列
        BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>(100);
        //Message库存,包含了queue队列
        MessageStock messageStock = new MessageStock(queue);
        //生产者
        MessageProducter messageProducter1 = new MessageProducter(messageStock);
        MessageProducter messageProducter2 = new MessageProducter(messageStock);
        //消费者
        MessageConsumer messageConsumer1 = new MessageConsumer(messageStock);
        MessageConsumer messageConsumer2 = new MessageConsumer(messageStock);
        //将生产者和消费者加入线程池运行
        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(messageProducter1);
        cachePool.execute(messageProducter2);
        cachePool.execute(messageConsumer1);
        cachePool.execute(messageConsumer2);
    }
}           

继续阅读