天天看點

CompletableFuture 組合式異步程式設計本節内容:Future接口使用 CompletableFuture 建構異步應用使用工廠方法 supplyAsync 建立 CompletableFuture 對象.讓你的代碼免受阻塞之苦尋找更好的方案小結

本節内容:

  • 建立異步計算并擷取計算結果.
  • 使用非阻塞操作提升吞吐量.
  • 設計和實作異步API.
  • 如何以異步的方式使用同步的API.
  • 如何對兩個或多個異步操作進行流水線和合并操作.
  • 如何處理異步操作的完成狀态.

現在,很少有網站或者網絡應用會以完全隔離的方式工作。更多的時候,我們看到 的下一代網絡應用都采用“混聚”(mash-up)的方式:它會使用來自多個來源的内容,将這些内 容聚合在一起,友善使用者的生活。

可是,你并不希望因為等待某 些服務的響應,阻塞應用程式的運作,浪費數十億寶貴的CPU時鐘周期。

那麼其實你真正想做的是避免因為等待遠端服務的傳回,或者對資料庫的查詢等,而阻塞 目前線程的執行,浪費寶貴的CPU資源,因為這種等待的時間很坑相當長.

通過本文你将會了解到java5中的Future接口和jav8中對它更加完善的CompletableFuture接口,

Future接口

要使用Future,通常你隻需要把耗時操作封裝在一個Callable對象中,再把它交給 ExecutorService 就行了,下面這段代碼展示了Java 8之前使用Future 的一個例子。

ExecutorService executor = Executors.newCachedThreadPool();
    Future<Double> future = executor.submit(new Callable<Double>() {
        public Double call() {
            return doSomeLongComputation();
            }});
        doSomethingElse();
        try {
        Double result = future.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException ee) {
        // 計算抛出一個異常
        } catch (InterruptedException ie) {
        // 目前線程在等待過程中被中斷
        } catch (TimeoutException te) {
        // 在Future對象完成之前超過已過期
}
           

通過第一個例子,我們知道 Future 接口提供了方法來檢測異步計算是否已經結束(使用 isDone 方法),等待異步操作結束,以及擷取計算的結果。但是這些特性還不足以讓你編寫簡潔 的并發代碼。比如,我們很難表述 Future 結果之間的依賴性;從文字描述上這很簡單,“當長時 間計算任務完成時,請将該計算的結果通知到另一個長時間運作的計算任務,這兩個計算任務都 完成後,将計算的結果與另一個查詢操作結果合并”。但是,使用 Future 中提供的方法完成這樣 的操作又是另外一回事。這也是我們需要更具描述能力的特性的原因.

CompletableFuture 組合式異步程式設計本節内容:Future接口使用 CompletableFuture 建構異步應用使用工廠方法 supplyAsync 建立 CompletableFuture 對象.讓你的代碼免受阻塞之苦尋找更好的方案小結

接下來,你會了解新的 CompletableFuture 類(它實作了 Future 接口)如何利用Java 8 的新特性以更直覺的方式将上述需求都變為可能。 Stream 和 CompletableFuture 的設計都遵循 了類似的模式:它們都使用了Lambda表達式以及流水線的思想。從這個角度,你可以說 CompletableFuture 和 Future 的關系就跟 Stream 和 Collection 的關系一樣。

使用 CompletableFuture 建構異步應用

為了展示 CompletableFuture 的強大特性,我們會建立一個名為“最佳價格查詢器” (best-price-finder)的應用,它會查詢多個線上商店,依據給定的産品或服務找出最低的價格。

實作異步 API

為了實作最佳價格查詢器應用,讓我們從每個商店都應該提供的API定義入手。首先,商店 應該聲明依據指定産品名稱傳回價格的方法:

package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
 * 每個商店都提供的對外通路的API
 * @author itguang
 * @create 2017-11-22 11:05
 **/
public class Shop {

    /**
     * 商店名稱
     */
    private  String name;


    public Shop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /**
     * (阻塞式)通過名稱查詢價格
     * @param product
     * @return
     */
    public double getPrice(String product) {
           return calculatePrice(product);
    }

   


    /**
     * 計算價格
     * @param product
     * @return
     */
    private double calculatePrice(String product){
        delay();
        //數字*字元=數字
        return 10*product.charAt(0);

    }


