天天看點

異步程式設計CompletableFuture實作高并發系統優化之請求合并

異步程式設計CompletableFuture實作高并發系統優化之請求合并

  先說場景:

  根據Redis官網介紹,單機版Redis的讀寫性能是12萬/秒,批量處理可以達到70萬/秒。不管是緩存或者是資料庫,都有批量處理的功能。當我們的系統達到瓶頸的時候,我們考慮充分的壓榨緩存和資料庫的性能,應對更大的并發請求。适用于電商促銷雙十一,等特定高并發的場景,讓系統可以支撐更高的并發。

  思路:

一個使用者請求到背景,我沒有立即去處理,而是把請求堆積到隊列中,堆積10毫秒的時間,由于是高并發場景,就堆積了一定數量的請求。

我定義一個定時任務,把隊列中的請求,按批處理的方式,向後端的Redis緩存,或者資料庫發起批量的請求,拿到批量的結果,再把結果分發給對應的請求使用者。

對于單個使用者而言,他的請求變慢了10毫秒是無感覺的。但是對于我們系統,卻可以提高幾倍的抗并發能力。

這個請求合并,結果分發的功能,就要用到一個類CompletableFuture 實作異步程式設計,不同線程之間的資料互動。

  線程1 如何建立異步任務?

//建立異步任務

CompletableFuture> future = new CompletableFuture<>();

//阻塞等待擷取結果。

Map result = future.get();

  線程2 如何把資料指派給線程1 ?

// 線程2的處理結果

Object result = "結果";

//線程2 的結果,指派 給 線程1

future.complete(result);

  CompletableFuture 是由大牛 Doug Lea 在JDK1.8 提供的類,我們來看看complete()方法的源碼。

複制代碼

/**
 * If not already completed, sets the value returned by {@link
 * #get()} and related methods to the given value.
 *
 * @param value the result value
 * @return {@code true} if this invocation caused this CompletableFuture
 * to transition to a completed state, else {@code false}
 */
public boolean complete(T value) {
    boolean triggered = completeValue(value);
    postComplete();
    return triggered;
}           

  翻譯:

      如果尚未完成,則将傳回的值和相關方法get()設定為給定值。

  也就是說,

    線程1 的get() 方法,拿到的就是線程 2 的complete() 方法給的值。

看到這裡,應該基本明白這個異常程式設計的意思了。它的核心就是線程通信,資料傳輸。直接上代碼:

package www.itbac.com;

import javax.annotation.PostConstruct;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.concurrent.*;

public class CompletableFutureTest {

//并發安全的阻塞隊列,積攢請求。(每隔N毫秒批量處理一次)
LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue();

// 定時任務的實作,每隔開N毫秒處理一次資料。
@PostConstruct
public void init() {
    // 定時任務線程池
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {           

// 捕獲異常

try {
                //1.從阻塞隊列中取出queue的請求,生成一次批量查詢。
                int size = queue.size();
                if (size == 0) {
                    return;
                }
                List<Request> requests = new ArrayList<>(size);
                for (int i = 0; i < size; i++) {
                    // 移出隊列,并傳回。
                    Request poll = queue.poll();
                    requests.add(poll);
                }
                //2.組裝一個批量查詢請求參數。
                List<String> movieCodes = new ArrayList<>();
                for (Request request : requests) {
                    movieCodes.add(request.getMovieCode());
                }
                //3. http 請求,或者 dubbo 請求。批量請求,得到結果list。
                System.out.println("本次合并請求數量:"+movieCodes.size());
                List<Map<String, Object>> responses = new ArrayList<>();

                //4.把list轉成map友善快速查找。
                HashMap<String, Map<String, Object>> responseMap = new HashMap<>();
                for (Map<String, Object> respons : responses) {
                    String code = respons.get("code").toString();
                    responseMap.put(code,respons);
                }
                //4.将結果響應給每一個單獨的使用者請求。
                for (Request request : requests) {
                    //根據請求中攜帶的能表示唯一參數,去批量查詢的結果中找響應。
                    Map<String, Object> result = responseMap.get(request.getMovieCode());

                    //将結果傳回到對應的請求線程。2個線程通信,異步程式設計指派。
                    //complete(),源碼注釋翻譯:如果尚未完成,則将由方法和相關方法傳回的值設定為給定值
                    request.getFuture().complete(result);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        }
        // 立即執行任務,并間隔10 毫秒重複執行。
    }, 0, 10, TimeUnit.MILLISECONDS);

}

// 1萬個使用者請求,1萬個并發,查詢電影資訊
public Map<String, Object> queryMovie(String movieCode) throws ExecutionException, InterruptedException {
    //請求合并,減少接口調用次數,提升性能。
    //思路:将不同使用者的同類請求,合并起來。
    //并非立刻發起接口調用,請求 。是先收集起來,再進行批量請求。
    Request request = new Request();
    //請求參數
    request.setMovieCode(movieCode);
    //異步程式設計,建立目前線程的任務,由其他線程異步運算,擷取異步處理的結果。
    CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
    request.setFuture(future);

    //請求參數放入隊列中。定時任務去消化請求。
    queue.add(request);

    //阻塞等待擷取結果。
    Map<String, Object> stringObjectMap = future.get();
    return stringObjectMap;
}
           

}

//請求包裝類
class Request {

//請求參數: 電影id。
private String movieCode;

// 多線程的future接收傳回值。
//每一個請求對象中都有一個future接收請求。
private CompletableFuture<Map<String, Object>> future;
           
public CompletableFuture<Map<String, Object>> getFuture() {
    return future;
}

public void setFuture(CompletableFuture<Map<String, Object>> future) {
    this.future = future;
}

public Request() {
}

public Request(String movieCode) {
    this.movieCode = movieCode;
}

public String getMovieCode() {
    return movieCode;
}

public void setMovieCode(String movieCode) {
    this.movieCode = movieCode;
}           

  這樣就實作了請求合并,批量處理,結果分發響應。讓系統支撐更高的并發量。

當然,因為不是天天雙十一,沒有那麼大的并發量,就添加一個動态的配置,隻有當特定的時間,才進行請求堆積。其他時間還是正常的處理。這部分邏輯就不寫出來了。

原文位址

https://www.cnblogs.com/itbac/p/11298626.html