天天看點

網際網路面試-多線程之間如何實作線程通信?

網際網路面試-多線程之間如何實作線程通信?

前面我們提到過,當多個線程在争奪同一個資源的時候,為了能夠讓這些線程能夠更加高效的協作,來提升CPU的使用效率,我們可以采取線程之間通信的方式來實作,下面我們就來看看具體的操作。

使用線程通信來解決多線程協作問題

多線程之間的協作,可以通過Java提供的一些函數包括wait()、notifyAll()、notify()等實作。

wait()

使得目前線程處于等待狀态,直到其他線程調用了notify()方法或者是notifyAll()方法。

public final native void wait(long timeout) throws InterruptedException;           

notify()

這個方法用來喚醒處于等待狀态的單個線程,如果有多個線程等待,則随機喚醒一個。

public final native void notify();           

notifyAll()

喚醒處于螢幕上的所有等待線程。

public final native void notifyAll();           

簡單來講,wait()方法用來阻塞線程,而notify()和notifyAll()則是用來喚醒線程,成為就緒狀态。這三個方法都是定義在Object類中的native方法,并不是由Thread類提供,這個是因為Java對于鎖的使用是對象級的,這個在之前的介紹中提到過。并且這三個方法必須都在以Synchroinzed關鍵字修飾的方法或者是代碼塊中進行使用,否則就會抛出IllegalMonitorStateException 異常。

下面,我們就來通過一個小例子來看看,線程之間是如何實作通信管理的。模拟生産者不斷得生産消息,而消費者不斷的消費消息,并且消息緩沖區的容量是20。

1、當緩沖區的容量達到20的時候,生産者将無法在生産消息,生産者就會通過wait()方法進入到阻塞狀态,一直到容量減小到20以下,生産者則開始繼續生産消息,這個時候就通過notify()方法或者是notifyAll()方法進行喚醒然後再去生産消息。

2、當容量為0的時候,消費者将無法在消費消息,這個時候消費者調用wati()方法進入阻塞狀态,等待生産者生産消息之後,再調用notify()或者是notifyAll()方法來消費消息。

代碼如下

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Message {
    //生産消息總量
    int count = 20;

    //生産者
    public synchronized void productMessage() {
        try {
            if (count<20) {
                System.out.println("生産 消息"+count);
                Thread.currentThread().sleep(100);
              	count++;
                notifyAll();
            } else {
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //消費者
    public synchronized void resumeMessage() {
        try {
            if (count>0) {
              	System.out.println("消費 消息"+ count);
                Thread.currentThread().sleep(100);
                count--;
                notifyAll();
                
            } else {
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

消息生産者

public class MessageProducter implements Runnable {
    //共享緩存區
    private Message message;
    //多線程的執行狀态,用于控制線程的啟停
    private volatile boolean isRunning = true;

    public MessageProducter(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        while (isRunning) {
            message.productMessage();
        }
    }

    //停止目前線程
    public void stop() {
        this.isRunning = false;
    }
}
           

消息消費者

//消費者
public class MessageConsumer implements Runnable {
    //共享緩存區
    private Message message;

    public MessageConsumer(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        while (true) {
            message.resumeMessage();
        }
    }
}
           

測試主方法

public class TestProducerAndConsumer {

    public static void main(String[] args) throws Exception {
        Message message = new Message();
        //生産者
        MessageProducter messageProducer1 = new MessageProducter(message);
       	MessageProducter messageProducer2 = new MessageProducter(message);
        //消費者
        MessageConsumer messageConsumer1 = new MessageConsumer(message);
        MessageConsumer messageConsumer2 = new MessageConsumer(message);
        
      	messageProducer1.start();
      	messageProducer2.start();
      
        messageConsumer1.start();
        messageConsumer2.start();
    }
}           

上面的例子是一個非常簡單的生産者消費者的例子,但是在實際的操作中,我們知道,并不能保證生産多少就消費多少,并且JVM中頻繁的建立和銷毀線程也是對資源的浪費非常大。是以就出現了隊列、線程池等實作方案。下面我們通過一個例子來看看隊列和線程池來實作的線程協作

使用隊列和線程池來實作線程通信

首先來建立一個消息實體類

public class Message {
	private int id ; 
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
}
           

建立一個消息存儲類

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MessageStock {
    //統計一共生産了多少消息
    private static int count = 0;
    //存放Message對象的共享緩沖區
    private BlockingQueue<Message> queue;

    public MessageStock(BlockingQueue<Message> queue) {
        this.queue = queue;
    }

    //生産者
    public synchronized void productMessage() {
        try {
            Message message = new Message();
            //向Message隊列增加一個Message對象
            boolean success = this.queue.offer(message, 2, TimeUnit.SECONDS);
            if (success) {
                int id = ++count;
                message.setId(id);
                System.out.println("生産消息,編号:" + id + ",資料庫存:" + queue.size());
                Thread.sleep((int) (1000 * Math.random()));
                notifyAll();
            } else {
                System.out.println("生産失敗....");
            }
            if (queue.size() < 100) {
            } else {

                System.out.println("資料庫存已滿,等待消費...");
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //消費車
    public synchronized void resumeMessage() {
        try {
            // 從Message隊列中,拿走一個Message對象
            Message message = this.queue.poll(2, TimeUnit.SECONDS);
            if (message != null) {
                Thread.sleep((int) (1000 * Math.random()));
                notifyAll();
                System.out.println("消費Message,編号:" + message.getId() + ",資料庫存: " + queue.size());
            } else {
                System.out.println("消費Message失敗....");
            }
            if (queue.size() > 0) {

            } else {
                System.out.println("資料為空,等待生産...");
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

消息生産者

public class MessageProducter implements Runnable {
    //共享緩存區
    private MessageStock messagePool;
    //多線程的執行狀态,用于控制線程的啟停
    private volatile boolean isRunning = true;

    public MessageProducter(MessageStock messagePool) {
        this.messagePool = messagePool;
    }

    @Override
    public void run() {
        while (isRunning) {
            messagePool.productMessage();
        }
    }

    //停止目前線程
    public void stop() {
        this.isRunning = false;
    }
}
           

消息消費者

//消費者
public class MessageConsumer implements Runnable {
    //共享緩存區:Message隊列
    private MessageStock carPool;

    public MessageConsumer(MessageStock messagePool) {
        this.messagePool = messagePool;
    }

    @Override
    public void run() {
        while (true) {
            messagePool.resumeMessage();
        }
    }
}
           

測試類

public class TestProducerAndConsumer {

    public static void main(String[] args) throws Exception {
        //共享緩存區:Message隊列
        BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>(100);
        //Message庫存,包含了queue隊列
        MessageStock messageStock = new MessageStock(queue);
        //生産者
        MessageProducter messageProducter1 = new MessageProducter(messageStock);
        MessageProducter messageProducter2 = new MessageProducter(messageStock);
        //消費者
        MessageConsumer messageConsumer1 = new MessageConsumer(messageStock);
        MessageConsumer messageConsumer2 = new MessageConsumer(messageStock);
        //将生産者和消費者加入線程池運作
        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(messageProducter1);
        cachePool.execute(messageProducter2);
        cachePool.execute(messageConsumer1);
        cachePool.execute(messageConsumer2);
    }
}           

繼續閱讀