    /**
     * 模拟耗時操作,阻塞1秒
     */
    private void delay(){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
           

很明顯,這個API的使用者(這個例子中為最佳價格查詢器)調用該方法時,它會被 阻塞。為等待同步事件完成而等待1秒鐘,這是無法接受的,尤其是考慮到最佳價格查詢器對 網絡中的所有商店都要重複這種操作。

本章接下來的小節中,你會了解如何以異步方式使用同步API解決這個問題。

将同步方法轉化為異步方法

在上面的Shop服務類中添加異步計算價格的方法:

/**
     * (非阻塞式)異步擷取價格
     * @param product
     * @return
     */
    public Future<Double> getPriceAsync(String product){
        CompletableFuture<Double> future = new CompletableFuture<>();
        new Thread(()->{
            double price = calculatePrice(product);
            //需要長時間計算的任務結束并傳回結果時,設定Future傳回值
            future.complete(price);
        }).start();

        //無需等待還沒結束的計算,直接傳回Future對象
        return future;
    }
           

上面的代碼我們可以看出,我們模拟了一個計算價格的耗時操作 calculatePrice(String product),接收一個商品名稱作為參數, 還有一個異步的API getPriceAsync(String product),改方法建立了一個代表異步計算的 CompletableFuture 對象執行個體,它在計算完成後會包含計算結果. 當請求的産品價格最終計算得出時,你可以使用 complete 方法,結束CompletableFuture 對象的運作,并設定變量的值.

接下來我們開始調用上面的API服務實作我們的商品比價器:

package future;
 
 import completablefuture.Shop;
 import org.junit.Test;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 /**
  * @author itguang
  * @create 2017-11-22 10:54
  **/
 public class FutureDemo {
 
 
     /**
      * 測試同步API
      */
     @Test
     public void testgetPrice(){
         Shop shop = new Shop("Best Shop");
         long start = System.nanoTime();
         double price = shop.getPrice("mac book pro");
         System.out.printf(shop.getName()+" Price is %.2f%n",price);
         long invocationTime = (System.nanoTime()-start)/1_000_000;
         System.out.println("同步方法調用花費時間:--- "+invocationTime+" --- msecs");
 
 
         //...其他操作
         doSomethingElse();
 
 
         long retrievalTime = (System.nanoTime()-start)/1_000_000;
         System.out.println("同步方法傳回價格所需時間: --- "+retrievalTime+" ---msecs");
 
     }
 
 
     /**
      * 測試異步API
      */
     @Test
     public void testAsync(){
         Shop shop = new Shop("Best Shop");
         long start = System.nanoTime();
         Future<Double> futurePrice = shop.getPriceAsync("mac book pro");
         long invocationTime = (System.nanoTime()-start)/1_000_000;
         System.out.println("異步方法調用花費時間: --- "+invocationTime+" --- msecs");
 
 
         //...其他操作
         doSomethingElse();
 
         //從future對象中讀取價格,如果價格未知,則發生阻塞.
         try {
             Double price = futurePrice.get();
             System.out.printf(shop.getName()+" Price is %.2f%n",price);
         } catch (InterruptedException e) {
             e.printStackTrace();
         } catch (ExecutionException e) {
             e.printStackTrace();
         }
 
         long retrievalTime = (System.nanoTime()-start)/1_000_000;
         System.out.println("異步方法傳回價格所需時間: --- "+retrievalTime+" ---msecs");
 
     }
 
 
 
 
     /**
      * 其它操作
      */
     public static void doSomethingElse(){
          try {
                     Thread.sleep(1_000);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
 
     }
 
 
 }
           

測試結果:

同步方法調用花費時間:--- 1000 --- msecs
同步方法傳回價格所需時間: --- 2000 ---msecs

----------------

異步方法調用花費時間: --- 29 --- msecs
異步方法傳回價格所需時間: --- 1035 ---msecs
           

由此可見:同步方法使用阻塞時程式設計擷取商品價格,異步方法采用非阻塞式程式設計擷取商品價格,在調用異步方法 getPriceAsync後,立即傳回一個Future對象, 通過該對象在将來的某個時刻取得商品的價格,這種情況下,客戶在進行商品價格查詢的同時,還能執行一些其它的操作.而同步方法調用,必須等到同步方法完成後才能進行其它操作

錯誤處理

現在我們考慮這樣一種情況,假設計算價格的方法出現了錯誤,會怎樣呢?

  • 對于同步線程來說會直接抛出一個異常,并終止目前線程的運作.
  • 對于異步線程來說,就非常糟糕了,異常會被限制在計算價格的那個異步線程中,當異步線程抛出 未經捕獲的異常 時, 該線程就會被殺死,而這會導緻等待get()方法傳回結果的用戶端永久性被阻塞.

    接下來我們測試一下:修改Shop類的 calculatePrice方法如下,

/**
     * 計算價格(模拟一個産生價格的方法)
     * @param product
     * @return
     */
    private double calculatePrice(String product){
        delay();

            int i=  1/0;//故意抛出 java.lang.ArithmeticException: / by zero 異常

        //數字*字元=數字(産生價格的方法)
        return 10*product.charAt(0);

    }

           

再次測試會發現,用戶端會收到一個異常資訊:

java.lang.ArithmeticException: / by zero
        at completablefuture.Shop.calculatePrice(Shop.java:70)
        at completablefuture.Shop.lambda$getPriceAsync$0(Shop.java:49)
        at java.lang.Thread.run(Thread.java:748)
           

使用工廠方法 supplyAsync 建立 CompletableFuture 對象.

CompletableFuture 類自 身提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個流程,還不用擔心實作的細節。

你可以用一行語句重寫方法 getPriceAsync

/**
     * 使用靜态工廠supplyAsync(非阻塞式)異步擷取價格
     * @param product
     * @return
     */
    public Future<Double> getPriceAsync(String product){
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> calculatePrice(product));

        //無需等待還沒結束的計算,直接傳回Future對象
        return future;
    }
           

supplyAsync 方法接受一個生産者( Supplier )作為參數,傳回一個 CompletableFuture 對象,該對象完成異步執行後會讀取調用生産者方法的傳回值。生産者方法會交由 ForkJoinPool 池中的某個執行線程( Executor )運作,但是你也可以使用 supplyAsync 方法的重載版本,傳 遞第二個參數指定不同的執行線程執行生産者方法。一般而言,向 CompletableFuture 的工廠 方法傳遞可選參數,指定生産者方法的執行線程是可行的,在後面我們會使用這一方法.

讓你的代碼免受阻塞之苦

接下來,為了介紹如何拜托代碼阻塞帶來的煩惱,我們假設Shop API的代碼實作你是不能更改的.其實這樣也合理,我們肯定不能控制服務提供方修改他們的代碼.

現在假設Shop APi提供的方法都是同步阻塞式的,這也是當你試圖使用 HTTP API 提供的服務最長發生的事情.

下面你會學到如何以異步的方式查詢多個商店,避免被單一的請求所阻塞,并由此提升你的“最佳價格查詢器”的性能和吞吐量

現在我們重修改Shop類

package completablefuture;

import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
 * 每個商店都提供的對外通路的API
 * @author itguang
 * @create 2017-11-22 11:05
 **/
public class Shop {

    /**
     * 商店名稱
     */
    private  String name;
    private Random random = new Random();


    public Shop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    /**
     * (阻塞式)通過名稱查詢價格
     * @param product
     * @return
     */
    public double getPrice(String product) {
           return calculatePrice(product);
    }

    /**
     * 計算價格(模拟一個産生價格的方法)
     * @param product
     * @return
     */
    private double calculatePrice(String product){
        delay();
        //數字*字元=數字(産生價格的方法)
        return random.nextDouble()*product.charAt(0)*product.charAt(1);
    }


    /**
     * 模拟耗時操作,阻塞1秒
     */
    private void delay(){
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
用戶端代碼:

/**
 * 讓你的代碼免受阻塞之苦
 *
 * @author itguang
 * @create 2017-11-21 16:50
 **/
public class ClientTest {

    List<Shop> shops;

    @Before
    public void before() {
        shops = Arrays.asList(new Shop("淘寶"),
                new Shop("天貓"),
                new Shop("京東"),
                new Shop("亞馬遜"));

    }


    

    /**
     * 
     * @param product 商品名稱
     * @return 根據名字傳回每個商店的商品價格
     */
    public List<String> findPrice(String product) {
       

        return list;
    }




}
           

我們有一個商店清單 shops,和一個 findPrice 方法.

采用順序查詢所有商店的方式實作的 findPrices 方法

首先,我們最先想到的應該是采用順序查詢所有商店的方式實作的 findPrices 方法.

/**
 * 讓你的代碼免受阻塞之苦
 *
 * @author itguang
 * @create 2017-11-21 16:50
 **/
public class ClientTest {

    List<Shop> shops;

    @Before
    public void before() {
        shops = Arrays.asList(new Shop("淘寶"),
                new Shop("天貓"),
                new Shop("京東"),
                new Shop("亞馬遜"));

    }

   /**
     * 采用順序查詢所有商店的方式實作的 findPrices 方法
     * @param product
     * @return
     */
    public List<String> findPrice(String product) {
        List<String> list = shops.stream()
                .map(shop ->
                        String.format("%s price is %.2f RMB",
                                shop.getName(),
                                shop.getPrice(product)))

                .collect(toList());

        return list;
    }

    /**
     *采用順序查詢所有商店的方式實作的 findPrices 方法,查詢每個商店裡的 iphone666s
     */
    @Test
    public void test() {
        long start = System.nanoTime();
        
        List<String> list = findPrice("iphone666s");
        
        System.out.println(list);
        System.out.println("Done in "+(System.nanoTime()-start)/1_000_000+" ms");
    }

    
}
           

測試結果:

[淘寶 price is 7013.15 RMB, 天貓 price is 6248.79 RMB, 京東 price is 4902.47 RMB, 亞馬遜 price is 2700.63 RMB]
Done in 4043 ms
           

正如我們所料,由于對每個商店的查詢都是阻塞式的,并且每個查詢都花費大約1秒,4個商店的查詢是順序的,一個查詢操作會阻塞另一個.

使用并行流對請求進行并行操作

為了對以上代碼進行改進,我們很容易想到使用并行流進行并行操作.可能也是最快的改善方法是使用并行流來避免順序計算, 如下所示:

/**
     * 使用并行流對請求進行并行操作
     * @param product
     * @return
     */
    public List<String> findPrice2(String product) {
        List<String> list = shops.parallelStream()
                .map(shop ->
                        String.format("%s price is %.2f RMB",
                                shop.getName(),
                                shop.getPrice(product)))

                .collect(toList());

        return list;
    }
           

測試結果:

[淘寶 price is 7296.22 RMB, 天貓 price is 3504.13 RMB, 京東 price is 8173.09 RMB, 亞馬遜 price is 4015.44 RMB]
    Done in 1037 ms
           

效果立竿見影,看起來改為并行流确實是個簡單有效的的操作:現在對四個不同商店的查詢實行了并行操作,是以完成所有操作的總和隻有1秒多一點.

使用 CompletableFuture 發起異步請求

讓我們嘗試使用剛學過的CompletableFuture ,将 findPrices 方法中對不同商店的同步調用替換為異步調用

/**
     * 使用 CompletableFuture 發起異步請求
     * @param product
     * @return
     */
    public List<String> findPrice3(String product) {
        List<CompletableFuture<String>> futures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> String.format("%s price is %.2f RMB",
                                shop.getName(),
                                shop.getPrice(product)))
                )
                .collect(toList());
        List<String> list = futures.stream()
                .map(CompletableFuture::join)
                .collect(toList());


        return list;
    }
           

在此,我們對上面的代碼解釋一下 CompletableFuture.join() 操作, 其實我們自需要知道CompletableFuture 類中的 join 方法和 Future 接口中的 get 有相同的含義,并且也聲明在Future 接口中, 它們唯一的不同是 join 不會抛出任何檢測到的異常。使用它你不再需要使用try / catch 語句塊讓你傳遞給第二個 map 方法的Lambda表達式變得過于臃腫

還有一點需要注意:這裡使用了兩個不同的 Stream 流水線,而不是在同一個處理流的流水線上一個接一個地放置兩個 map 操作——這其實是有緣由的.

如果你在單一流水線中處理流,發向不同商家的請求隻能以同步,順序的方式才能執行成功. 這樣一來,你的操作就相當于,每個ComletableFuture對象隻能在前一個執行查詢的操作後才能執行自己的查詢操作,即通知join方法傳回結果.下圖清晰的表明了單一流水線和多流水線的執行過程:

圖的上班部分展示了單一流水線的處理過程,我們看到執行的流程是順序的(以虛線辨別),即新的CompletableFuture對象隻能在前一個操作完成之後才能建立. 與此相反,圖的下半部分展示了如何先将CompletableFuture聚集到一個清單中,即以橢圓部分表示,然後轉成另一個流,讓對象們在其他對象完成操作之前就能啟動.

接下來讓我們看看使用 CompletableFuture 發起異步請求的測試結果:

[淘寶 price is 4190.89 RMB, 天貓 price is 10398.09 RMB, 京東 price is 8776.79 RMB, 亞馬遜 price is 2902.69 RMB]
Done in 2044 ms
           

震驚!!!,有麼有? 我們驚訝的發現,這次執行的結果比原生順序執行快兩倍,比并行流執行慢兩倍,再一想到我們隻用了最少最簡單的改動把順序流改為并行流,把時間提高了4倍,就更加讓人沮喪.

但是,這就是全部的真相嗎? CompletableFuture 就這麼不堪一擊嗎?還是我們可能漏掉了什麼重要的東西,繼續往下探究之前,讓我們休息幾分鐘,尤其是想想你測試 代碼的機器是否足以以并行方式運作四個線程。

尋找更好的方案

對于上面出現的問題,我們分析一下:并行版本工作的很好,是因為它能并行的執行四個任務,是以它幾乎能為每個商家配置設定一個線程.

但是,如果你想要增加第五個商家到商店清單中,讓你的“最佳價格查詢”應用 對其進行處理,這時會發生什麼情況?

讓我們看看增加一個商店後的測試結果:

順序

 [淘寶 price is 6596.20 RMB, 天貓 price is 1528.81 RMB, 京東 price is 2649.17 RMB, 亞馬遜 price is 7780.31 RMB, 實體店 price is 2027.03 RMB]
  Done in 5034 ms

并行

[淘寶 price is 9537.24 RMB, 天貓 price is 11233.77 RMB, 京東 price is 6580.67 RMB, 亞馬遜 price is 2622.23 RMB, 實體店 price is 1528.11 RMB]
Done in 2038 ms

CompletableFuture

[淘寶 price is 8466.65 RMB, 天貓 price is 6883.22 RMB, 京東 price is 4990.85 RMB, 亞馬遜 price is 2826.72 RMB, 實體店 price is 8278.86 RMB]
Done in 2043 ms
           

順序執行版本的執行還是需要大約5秒多鐘的時間,

并行流版本的程式這次比之前也多消耗了差不多1秒鐘的時間,因為可以并行運作(通用線程池中處于可用狀态的)的四個線程現在都處于繁忙狀态,都在對前4個商店進行查 詢。第五個查詢隻能等到前面某一個操作完成釋放出空閑線程才能繼續,

CompletableFuture 版本的程式似乎比之前隻慢那麼一點兒,但是最後這個版本也不太令人滿意。比如,如果你試圖讓你的代碼處理9個商店,并行流版本耗時3143毫秒, 而 CompletableFuture 版本耗時3009毫秒。它們看起來不相伯仲,究其原因都一樣:

後兩個版本他們内部采用的都是同樣的線程池,預設都使用固定數目的線程,具體縣城數量取決于 Runtime.getRuntime().availableProcessors() 的傳回值

然而,CompletableFuture 具有一定的優勢,因為他可以讓你對 Execotor (執行器)進行配置,尤其是線程池的大小,讓它以更适合應用需求的方式配置.這是并行流API無法提供的.

讓我們看看你怎樣利用這種配置上的靈活性帶來實際應用程式性能上的提升。

使用定制的 Executor 配置 CompletableFuture

就這個主題而言,明智的選擇似乎是建立一個配有線程池的 Execotor,但是你應該如何選擇合适的線程數目呢?

調整線程池的大小

《Java并發程式設計實戰》(http://mng.bz/979c)一書中,Brian Goetz和合著者們為線程池大小

的優化提供了不少中肯的建議。這非常重要,如果線程池中線程的數量過多,最終他們會競争稀缺的處理器和記憶體資源,浪費大量的時間再上下文切換上.

反之,如果線程數過少,正如我們上面的應用所面臨的問題,處理器的一些核可能就無法充分利用. Brian Goetz建議,線程池大

小與處理器的使用率之比可以使用下面的公式進行估算:

我們的應用99% 的時間都在等待商店的響應,是以估算出 W/C 的比率為99/1,約為100,這意味這你期望CPU的使用率為 100%,如果你的處理器為4核,拿你要建立一個擁有400個線程的線程池, 實際操作中,如果你建立的線程數比商店的數目更多,反而是一種浪費,因為這樣作之後,你線程池中的很大一部分線程根本沒有機會被使用.

出于這種考慮,我們建議你将執行器使用的線程數,與你需要查詢的商店數目設 定為同一個值,這樣每個商店都應該對應一個服務線程。不過,為了避免發生由于商店的數目過 多導緻伺服器超負荷而崩潰,你還是需要設定一個上限,比如100個線程。

/**
     * 使用定制的 Executor 配置 CompletableFuture
     *
     * @param product
     * @return
     */
    public List<String> findPrice4(String product) {

        //為“最優價格查詢器”應用定制的執行器 Execotor
        Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        //使用守護線程,使用這種方式不會組織程式的關停
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );

      //将執行器Execotor 作為第二個參數傳遞給 supplyAsync 工廠方法
        List<CompletableFuture<String>> futures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> String.format("%s price is %.2f RMB",
                                shop.getName(),
                                shop.getPrice(product)), executor)
                )
                .collect(toList());
        List<String> list = futures.stream()
                .map(CompletableFuture::join)
                .collect(toList());


