6.1 等待/通知機制
概念:A線程在運作時需要某個位址中的值,但是該位址還沒有值,是以A等待。當B線程往該位址處寫入了值後,B線程通知A線程,于是A線程繼續執行。上面這樣的一個過程就是等待/通知機制。
實作:
Object類中的wait()方法,可以使執行目前代碼的線程等待,暫停執行。直到接到通知或被中斷為止。注意:wait()方法隻能在同步代碼塊中由鎖對象調用,且調用wait()方法後,目前線程會釋放鎖。
Object類的notify()方法可以喚醒處于等待的線程,該方法也必須在同步代碼塊中由鎖對象調用。如果有多個等待的線程,notify()隻能喚醒其中一個,具體喚醒哪一個是不知道的。被notify()的線程需要重新去競争鎖才能被執行。
沒有使用鎖對象就調用wait()/notify()方法會産生異常:IllegalMonitorStateException。
wait()代碼執行個體:
package wait;
public class Test01 {
public static void main(String[] args) throws InterruptedException {
String test = "abc";
String another = "def";
System.out.println("同步代碼塊前的代碼");
synchronized (test) {
try {
System.out.println("wait前的代碼");
// another.wait(); 隻有被鎖住的對象才能調用wait()方法
test.wait();
System.out.println("wait後的代碼");
} catch (IllegalMonitorStateException e) {
e.printStackTrace();
}
}
System.out.println("同步代碼塊後的代碼");
}
}
wait()/notify()代碼示例:
package wait;
/**
* 需要通過notify喚醒線程
*/
public class Test02 {
public static void main(String[] args) {
String str = "wa";
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (str) {
System.out.println("線程1開始等待");
try {
str.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程1被喚醒并執行結束了");
}
}
}, "Thread1");
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (str) {
System.out.println("線程2喚醒線程1");
str.notify();
}
}
}, "Thread2");
thread1.start();
thread2.start();
}
}
執行了notify()的線程并不會立即釋放鎖,而是執行完同步代碼塊的所有代碼後才會釋放鎖
interrupt()方法會中斷wait():
當線程調用wait()處于等待狀态時,調用線程對象的interrupt()方法會中斷線程的等待狀态,産生InterruptedException異常。
package wait;
import java.util.concurrent.TimeUnit;
/**
* interrupt()會中斷線程的wait狀态
*/
public class Test04 {
public static void main(String[] args) throws InterruptedException {
SubThread subThread = new SubThread();
subThread.start();
TimeUnit.SECONDS.sleep(1);
subThread.interrupt();
}
private static final Object lock = new Object();
static class SubThread extends Thread {
@Override
public void run() {
synchronized (lock) {
System.out.println("subThread wait");
try {
lock.wait();
} catch (InterruptedException e) {
System.out.println("wait等待被中斷了");
}
System.out.println("subThread end wait");
}
}
}
}
notify()和notifyAll()的差別:
notify()隻能喚醒一個線程,如果有多個線程處于等待狀态,那麼隻能會有一個會被喚醒。notifyAll()可以喚醒所有等待的線程。
wait(long)的使用:
如果在指定時間内沒有被喚醒,那麼線程會自動喚醒。
package wait;
public class Test06 {
public static void main(String[] args) {
SubThread subThread = new SubThread();
subThread.start();
}
static final Object lock = new Object();
static class SubThread extends Thread{
@Override
public void run() {
synchronized (lock) {
try {
System.out.println("開始等待");
lock.wait(5000);
System.out.println("等待結束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
6.2 生産者-消費者模式
在Java中,負責生産資料的是生産者,負責使用資料的是消費者。沒有資料時,消費者等待;資料滿時,生産者等待。
package producerdata;
public class ValueOP {
private String value = "";
//定義方法修改value字段的值
public void setValue() throws InterruptedException {
//如果value不是空串
synchronized (this) {
while(!value.equalsIgnoreCase("")) {
this.wait();
}
//是空串
value = System.currentTimeMillis() + "";
System.out.println("setValue設定的值是:" + value);
this.notify();
}
}
//定義方法讀取字段值
public String getValue() throws InterruptedException {
synchronized (this) {
while(value.equalsIgnoreCase("")) {
this.wait();
}
//不是空串,讀取值
System.out.println("value的值是:" + value);
value = "";
this.notify();
}
return value;
}
}
假設隻有一個生産者和消費者:那麼生産者和消費者将按順序執行。
如果有多個生産者和消費者,那麼可能出現假死現象:
- 一個消費者喚醒了一個生産者,但是在這個生産者拿到鎖之前,另一個消費者搶先拿到了鎖
- 三個生産者全部等待,某個消費者喚醒的不是生産者,而是另一個消費者。
解決上述假死現象的方法是:将notify()改成notifyAll(),保證消費者喚醒了生産者,生産者喚醒了消費者。
操作棧:
package producerstack;
import java.util.List;
import java.util.ArrayList;
public class MyStack {
private List<Integer> list = new ArrayList<>();
private static final int MAX_SIZE = 2;
//定義方法模拟入棧
public synchronized void push(int value) throws InterruptedException {
//當棧中的資料已滿,等待
while(list.size() >= MAX_SIZE) {
System.out.println(Thread.currentThread().getName() + " begin wait...");
this.wait();
}
list.add(value);
this.notifyAll();
System.out.println(Thread.currentThread().getName() + "添加了資料:" + value);
}
//定義方法模拟出棧
public synchronized void pop() throws InterruptedException {
//當棧中的資料為空,等待
while(list.size() == 0) {
System.out.println(Thread.currentThread().getName() + " begin wait...");
this.wait();
}
this.notifyAll();
System.out.println(Thread.currentThread().getName() + "拿到了資料:" + list.remove(0));
}
}
6.3 通過管道實作線程間通信
Java.io包的PipeStream管道流用于線上程之間傳遞資料,一個線程通過管道輸出資料,另一個線程從管道中輸入資料。相關類包括PipedInputStream、PipedOutputStream、PipedReader和PipedWriter。
package pipeStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
/**
* 使用PipedInputStream和PipedOutputStream線上程間傳遞位元組流
*/
public class Test {
public static void main(String[] args) throws IOException {
//定義管道位元組流
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream();
//建立管道之間的關系
in.connect(out);
//建立兩個線程,分别往管道裡寫資料,和讀資料
new Thread(new Runnable() {
@Override
public void run() {
try {
writeData(out);
} catch (IOException e) {
e.printStackTrace();
}
}
}, "Thread1").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
readData(in);
} catch (IOException e) {
e.printStackTrace();
}
}
}, "Thread2").start();
}
//向管道流中寫入資料
public static void writeData(PipedOutputStream out) throws IOException {
//分别把0~100的資料寫入管道
try {
for(int i = 0; i <= 10000; i++) {
out.write(("" + i).getBytes(StandardCharsets.UTF_8)); //把位元組數組寫入到輸出管道流中
}
} catch (IOException e) {
e.printStackTrace();
} finally {
out.close();
}
}
//從管道流中讀取資料
public static void readData(PipedInputStream in) throws IOException {
int count = 0;
byte[] bytes = new byte[1024];
int len = 0;
try {
while((len = in.read(bytes)) != -1) {
System.out.println(new String(bytes, 0, len));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
in.close();
}
}
}