天天看點

Java多線程精講:6、線程間通信

6.1 等待/通知機制

概念:A線程在運作時需要某個位址中的值,但是該位址還沒有值,是以A等待。當B線程往該位址處寫入了值後,B線程通知A線程,于是A線程繼續執行。上面這樣的一個過程就是等待/通知機制。

實作:

Object類中的wait()方法,可以使執行目前代碼的線程等待,暫停執行。直到接到通知或被中斷為止。注意:wait()方法隻能在同步代碼塊中由鎖對象調用,且調用wait()方法後,目前線程會釋放鎖。

Object類的notify()方法可以喚醒處于等待的線程,該方法也必須在同步代碼塊中由鎖對象調用。如果有多個等待的線程,notify()隻能喚醒其中一個,具體喚醒哪一個是不知道的。被notify()的線程需要重新去競争鎖才能被執行。

沒有使用鎖對象就調用wait()/notify()方法會産生異常:IllegalMonitorStateException。

wait()代碼執行個體:

package wait;

public class Test01 {

    public static void main(String[] args) throws InterruptedException {
        String test = "abc";
        String another = "def";
        System.out.println("同步代碼塊前的代碼");
        synchronized (test) {
            try {
                System.out.println("wait前的代碼");
//                another.wait();  隻有被鎖住的對象才能調用wait()方法
                test.wait();
                System.out.println("wait後的代碼");
            } catch (IllegalMonitorStateException e) {
                e.printStackTrace();
            }
        }
        System.out.println("同步代碼塊後的代碼");
    }
}
           

wait()/notify()代碼示例:

package wait;

/**
 * 需要通過notify喚醒線程
 */
public class Test02 {

    public static void main(String[] args) {
        String str = "wa";

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (str) {
                    System.out.println("線程1開始等待");
                    try {
                        str.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("線程1被喚醒并執行結束了");
                }
            }
        }, "Thread1");

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (str) {
                    System.out.println("線程2喚醒線程1");
                    str.notify();
                }
            }
        }, "Thread2");

        thread1.start();
        thread2.start();
    }
}
           

執行了notify()的線程并不會立即釋放鎖,而是執行完同步代碼塊的所有代碼後才會釋放鎖

interrupt()方法會中斷wait():

當線程調用wait()處于等待狀态時,調用線程對象的interrupt()方法會中斷線程的等待狀态,産生InterruptedException異常。

package wait;

import java.util.concurrent.TimeUnit;

/**
 * interrupt()會中斷線程的wait狀态
 */
public class Test04 {
    public static void main(String[] args) throws InterruptedException {
        SubThread subThread = new SubThread();
        subThread.start();
        TimeUnit.SECONDS.sleep(1);
        subThread.interrupt();
    }

    private static final Object lock = new Object();

    static class SubThread extends Thread {
        @Override
        public void run() {
            synchronized (lock) {
                System.out.println("subThread wait");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    System.out.println("wait等待被中斷了");
                }
                System.out.println("subThread end wait");
            }
        }
    }
}
           

notify()和notifyAll()的差別:

notify()隻能喚醒一個線程,如果有多個線程處于等待狀态,那麼隻能會有一個會被喚醒。notifyAll()可以喚醒所有等待的線程。

wait(long)的使用:

如果在指定時間内沒有被喚醒,那麼線程會自動喚醒。

package wait;

public class Test06 {
    public static void main(String[] args) {
        SubThread subThread = new SubThread();
        subThread.start();
    }

    static final Object lock = new Object();

    static class SubThread extends Thread{
        @Override
        public void run() {
            synchronized (lock) {
                try {
                    System.out.println("開始等待");
                    lock.wait(5000);
                    System.out.println("等待結束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           

6.2 生産者-消費者模式

在Java中,負責生産資料的是生産者,負責使用資料的是消費者。沒有資料時,消費者等待;資料滿時,生産者等待。

package producerdata;

public class ValueOP {

    private String value = "";

    //定義方法修改value字段的值
    public void setValue() throws InterruptedException {
        //如果value不是空串
        synchronized (this) {
            while(!value.equalsIgnoreCase("")) {
                this.wait();
            }
            //是空串
            value = System.currentTimeMillis() + "";
            System.out.println("setValue設定的值是:" + value);
            this.notify();
        }
    }

    //定義方法讀取字段值
    public String getValue() throws InterruptedException {
        synchronized (this) {
            while(value.equalsIgnoreCase("")) {
                this.wait();
            }
            //不是空串,讀取值
            System.out.println("value的值是:" + value);
            value = "";
            this.notify();
        }
        return value;
    }
}
           

假設隻有一個生産者和消費者:那麼生産者和消費者将按順序執行。

如果有多個生産者和消費者,那麼可能出現假死現象:

  1. 一個消費者喚醒了一個生産者,但是在這個生産者拿到鎖之前,另一個消費者搶先拿到了鎖
  2. 三個生産者全部等待,某個消費者喚醒的不是生産者,而是另一個消費者。

解決上述假死現象的方法是:将notify()改成notifyAll(),保證消費者喚醒了生産者,生産者喚醒了消費者。

操作棧:

package producerstack;

import java.util.List;

import java.util.ArrayList;

public class MyStack {
    private List<Integer> list = new ArrayList<>();
    private static final int MAX_SIZE = 2;

    //定義方法模拟入棧
    public synchronized void push(int value) throws InterruptedException {
        //當棧中的資料已滿,等待
        while(list.size() >= MAX_SIZE) {
            System.out.println(Thread.currentThread().getName() + " begin wait...");
            this.wait();
        }
        list.add(value);
        this.notifyAll();
        System.out.println(Thread.currentThread().getName() + "添加了資料:" + value);
    }

    //定義方法模拟出棧
    public synchronized void pop() throws InterruptedException {
        //當棧中的資料為空,等待
        while(list.size() == 0) {
            System.out.println(Thread.currentThread().getName() + " begin wait...");
            this.wait();
        }
        this.notifyAll();
        System.out.println(Thread.currentThread().getName() + "拿到了資料:" + list.remove(0));
    }
}
           

6.3 通過管道實作線程間通信

Java.io包的PipeStream管道流用于線上程之間傳遞資料,一個線程通過管道輸出資料,另一個線程從管道中輸入資料。相關類包括PipedInputStream、PipedOutputStream、PipedReader和PipedWriter。

package pipeStream;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * 使用PipedInputStream和PipedOutputStream線上程間傳遞位元組流
 */

public class Test {
    public static void main(String[] args) throws IOException {
        //定義管道位元組流
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream();
        //建立管道之間的關系
        in.connect(out);

        //建立兩個線程,分别往管道裡寫資料,和讀資料
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    writeData(out);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    readData(in);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread2").start();
    }

    //向管道流中寫入資料
    public static void writeData(PipedOutputStream out) throws IOException {
        //分别把0~100的資料寫入管道
        try {
            for(int i = 0; i <= 10000; i++) {
                out.write(("" + i).getBytes(StandardCharsets.UTF_8));  //把位元組數組寫入到輸出管道流中
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            out.close();
        }
    }

    //從管道流中讀取資料
    public static void readData(PipedInputStream in) throws IOException {
        int count = 0;
        byte[] bytes = new byte[1024];
        int len = 0;
        try {
            while((len = in.read(bytes)) != -1) {
                System.out.println(new String(bytes, 0, len));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            in.close();
        }
    }

}