        return list;
    }
           

注意:你現在建立的是一個由守護線程建構的線程池.java程式是沒有辦法終止或者退出一個正在運作的線程的,是以最後剩下的那個線程會由于一直等待無法發生的事件而引發問題 與此相反,如果将線程标記為守護線程,意味着程式退出時,它也會被回收.這二者之間沒有性能之間的差異.

改進之後,我們再來測試一下5個商品的清單:

測試結果:

[淘寶 price is 3228.06 RMB, 天貓 price is 5183.54 RMB, 京東 price is 7699.38 RMB, 亞馬遜 price is 4071.39 RMB, 實體店 price is 5394.59 RMB]
Done in 1046 ms
           

使用 CompletableFuture 方案的程式處理5個商店僅耗時1021秒,處理9個商店 時耗時1022秒。一般而言,這種狀态會一直持續,直到商店的數目達到我們之前計算的門檻值400。

這個例子證明了,要建立更适合你的應用特性的執行器.利用CompletableFuture像其送出任務執行是個不錯的注意.處理需要大量并行的操作時,這幾乎是最有效的政策

思考:并行—使用并行流還是CompletableFuture ???

目前為止,你已經知道了,對于集合進行并行計算有兩種方法:

要麼将其轉化為并行流,利用map這樣的操作開展工作.

