天天看點

25 Guarded Suspension設計模式和生産者消費者模式

文章目錄

    • 1 引入
    • 2 簡單的實作
      • 2.1 第一版
      • 2.2 擴充:增加逾時時間
      • 2.3 擴充二:增加隊列
    • 2 案例二

1 引入

比如A在幹一件事情,走不開不能中斷,此時快遞員來送快遞,你走不開,隻能等做完這件事,再去拿快遞,讓快遞員先放到快遞櫃。

再比如tomcat中,當Tomcat處理請求到限制,又來請求,處理不了,就會把請求放到一個隊列中等空閑再去處理

要點:

  • 有一個結果需要從一個線程傳遞到另一個線程,他們之間需要關聯同一個GuardedObject
  • 如果有結果不斷的來,可以講結果放到一個隊列中

2 簡單的實作

2.1 第一版

定義一個GuardedObject,核心方法就是兩個,擷取結果,送出結果,在擷取結果的時候,如果沒有擷取到,那就等待;送出結果的時候,為了防止另一個線程已經進入了等待,需要喚醒那些正在等待的線程
package study.wyy.concurrency.guared.demo1;

public class GuardObject<T> {

    private T result;

    /*****
     * 擷取結果
     * @return
     */
    public T get(){
        synchronized (this){
            while (null == result){
                // 沒有結果時候,就等待
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    }

    /*****
     * 送出結果
     * @return
     */
    public void submit(T result){
        synchronized (this){
            this.result = result;
            // 喚醒等待結果的線程,來擷取結果
            this.notifyAll();
        }
    }
}

           

測試

package study.wyy.concurrency.guared.demo1;

import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@Slf4j
public class Test {

    public static void main(String[] args) {
        GuardObject<String> guardObject = new GuardObject<>();
        Random random = new Random();
        // 一個線程在從db進行資料的導出,
        // 另一個線程負責将導出的資料進行寫入檔案,
        new Thread(()->{
            System.out.println("寫入檔案前置處理");
            // 模拟前置處理的耗時
            sleepSecond(random.nextInt(5));
            // 擷取另一個線程從資料庫中讀取到的資料
            String res = guardObject.get();
            System.out.println("開始寫入檔案。。。");
            // 模拟寫入資料的耗時
            sleepSecond(random.nextInt(5));
            System.out.println("寫入檔案結束。。。檔案内容: " + res);
        },"t1").start();
        new Thread(()->{
            // 讀取資料庫資料
            System.out.println("讀取資料庫資料開始");
            // 模拟讀取資料的耗時
            sleepSecond(random.nextInt(5));
            String res = "hello word";
            System.out.println("讀取資料庫資料結束");
            // 送出資料
            guardObject.submit(res);
        },"t2").start();


    }

    private static void sleepSecond(int i)  {
        try {
            TimeUnit.SECONDS.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

輸出結果

寫入檔案前置處理
讀取資料庫資料開始
讀取資料庫資料結束
開始寫入檔案。。。
寫入檔案結束。。。檔案内容: +hello word
           

2.2 擴充:增加逾時時間

比如在get等待擷取另一個線程的結果的時候,不能一直等待,有一個逾時時間。

public T get(){
    synchronized (this){
        while (null == result){
            // 不能在這一直等,需要一個逾時時間
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return result;
    }
}
           

wait本身就有一個重載方法,支援逾時時間的,但是不能這麼簡單的就把逾時時間通過wait來簡單的控制:

public T get(long seconds){
        synchronized (this){
            while (null == result){
                // 沒有結果時候,就等待
                try {
                    this.wait(seconds);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    }
           

測試:

package study.wyy.concurrency.guared.demo1;

import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
public class Test2 {
    public static void main(String[] args) {
        GuardObject<String> guardObject = new GuardObject<>();
        new Thread(()->{
            try {
                log.info("開始擷取結果");
                String res = guardObject.get(3);
                log.info("拿到結果: => " + res);
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        },"t1").start();
        new Thread(()->{
            log.info("開始準備結果");
            sleepSecond(5);
            guardObject.submit("哈哈哈哈哈");
        },"t2").start();
    }
    private static void sleepSecond(int i)  {
        try {
            TimeUnit.SECONDS.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

           

輸出:

20:58:03.374 [t1] INFO study.wyy.concurrency.guared.demo1.Test2 - 開始擷取結果
20:58:03.377 [t2] INFO study.wyy.concurrency.guared.demo1.Test2 - 開始準備結果
20:58:08.382 [t1] INFO study.wyy.concurrency.guared.demo1.Test2 - 拿到結果: => 哈哈哈哈哈
           
  • 可以發現:
    • 3s的時候開始擷取結果
    • 拿到結果的時候卻是8s,已經逾時了
  • 分析原因:
    • wait在等待3s後,寫入檔案的線程會繼續get,發現還是null,又繼續wait了,和之前的沒有實質的差別
      25 Guarded Suspension設計模式和生産者消費者模式
      是以這裡需要我們自己計算等待時間:
public T get(long timeoutSeconds) throws TimeoutException {
        synchronized (this) {
            // 開始時間
            long beginTime = System.currentTimeMillis();
            // 經曆的時間
            long passedTime = 0;
            while (null == result) {
            	// 我設定的機關是秒,乘以1000是為了機關轉換
                if (passedTime >= timeoutSeconds * 1000 ) {
                    // 經曆的時間已經大于我們的逾時時間,就跳出循環
                    throw new TimeoutException();
                }
                try {
                    // 沒有結果時候,就等待
                    // 注意的是這裡的wait時間,逾時時間減去已經等待的時間,防止虛假喚醒
                    // 比如,逾時時間是3s
                    // 經曆了1s,還沒逾時,被别人喚醒,但是get結果還是null,
                    // 這個時候,我們隻能隻需等待2s(逾時時間-經曆的時間)
                    this.wait(timeoutSeconds * 1000-passedTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 計算經曆的時間
                passedTime = System.currentTimeMillis() - beginTime;
            }
            return result;
        }
    }
           

簡化一下代碼

public T get(long timeoutSeconds) throws TimeoutException {
      synchronized (this) {
          // 開始時間
          long beginTime = System.currentTimeMillis();
          // 經曆的時間
          long passedTime = 0;
          long waitTime = timeoutSeconds * 1000 - passedTime;
          while (null == result) {
          		// passedTime大等于timeoutSeconds就是逾時
          		// 根據waitTime = timeoutSeconds * 1000 - passedTime,也就是說
          		// waitTime <=0 也就是逾時了
              if (waitTime <= 0) {
                  // 經曆的時間已經大于我們的逾時時間,就跳出循環
                  throw new TimeoutException();
              }
              try {
                  // 沒有結果時候,就等待
                  // 這裡等到
                  this.wait(waitTime);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              // 計算經曆的時間
              passedTime = System.currentTimeMillis() - beginTime;
          }
          return result;
      }
  }
           

測試:

package study.wyy.concurrency.guared.demo1;

import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
public class Test2 {
    public static void main(String[] args) {
        GuardObject<String> guardObject = new GuardObject<>();
        new Thread(()->{
            try {
                log.info("開始擷取結果");
                String res = guardObject.get(3);
                log.info("拿到結果: => " + res);
            } catch (TimeoutException e) {
                log.error("time out,",e);
            }
        },"t1").start();
        new Thread(()->{
            log.info("開始準備結果");
            sleepSecond(5);
            guardObject.submit("哈哈哈哈哈");
        },"t2").start();
    }
    private static void sleepSecond(int i)  {
        try {
            TimeUnit.SECONDS.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

輸出:

21:22:33.522 [t1] INFO study.wyy.concurrency.guared.demo1.Test2 - 開始擷取結果
21:22:33.522 [t2] INFO study.wyy.concurrency.guared.demo1.Test2 - 開始準備結果
21:22:36.538 [t1] ERROR study.wyy.concurrency.guared.demo1.Test2 - time out,
java.util.concurrent.TimeoutException: null
	at study.wyy.concurrency.guared.demo1.GuardObject.get(GuardObject.java:43)
	at study.wyy.concurrency.guared.demo1.Test2.lambda$main$0(Test2.java:23)
	at java.lang.Thread.run(Thread.java:748)
           

2.3 擴充二:增加隊列

如果産生了過多結果,但是沒有足夠的消費者,就會丢失資料,就可以增加一個緩沖隊列。

package study.wyy.concurrency.guared.demo1;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

/**
 * @author by wyaoyao
 * @Description
 * @Date 2021/1/20 8:22 下午
 */
@Slf4j
public class Test3 {
    public static void main(String[] args) {
        GuardObject<String> guardObject = new GuardObject<>();
        Random random = new Random();

        new Thread(()->{
            while (true){
                log.info("開始擷取結果");
                String res = guardObject.get();
                 sleepSecond(2);
                log.info("拿到結果: => " + res);
            }
        },"t1").start();

        // 10個線程來生産資料
        IntStream.range(1,10).forEach(i->
            new Thread(()->{
                guardObject.submit("第"+ i + "個資料");
            },Integer.toString(i)).start()
        );
    }

    private static void sleepSecond(int i)  {
        try {
            TimeUnit.SECONDS.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

           
21:23:07.775 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
21:23:09.785 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 拿到結果: => 第1個資料
21:23:09.785 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
21:23:11.789 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 拿到結果: => 第9個資料
21:23:11.789 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
21:23:13.794 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 拿到結果: => 第9個資料
21:23:13.795 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
21:23:15.799 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 拿到結果: => 第9個資料
21:23:15.799 [t1] INFO study.wyy.concurrency.guared.demo1.Test3 - 開始擷取結果
           

顯然丢失資料了。

  • 解決:加一個緩沖隊列,當隊列資料滿了不能放資料。

2 案例二

  • tomcat中,當Tomcat處理請求到限制,又來請求,處理不了,就會把請求放到一個隊列中等空閑再去處理
  • 定義一個請求實體:
package study.wyy.concurrency.guared;


/**
 *  @author: wyaoyao
 *  @Date: 2020/9/10 9:47 下午
 *  @Description: 請求
 */
public class Request {
    // 簡單模拟請求參數中的屬性
    private String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public Request(String value) {
        this.value = value;
    }
}
           
  • 定義隊列
package study.wyy.concurrency.guared;


import java.util.LinkedList;

/**
 *  @author: wyaoyao
 *  @Date: 2020/9/10 9:48 下午
 *  @Description: 請求隊列
 */
public class RequestQueue {

    LinkedList<Request> queue = new LinkedList<>();

    /**
     *  @author: wyaoyao
     *  @Date: 2020/9/10 9:41 下午
     *  @Description: 服務端端通過該方法向隊列中get請求
     */
    public Request getRequest(){
        synchronized (queue){
            while (queue.size()<=0){
                // 請求隊列中沒有請求,那就等着
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    // 當被喚醒的時候,需要break出去,進行工作,比如用戶端請求來臨的時候
                    return null;
                }
            }
            // 請求隊列中有請求資料,那就傳回出去
            return queue.removeFirst();
        }
    }

    /**
     *  @author: wyaoyao
     *  @Date: 2020/9/10 9:41 下午
     *  @Description: 用戶端通過該方法向隊列中送出請求
     *
     */
    public void putRequest(Request request){
        synchronized (queue){
           queue.addLast(request);
           // 喚醒其他線程 起來工作,在get的時候,可能沒有請求資料,給wait了,一旦放入請求資料,自然就要喚醒
            queue.notifyAll();
        }
    }

}

           
  • client
package study.wyy.concurrency.guared;


import java.util.Random;

/**
 *  @author: wyaoyao
 *  @Date: 2020/9/10 9:47 下午
 *  @Description: 模拟用戶端
 */
public class ClientThread extends Thread{
    private final RequestQueue queue;

    private final Random random;

    public ClientThread(RequestQueue queue) {
        this.queue = queue;
        this.random = new Random(System.currentTimeMillis());
    }

    @Override
    public void run() {
        for(int i =0;i<10;i++){
            // 模拟put 請求 10次
            Request login_request = new Request("login request");
            System.out.println("client -> request " + login_request.getValue());
            queue.putRequest(login_request);
            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

           
  • server
package study.wyy.concurrency.guared;

import java.util.Objects;
import java.util.Random;

public class ServerThread extends Thread{

    private final RequestQueue queue;

    private final Random random;

    /****
     * 标記:是否關掉
     */
    private volatile Boolean closed = false;

    public ServerThread(RequestQueue queue) {
        this.queue = queue;
        this.random = new Random(System.currentTimeMillis());
    }

    @Override
    public void run() {
        while (!closed){
            Request request = queue.getRequest();
            if(Objects.isNull(request)){
                // 忽略掉這個為null,這裡不能break,那就跳出循環,服務就停了
                continue;
            }
            System.out.println("server -> request " + request.getValue());
            try {
                // 模拟處理請求的耗時
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                //e.printStackTrace();
                // 這裡收到中斷信号(調用了closed),就return,關閉服務
                return;
            }
        }
    }


    /**
    * @Description 關閉服務
    * @Author  wyaoyao
    * @Date   2020/9/10 9:57 下午
    * @Param
    * @Return
    * @Exception
    */
    public void closed(){
        closed = true;
        // 這裡要打斷,因為在get請求的時候可能因為隊列中沒有請求而wait,是無法判斷到這個标記的
        this.interrupt();
        System.out.println("server closed");
    }
}

           
  • 測試
package study.wyy.concurrency.guared;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        final RequestQueue requestQueue = new RequestQueue();
        ServerThread serverThread = new ServerThread(requestQueue);
        serverThread.start();
        new ClientThread(requestQueue).start();
        Thread.sleep(10000);
        serverThread.closed();
    }
}

           
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
client -> request login request
server -> request login request
server -> request login request
client -> request login request
server -> request login request
client -> request login request
server -> request login request
server closed