文章目錄
-
- 1 引入
- 2 簡單的實作
-
- 2.1 第一版
- 2.2 擴充:增加逾時時間
- 2.3 擴充二:增加隊列
- 2 案例二
1 引入
比如A在幹一件事情,走不開不能中斷,此時快遞員來送快遞,你走不開,隻能等做完這件事,再去拿快遞,讓快遞員先放到快遞櫃。
再比如tomcat中,當Tomcat處理請求到限制,又來請求,處理不了,就會把請求放到一個隊列中等空閑再去處理
要點:
- 有一個結果需要從一個線程傳遞到另一個線程,他們之間需要關聯同一個GuardedObject
- 如果有結果不斷的來,可以講結果放到一個隊列中
2 簡單的實作
2.1 第一版
定義一個GuardedObject,核心方法就是兩個,擷取結果,送出結果,在擷取結果的時候,如果沒有擷取到,那就等待;送出結果的時候,為了防止另一個線程已經進入了等待,需要喚醒那些正在等待的線程
package study.wyy.concurrency.guared.demo1;
public class GuardObject<T> {
private T result;
/*****
* 擷取結果
* @return
*/
public T get(){
synchronized (this){
while (null == result){
// 沒有結果時候,就等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return result;
}
}
/*****
* 送出結果
* @return
*/
public void submit(T result){
synchronized (this){
this.result = result;
// 喚醒等待結果的線程,來擷取結果
this.notifyAll();
}
}
}
測試
package study.wyy.concurrency.guared.demo1;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Slf4j
public class Test {
public static void main(String[] args) {
GuardObject<String> guardObject = new GuardObject<>();
Random random = new Random();
// 一個線程在從db進行資料的導出,
// 另一個線程負責将導出的資料進行寫入檔案,
new Thread(()->{
System.out.println("寫入檔案前置處理");
// 模拟前置處理的耗時
sleepSecond(random.nextInt(5));
// 擷取另一個線程從資料庫中讀取到的資料
String res = guardObject.get();
System.out.println("開始寫入檔案。。。");
// 模拟寫入資料的耗時
sleepSecond(random.nextInt(5));
System.out.println("寫入檔案結束。。。檔案内容: " + res);
},"t1").start();
new Thread(()->{
// 讀取資料庫資料
System.out.println("讀取資料庫資料開始");
// 模拟讀取資料的耗時
sleepSecond(random.nextInt(5));
String res = "hello word";
System.out.println("讀取資料庫資料結束");
// 送出資料
guardObject.submit(res);
},"t2").start();
}
private static void sleepSecond(int i) {
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出結果
寫入檔案前置處理
讀取資料庫資料開始
讀取資料庫資料結束
開始寫入檔案。。。
寫入檔案結束。。。檔案内容: +hello word
2.2 擴充:增加逾時時間
比如在get等待擷取另一個線程的結果的時候,不能一直等待,有一個逾時時間。
public T get(){
synchronized (this){
while (null == result){
// 不能在這一直等,需要一個逾時時間
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return result;
}
}
wait本身就有一個重載方法,支援逾時時間的,但是不能這麼簡單的就把逾時時間通過wait來簡單的控制:
public T get(long seconds){
synchronized (this){
while (null == result){
// 沒有結果時候,就等待
try {
this.wait(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return result;
}
}
測試:
package study.wyy.concurrency.guared.demo1;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
public class Test2 {
public static void main(String[] args) {
GuardObject<String> guardObject = new GuardObject<>();
new Thread(()->{
try {
log.info("開始擷取結果");
String res = guardObject.get(3);
log.info("拿到結果: => " + res);
} catch (TimeoutException e) {
e.printStackTrace();
}
},"t1").start();
new Thread(()->{
log.info("開始準備結果");
sleepSecond(5);
guardObject.submit("哈哈哈哈哈");
},"t2").start();
}
private static void sleepSecond(int i) {
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出:
20:58:03.374 [t1] INFO study.wyy.concurrency.guared.demo1.Test2 - 開始擷取結果
20:58:03.377 [t2] INFO study.wyy.concurrency.guared.demo1.Test2 - 開始準備結果
20:58:08.382 [t1] INFO study.wyy.concurrency.guared.demo1.Test2 - 拿到結果: => 哈哈哈哈哈
- 可以發現:
- 3s的時候開始擷取結果
- 拿到結果的時候卻是8s,已經逾時了
- 分析原因:
- wait在等待3s後,寫入檔案的線程會繼續get,發現還是null,又繼續wait了,和之前的沒有實質的差別 是以這裡需要我們自己計算等待時間:
25 Guarded Suspension設計模式和生産者消費者模式
- wait在等待3s後,寫入檔案的線程會繼續get,發現還是null,又繼續wait了,和之前的沒有實質的差別
public T get(long timeoutSeconds) throws TimeoutException {
synchronized (this) {
// 開始時間
long beginTime = System.currentTimeMillis();
// 經曆的時間
long passedTime = 0;
while (null == result) {
// 我設定的機關是秒,乘以1000是為了機關轉換
if (passedTime >= timeoutSeconds * 1000 ) {
// 經曆的時間已經大于我們的逾時時間,就跳出循環
throw new TimeoutException();
}
try {
// 沒有結果時候,就等待
// 注意的是這裡的wait時間,逾時時間減去已經等待的時間,防止虛假喚醒
// 比如,逾時時間是3s
// 經曆了1s,還沒逾時,被别人喚醒,但是get結果還是null,
// 這個時候,我們隻能隻需等待2s(逾時時間-經曆的時間)
this.wait(timeoutSeconds * 1000-passedTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 計算經曆的時間
passedTime = System.currentTimeMillis() - beginTime;
}
return result;
}
}
簡化一下代碼
public T get(long timeoutSeconds) throws TimeoutException {
synchronized (this) {
// 開始時間
long beginTime = System.currentTimeMillis();
// 經曆的時間
long passedTime = 0;
long waitTime = timeoutSeconds * 1000 - passedTime;
while (null == result) {
// passedTime大等于timeoutSeconds就是逾時
// 根據waitTime = timeoutSeconds * 1000 - passedTime,也就是說
// waitTime <=0 也就是逾時了
if (waitTime <= 0) {
// 經曆的時間已經大于我們的逾時時間,就跳出循環
throw new TimeoutException();
}
try {
// 沒有結果時候,就等待
// 這裡等到
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 計算經曆的時間
passedTime = System.currentTimeMillis() - beginTime;
}
return result;
}
}
測試:
package study.wyy.concurrency.guared.demo1;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
public class Test2 {
public static void main(String[] args) {
GuardObject<String> guardObject = new GuardObject<>();
new Thread(()->{
try {
log.info("開始擷取結果");
String res = guardObject.get(3);
log.info("拿到結果: => " + res);
} catch (TimeoutException e) {
log.error("time out,",e);
}
},"t1").start();
new Thread(()->{
log.info("開始準備結果");
sleepSecond(5);
guardObject.submit("哈哈哈哈哈");
},"t2").start();
}
private static void sleepSecond(int i) {
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出:
21:22:33.522 [t1] INFO study.wyy.concurrency.guared.demo1.Test2 - 開始擷取結果
21:22:33.522 [t2] INFO study.wyy.concurrency.guared.demo1.Test2 - 開始準備結果
21:22:36.538 [t1] ERROR study.wyy.concurrency.guared.demo1.Test2 - time out,
java.util.concurrent.TimeoutException: null
at study.wyy.concurrency.guared.demo1.GuardObject.get(GuardObject.java:43)
at study.wyy.concurrency.guared.demo1.Test2.lambda$main$0(Test2.java:23)
at java.lang.Thread.run(Thread.java:748)
2.3 擴充二:增加隊列
如果産生了過多結果,但是沒有足夠的消費者,就會丢失資料,就可以增加一個緩沖隊列。
package study.wyy.concurrency.guared.demo1;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
/**
* @author by wyaoyao
* @Description
* @Date 2021/1/20 8:22 下午
*/
@Slf4j
public class Test3 {
public static void main(String[] args) {
GuardObject<String> guardObject = new GuardObject<>();
Random random = new Random();
new Thread(()->{
while (true){
log.info("開始擷取結果");
String res = guardObject.get();
sleepSecond(2);
log.info("拿到結果: => " + res);
}
},"t1").start();
// 10個線程來生産資料
IntStream.range(1,10).forEach(i->
new Thread(()->{
guardObject.submit("第"+ i + "個資料");
},Integer.toString(i)).start()
);
}
private static void sleepSecond(int i) {
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
21:23:07.775 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
21:23:09.785 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 拿到結果: => 第1個資料
21:23:09.785 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
21:23:11.789 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 拿到結果: => 第9個資料
21:23:11.789 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
21:23:13.794 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 拿到結果: => 第9個資料
21:23:13.795 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
21:23:15.799 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 拿到結果: => 第9個資料
21:23:15.799 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
顯然丢失資料了。
- 解決:加一個緩沖隊列,當隊列資料滿了不能放資料。
2 案例二
- tomcat中,當Tomcat處理請求到限制,又來請求,處理不了,就會把請求放到一個隊列中等空閑再去處理
- 定義一個請求實體:
package study.wyy.concurrency.guared;
/**
* @author: wyaoyao
* @Date: 2020/9/10 9:47 下午
* @Description: 請求
*/
public class Request {
// 簡單模拟請求參數中的屬性
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public Request(String value) {
this.value = value;
}
}
- 定義隊列
package study.wyy.concurrency.guared;
import java.util.LinkedList;
/**
* @author: wyaoyao
* @Date: 2020/9/10 9:48 下午
* @Description: 請求隊列
*/
public class RequestQueue {
LinkedList<Request> queue = new LinkedList<>();
/**
* @author: wyaoyao
* @Date: 2020/9/10 9:41 下午
* @Description: 服務端端通過該方法向隊列中get請求
*/
public Request getRequest(){
synchronized (queue){
while (queue.size()<=0){
// 請求隊列中沒有請求,那就等着
try {
queue.wait();
} catch (InterruptedException e) {
// 當被喚醒的時候,需要break出去,進行工作,比如用戶端請求來臨的時候
return null;
}
}
// 請求隊列中有請求資料,那就傳回出去
return queue.removeFirst();
}
}
/**
* @author: wyaoyao
* @Date: 2020/9/10 9:41 下午
* @Description: 用戶端通過該方法向隊列中送出請求
*
*/
public void putRequest(Request request){
synchronized (queue){
queue.addLast(request);
// 喚醒其他線程 起來工作,在get的時候,可能沒有請求資料,給wait了,一旦放入請求資料,自然就要喚醒
queue.notifyAll();
}
}
}
- client
package study.wyy.concurrency.guared;
import java.util.Random;
/**
* @author: wyaoyao
* @Date: 2020/9/10 9:47 下午
* @Description: 模拟用戶端
*/
public class ClientThread extends Thread{
private final RequestQueue queue;
private final Random random;
public ClientThread(RequestQueue queue) {
this.queue = queue;
this.random = new Random(System.currentTimeMillis());
}
@Override
public void run() {
for(int i =0;i<10;i++){
// 模拟put 請求 10次
Request login_request = new Request("login request");
System.out.println("client -> request " + login_request.getValue());
queue.putRequest(login_request);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- server
package study.wyy.concurrency.guared;
import java.util.Objects;
import java.util.Random;
public class ServerThread extends Thread{
private final RequestQueue queue;
private final Random random;
/****
* 标記:是否關掉
*/
private volatile Boolean closed = false;
public ServerThread(RequestQueue queue) {
this.queue = queue;
this.random = new Random(System.currentTimeMillis());
}
@Override
public void run() {
while (!closed){
Request request = queue.getRequest();
if(Objects.isNull(request)){
// 忽略掉這個為null,這裡不能break,那就跳出循環,服務就停了
continue;
}
System.out.println("server -> request " + request.getValue());
try {
// 模拟處理請求的耗時
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
//e.printStackTrace();
// 這裡收到中斷信号(調用了closed),就return,關閉服務
return;
}
}
}
/**
* @Description 關閉服務
* @Author wyaoyao
* @Date 2020/9/10 9:57 下午
* @Param
* @Return
* @Exception
*/
public void closed(){
closed = true;
// 這裡要打斷,因為在get請求的時候可能因為隊列中沒有請求而wait,是無法判斷到這個标記的
this.interrupt();
System.out.println("server closed");
}
}
- 測試
package study.wyy.concurrency.guared;
public class Test {
public static void main(String[] args) throws InterruptedException {
final RequestQueue requestQueue = new RequestQueue();
ServerThread serverThread = new ServerThread(requestQueue);
serverThread.start();
new ClientThread(requestQueue).start();
Thread.sleep(10000);
serverThread.closed();
}
}
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
client -> request login request
server -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
server closed