要麼枚舉出集合中的每一個元素,建立新的線程,在CompletableFuture内對其進行操作.

後者提供了更多的靈活性,你可以調整線程池的大小,而這能幫助你確定整體的計算不會因為線程都在等待 I/O 操作而發生阻塞.

我們對使用這些API的建議如下:

如果你進行的是計算密集型操作,并且沒有I/O,那麼推薦使用 Stream接口,因為實作簡單,同時效率也可能是最高的, 即:如果所有的線程都是計算密集型操作,那麼就沒有必要建立比處理器核數還多的線程數

反之,如果你并行的計算單元還涉及等待I/O的操作(包括網絡連接配接等待),那麼使用CompletableFuture靈活性更好. 你可以想前文讨論的那樣,根據等待計算,或者 W/C 的比率設定需要使用的線程數. 這種情況不适用并行流的另一個原因是:處理流流水線中如果發生I/O等待,流的延遲特性會讓我們很難判斷,到底什麼時候觸發了等待

現在你已經了解了如何利用 CompletableFuture 為你的使用者提供異步API,以及如何将一 個同步又緩慢的服務轉換為異步的服務。不過到目前為止,我們每個 Future 中進行的都是單次 的操作。下一節中,你會看到如何将多個異步操作結合在一起,以流水線的方式運作,從描述形 式上,它與你在前面學習的Stream API有幾分類似。

對多個異步任務進行流水線操作.

讓我們假設所有的商店都同意使用一個集中式的折扣服務。該折扣服務提供了五個不同的折 扣代碼,每個折扣代碼對應不同的折扣率。你使用一個枚舉型變量 Discount.Code 來實作這一 想法,具體代碼如下所示。

/**
 * @author itguang
 * @create 2017-11-22 17:26
 **/
public class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

   
}
           

我們還假設所有的商店都同意修改 getPrice 方法的傳回格式。 getPrice 現在以 Shop- Name:price:DiscountCode 的格式傳回一個 String 類型的值

/**
     * (阻塞式)通過名稱查詢價格
     * @param product
     * @return 傳回  Shop-Name:price:DiscountCode 的格式字元串
     */
    public String getPrice(String product) {

     double price = calculatePrice(product);
     //随機得到一個折扣碼
        Discount.Code code = Discount.Code.values()[
                random.nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s",name,price,code);
    }
           

調用 getPrice 方法可能會傳回像下面這樣一個 String 值:

天貓:7360.13:PLATINUM
           

我們已經将對商店傳回字元串的解析操作封裝到了下面的 Quote 類之中:

/**
 * @author itguang
 * @create 2017-11-22 17:26
 **/
public class Quote {

    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}
           

通過傳遞 shop 對象傳回的字元串給靜态工廠方法 parse ,你可以得到 Quote 類的一個執行個體, 它包含了 shop 的名稱、折扣之前的價格,以及折扣代碼。

Discount 服務還提供了一個 applyDiscount 方法,它接收一個 Quote 對象,傳回一個字元 串,表示生成該 Quote 的 shop 中的折扣價格

/**
 * @author itguang
 * @create 2017-11-22 17:26
 **/
public class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }


    /**
     * 根據一個Quote傳回一個折扣資訊
     * @param quote
     * @return
     */
    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    /**
     * 根據價格和折扣計算折扣後的價格
     * @param price
     * @param code
     * @return
     */
    private static double apply(double price, Code code) {
        Util.delay(1000);//模拟Discount服務的響應延遲
        return Util.format(price * (100 - code.percentage) / 100);
    }

}
           

現在準備工作已經完成,我們開始使用 Discount服務

首先我麼以最簡單的方式實作findPrice方法

/**
     * 得到折扣商店資訊(已經被解析過)
     */
    public List<String> findPrice1(String product){
        List<String> list = discountShops.stream()
                .map(discountShop -> discountShop.getPrice(product))
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(toList());

        return list;
    }
           

測試結果:

[淘寶 price is 2190.59, 天貓 price is 6820.3, 京東 price is 9653.26, 亞馬遜 price is 8620.79, 實體店 price is 3973.24]
Done in 10055 ms
           

毫無意外,這次執行耗時10秒,因為順序查詢5個商店耗時大約5秒,現在又加上了 Discount 服務為5個商店傳回的價格申請折扣所消耗的5秒鐘.

你已經知道把流轉換為并行方式,非常容易提升程式的性能.不過,我們也知道當商店的數目多餘預設線程池中線程的數目時,效果并不好.因為 Stream 底層依賴的是線程數量固定的通用線程池

相反,我們也知道,對于非計算密集型操作(大部分時間都在等待,CPU并沒有執行運算),我們使用自定義 CompletableFuture 的執行器 Executor,能更充分的利用CPU資源.

讓我們再次使用 CompletableFuture 提供的特性,以異步方式重新實作 findPrices 方法。 詳細代碼如下所示。如果你發現有些内容不太熟悉,不用太擔心,我們很快會進行針對性的介紹。

/**
     * 使用 CompletableFuture 實作 findPrices 方法
     */
    public List<String> findPrice2(String product) {
        //為“最優價格查詢器”應用定制的執行器 Execotor
        Executor executor = Executors.newFixedThreadPool(Math.min(discountShops.size(), 100),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        //使用守護線程,使用這種方式不會阻止程式的關停
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );

        List<CompletableFuture<String>> futureList = discountShops.stream()
                .map(discountShop -> CompletableFuture.supplyAsync(
                        //異步方式取得商店中産品價格
                        () -> discountShop.getPrice(product), executor))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(
                        quote -> CompletableFuture.supplyAsync(
                                //使用另一個異步任務通路折扣服務
                                () -> Discount.applyDiscount(quote), executor
                        )
                ))
                .collect(toList());

        //等待流中所有future執行完畢,并提取各自的傳回值.
        List<String> list = futureList.stream()
                //join想但與future中的get方法,隻是不會抛出異常
                .map(CompletableFuture::join)
                .collect(toList());

        return list;
    }
           

這一次,事情看起來變得更加複雜了,是以讓我們一步一步地了解到底發生了什麼。這三次 轉換的流程如圖11-5所示

這三次map操作都是用了CompletableFuture 類提供的特性,在需要的地方把它們變成了異步操作.

  1. 擷取價格 這三個操作的第一個你已經見過很多次,隻需要将lambda表達式傳遞給supplyAsync 工廠方法就可以以一部方式對shop進行查詢, 第一個轉換的結果是一個Stream 類型的清單, 一旦轉換結束,每個CompletableFuture對象中都包含對應shop傳回的字元串.
  2. 解析報價 現在你需要進行第二步轉換操作,将字元串轉換為 Quote對象,由于解析操作不涉及遠端服務等耗時操作,他幾乎可以在第一時間進行,是以可以采用同步操作.不會帶來太多延遲. 是以,你可以對上面生成的CompletableFuture對象調用thenApply而不是supplyAsync,将以個把字元串轉換為Quote對象的方法傳遞給他. 注意:直到上面的 CompletableFuture 執行結束, thenApply 方法才會開始執行,将 Stream 中的每個 CompletableFuture 對象轉換為對應的CompletableFuture 對象
  3. 擷取折扣服務

第三個map操作又涉及遠端操作,為從商店得到的原始價格申請折扣率,是以,你也希望它能夠異步執行. 為了實作這一操作,你像第一個調用傳遞 getPrice 給 supplyAsync 那樣,将applyDiscount(quote) 方法以lambda表達式的方法傳遞給了supplyAsync工廠方法. 該方法最終會傳回一個CompletableFuture對象的清單,到目前為止,你已經進行了兩次異步操作,用了兩個不同的CompletableFuture對象進行模組化,你希望把它們以級聯的方式串聯起來進行工作.

java8 CompletableFuture API提供了名為 thenCompose 的方法,thenCompose操作允許你對兩個異步操作執行流水線.即第一個異步操作完成後,把結果作為參數傳遞給第二個異步操作

将這這三次map操作的結果收集到一個清單,你就得到了List ,等這些CompletableFuture對象最終執行完畢,你就可以利用join取得他們的傳回值.

測試結果

[淘寶 price is 5604.87, 天貓 price is 3289.73, 京東 price is 6447.24, 亞馬遜 price is 10259.54, 實體店 price is 11947.6]
Done in 2070 ms
           

thenCompose()方法像大多數CompletableFuture的方法一樣,也提供了一個 thenComposeAsync 方法. 通常而言,名字中不帶Async的方法,和它的前一個任務一樣,在同一線程中運作;而名字以Async結尾的方法會将後續的任務送出到一個線程池,是以每個任務是由不同任務處理的.

将兩個 CompletableFuture 對象整合起來,無論它們是否存在依賴

上節代碼中, 你對一個 CompletableFuture 對象調用了 thenCompose 方法,并向其傳遞了第二個 CompletableFuture ,而第二個 CompletableFuture 又需要使用第一個 CompletableFuture 的執行結果作為輸入。但是,另一種比較常見的情況是,你需要将兩個完 全不相幹的 CompletableFuture 對象的結果整合起來,而且你也不希望等到第一個任務完全結 束才開始第二項任務。

這種情況,你應當使用 thenCombine 方法,它接收名為 BiFunction 的第二參數,這個參數 定義了當兩個 CompletableFuture 對象完成計算後,結果如何合并。同 thenCompose 方法一樣, thenCombine 方法也提供有一個 Async 的版本。這裡,如果使用 thenCombineAsync 會導緻 BiFunction 中定義的合并操作被送出到線程池中,由另一個任務以異步的方式執行

Future<Double> futurePriceInUSD =
        CompletableFuture.supplyAsync(() -> shop.getPrice(product))
        //接受兩個參數,一個是 CompletableFuture,第二個參數定義了兩個 CompletableFuture 對象完成計算後,結果如何合并。
        .thenCombine(
            CompletableFuture.supplyAsync(
            () -> exchangeService.getRate(Money.EUR, Money.USD)),
            (price, rate) -> price * rate
);
           

小結

  • 在執行邊角耗時的操作時,尤其是那些依賴遠端服務的操作,使用異步任務,可以極大的改善程式的性能.
  • 在設計API時,你應該盡可能的為客戶提供異步的API.使用CompletableFuture 類提供的特性,你能輕松實作這一目标.
  • CompletableFuture 還提供了異常管理機制,讓你有機會抛出并管理異步任務中所發生的異常.
  • 将同步的API封裝到CompletableFuture 中,你能夠以異步的方式使用其結果.
  • 如果異步任務之間互相獨立,或者說一個異步任務時另一些異步任務的輸入,你可以将這些異步任務構造或者合并成一個.
  • 你可以為 CompletableFuture 注冊一個回調函數,在Future執行完畢或者結果可用時,針對性的執行一些操作.
  • 你可以決定什麼時候結束程式的運作,是等待由CompletableFuture 構成的清單中所有對象都執行完畢,還是其中一個首先完成就終止程式的運作.

引用《java實戰》