天天看點

《Java8實戰》-第十一章筆記(CompletableFuture:組合式異步程式設計)CompletableFuture:組合式異步程式設計

CompletableFuture:組合式異步程式設計

最近這些年,兩種趨勢不斷地推動我們反思我們設計軟體的方式。第一種趨勢和應用運作的硬體平台相關,第二種趨勢與應用程式的架構相關,尤其是它們之間如何互動。我們在第7章中已經讨論過硬體平台的影響。我們注意到随着多核處理器的出現,提升應用程式處理速度最有效的方式是編寫能充分發揮多核能力的軟體。你已經看到通過切分大型的任務,讓每個子任務并行運作,這一目标是能夠實作的;你也已經了解相對直接使用線程的方式,使用分支/合并架構(在Java 7中引入)和并行流(在Java 8中新引入)能以更簡單、更有效的方式實作這一目标。

第二種趨勢反映在公共API日益增長的網際網路服務應用。著名的網際網路大鳄們紛紛提供了自己的公共API服務,比如谷歌提供了地理資訊服務,Facebook提供了社交資訊服務,Twitter提供了新聞服務。現在,很少有網站或者網絡應用會以完全隔離的方式工作。更多的時候,我們看到的下一代網絡應用都采用“混聚”(mash-up)的方式:它會使用來自多個來源的内容,将這些内容聚合在一起,友善使用者的生活。

比如,你可能希望為你的法國客戶提供指定主題的熱點報道。為實作這一功能,你需要向谷歌或者Twitter的API請求所有語言中針對該主題最熱門的評論,可能還需要依據你的内部算法對它們的相關性進行排序。之後,你可能還需要使用谷歌的翻譯服務把它們翻譯成法語,甚至利用谷歌地圖服務定位出評論作者的位置資訊,最終将所有這些資訊聚集起來,呈現在你的網站上。

當然,如果某些外部網絡服務發生響應慢的情況,你希望依舊能為使用者提供部分資訊,比如提供帶問号标記的通用地圖,以文本的方式顯示資訊,而不是呆呆地顯示一片空白螢幕,直到地圖伺服器傳回結果或者逾時退出。

要實作類似的服務,你需要與網際網路上的多個Web服務通信。可是,你并不希望因為等待某些服務的響應,阻塞應用程式的運作,浪費數十億寶貴的CPU時鐘周期。比如,不要因為等待Facebook的資料,暫停對來自Twitter的資料處理。

這些場景展現了多任務程式設計的另一面。第7章中介紹的分支/合并架構以及并行流是實作并行處理的寶貴工具;它們将一個操作切分為多個子操作,在多個不同的核、CPU甚至是機器上并行地執行這些子操作。

與此相反,如果你的意圖是實作并發,而非并行,或者你的主要目标是在同一個CPU上執行幾個松耦合的任務,充分利用CPU的核,讓其足夠忙碌,進而最大化程式的吞吐量,那麼你其實真正想做的是避免因為等待遠端服務的傳回,或者對資料庫的查詢,而阻塞線程的執行,浪費寶貴的計算資源,因為這種等待的時間很可能相當長。通過本章中你會了解,Future接口,尤其是它的新版實作CompletableFuture,是處理這種情況的利器。

Future 接口

Future接口在Java 5中被引入,設計初衷是對将來某個時刻會發生的結果進行模組化。它模組化了一種異步計算,傳回一個執行運算結果的引用,當運算結束後,這個引用被傳回給調用方。在Future中觸發那些潛在耗時的操作把調用線程解放出來,讓它能繼續執行其他有價值的工作,不再需要呆呆等待耗時的操作完成。打個比方,你可以把它想象成這樣的場景:你拿了一袋子衣服到你中意的幹洗店去洗。幹洗店的員工會給你張發票,告訴你什麼時候你的衣服會洗好(這就是一個Future事件)。衣服幹洗的同時,你可以去做其他的事情。Future的另一個優點是它比更底層的Thread更易用。要使用Future,通常你隻需要将耗時的操作封裝在一個Callable對象中,再将它送出給ExecutorService,就萬事大吉了。下面這段代碼展示了Java 8之前使用Future的一個例子。

ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
    public Double call() {
        return doSomeLongComputation();
    }
});
// 異步操作進行的同時,你可以做其他的事情
doSomethingElse();

try {
    Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
    // 計算抛出一個異常
} catch (InterruptedException ie) {
    // 目前線程在等待過程中被中斷
} catch (TimeoutException te) {
    // 在Future對象完成之前超過已過期
}           

這種程式設計方式讓你的線程可以在ExecutorService以并發方式調用另一個線程執行耗時操作的同時,去執行一些其他的任務。接着,如果你已經運作到沒有異步操作的結果就無法繼續任何有意義的工作時,可以調用它的get方法去擷取操作的結果。如果操作已經完成,該方法會立刻傳回操作的結果,否則它會阻塞你的線程,直到操作完成,傳回相應的結果。

你能想象這種場景存在怎樣的問題嗎?如果該長時間運作的操作永遠不傳回了會怎樣?為了處理這種可能性,雖然Future提供了一個無需任何參數的get方法,我們還是推薦大家使用重載版本的get方法,它接受一個逾時的參數,通過它,你可以定義你的線程等待Future結果的最長時間,而不是樣永無止境地等待下去。

Future 接口的局限性

通過第一個例子,我們知道Future接口提供了方法來檢測異步計算是否已經結束(使用isDone方法),等待異步操作結束,以及擷取計算的結果。但是這些特性還不足以讓你編寫簡潔的并發代碼。比如,我們很難表述Future結果之間的依賴性;從文字描述上這很簡單,“當長時間計算任務完成時,請将該計算的結果通知到另一個長時間運作的計算任務,這兩個計算任務都完成後,将計算的結果與另一個查詢操作結果合并”。但是,使用Future中提供的方法完成這樣的操作又是另外一回事。這也是我們需要更具描述能力的特性的原因,比如下面這些。

  • 将兩個異步計算合并為一個——這兩個異步計算之間互相獨立,同時第二個又依賴于第一個的結果。
  • 等待Future集合中的所有任務都完成。
  • 僅等待Future集合中最快結束的任務完成(有可能因為它們試圖通過不同的方式計算同一個值),并傳回它的結果。
  • 通過程式設計方式完成一個Future任務的執行(即以手工設定異步操作結果的方式)。
  • 應對Future的完成事件(即當Future的完成事件發生時會收到通知,并能使用Future計算的結果進行下一步的操作,不隻是簡單地阻塞等待操作的結果)。

這一章中,你會了解新的CompletableFuture類(它實作了Future接口)如何利用Java 8的新特性以更直覺的方式将上述需求都變為可能。Stream和CompletableFuture的設計都遵循了類似的模式:它們都使用了Lambda表達式以及流水線的思想。從這個角度,你可以說CompletableFuture和Future的關系就跟Stream和Collection的關系一樣。

使用CompletableFuture 建構異步應用

為了展示CompletableFuture的強大特性,我們會建立一個名為“最佳價格查詢器”(best-price-finder)的應用,它會查詢多個線上商店,依據給定的産品或服務找出最低的價格。這個過程中,你會學到幾個重要的技能。

  • 首先,你會學到如何為你的客戶提供異步API(如果你擁有一間線上商店的話,這是非常有幫助的)。
  • 其次,你會掌握如何讓你使用了同步API的代碼變為非阻塞代碼。你會了解如何使用流水線将兩個接續的異步操作合并為一個異步計算操作。這種情況肯定會出現,比如,線上商店傳回了你想要購買商品的原始價格,并附帶着一個折扣代碼——最終,要計算出該商品的實際價格,你不得不通路第二個遠端折扣服務,查詢該折扣代碼對應的折扣比率。
  • 你還會學到如何以響應式的方式處理異步操作的完成事件,以及随着各個商店傳回它的商品價格,最佳價格查詢器如何持續地更新每種商品的最佳推薦,而不是等待所有的商店都傳回他們各自的價格(這種方式存在着一定的風險,一旦某家商店的服務中斷,使用者可能遭遇白屏)。

實作異步API

為了實作最佳價格查詢器應用,讓我們從每個商店都應該提供的API定義入手。首先,商店應該聲明依據指定産品名稱傳回價格的方法:

public double getPrice(String product) {
    // 待實作
}           

該方法的内部實作會查詢商店的資料庫,但也有可能執行一些其他耗時的任務,比如聯系其他外部服務(比如,商店的供應商,或者跟制造商相關的推廣折扣)。我們在本章剩下的内容中,采用delay方法模拟這些長期運作的方法的執行,它會人為地引入1秒鐘的延遲,方法聲明如下。

public class Util {
    public static void delay() {
        int delay = 1000;
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}           

為了介紹本章的内容,getPrice方法會調用delay方法,并傳回一個随機計算的值,代碼清單如下所示。傳回随機計算的價格這段代碼看起來有些取巧。它使用charAt,依據産品的名稱,生成一個随機值作為價格。

public class Shop {
    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}           

很明顯,這個API的使用者(這個例子中為最佳價格查詢器)調用該方法時,它依舊會被阻塞。為等待同步事件完成而等待1秒鐘,這是無法接受的,尤其是考慮到最佳價格查詢器對網絡中的所有商店都要重複這種操作。本章接下來的小節中,你會了解如何以異步方式使用同步API解決這個問題。但是,出于學習如何設計異步API的考慮,我們會繼續這一節的内容,假裝我們還在深受這一困難的煩擾:你是一個睿智的商店店主,你已經意識到了這種同步API會為你的使用者帶來多麼痛苦的體驗,你希望以異步API的方式重寫這段代碼,讓使用者更流暢地通路你的網站。

将同步方法轉換為異步方法

為了實作這個目标,你首先需要将getPrice轉換為getPriceAsync方法,并修改它的傳回值:

public class Shop {
    ...
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();
        return futurePrice;
    }
    ...
}           

在這段代碼中,你建立了一個代表異步計算的CompletableFuture對象執行個體,它在計算完成時會包含計算的結果。接着,你調用fork建立了另一個線程去執行實際的價格計算工作,不等該耗時計算任務結束,直接傳回一個Future執行個體。當請求的産品價格最終計算得出時,你可以使用它的complete方法,結束completableFuture對象的運作,并設定變量的值。很顯然,這個新版Future的名稱也解釋了它所具有的特性。使用這個API的用戶端,可以通過下面的這段代碼對其進行調用。

public class ShopMain {

    public static void main(String[] args) {
        Shop shop = new Shop("最好的商店");
        long start = System.nanoTime();
        Future<Double> futurePrice = shop.getPriceAsync("我最喜歡的商品");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("調用時間 " + invocationTime);
        // 這裡可以做其他的事情,比如查詢其他的商店
        doSomethingElse();
        // 計算商品價格
        try {
            double price = futurePrice.get();
            System.out.printf("價格是 %.2f%n", price);
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("計算價格時間 " + retrievalTime);
    }

    private static void doSomethingElse() {
        System.out.println("正在查詢其他的商店...");
    }
}           

我們看到這段代碼中,客戶向商店查詢了某種商品的價格。由于商店提供了異步API,該次調用立刻傳回了一個Future對象,通過該對象客戶可以在将來的某個時刻取得商品的價格。這種方式下,客戶在進行商品價格查詢的同時,還能執行一些其他的任務,比如查詢其他家商店中商品的價格,不會呆呆地阻塞在那裡等待第一家商店傳回請求的結果。最後,如果所有有意義的工作都已經完成,客戶所有要執行的工作都依賴于商品價格時,再調用Future的get方法。執行了這個操作後,客戶要麼獲得Future中封裝的值(如果異步任務已經完成),要麼發生阻塞,直到該異步任務完成,期望的值能夠通路。上面的代碼中,輸出的結果:

調用時間 116
正在查詢其他的商店...
價格是 49107.07
計算價格時間 1172           

你一定已經發現getPriceAsync方法的調用時間遠遠早于最終價格計算完成的時間,在之前的代碼,你還會知道我們有可能避免發生用戶端被阻塞的風險。實際上這非常簡單,Future執行完畢可以發送一個通知,僅在計算結果可用時執行一個由Lambda表達式或者方法引用定義的回調函數。不過,我們當下不會對此進行讨論,現在我們要解決的是另一個問題:如何正确地管理異步任務執行過程中可能出現的錯誤。

錯誤處理

如果沒有意外,我們目前開發的代碼工作得很正常。但是,如果價格計算過程中産生了錯誤會怎樣呢?非常不幸,這種情況下你會得到一個相當糟糕的結果:用于提示錯誤的異常會被限制在試圖計算商品價格的目前線程的範圍内,最終會殺死該線程,而這會導緻等待get方法傳回結果的用戶端永久地被阻塞。

用戶端可以使用重載版本的get方法,它使用一個逾時參數來避免發生這樣的情況。這是一種值得推薦的做法,你應該盡量在你的代碼中添加逾時判斷的邏輯,避免發生類似的問題。使用這種方法至少能防止程式永久地等待下去,逾時發生時,程式會得到通知發生了TimeoutException。不過,也因為如此,你不會有機會發現計算商品價格的線程内到底發生了什麼問題才引發了這樣的失效。為了讓用戶端能了解商店無法提供請求商品價格的原因,你需要使用CompletableFuture的completeExceptionally方法将導緻CompletableFuture内發生問題的異常抛出。對代碼優化後的結果如下所示。

public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            // 如果價格計算正常結束,完成Future操作并設定商品價格
            futurePrice.complete(price);
        } catch (Exception e) {
            // 否則就抛出導緻失敗的異常,完成這次Future操作
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}           

用戶端現在會收到一個ExecutionException異常,該異常接收了一個包含失敗原因的Exception參數,即價格計算方法最初抛出的異常。是以,舉例來說,如果該方法抛出了一個運作時異常“product not available”,用戶端就會得到像下面這樣一段ExecutionException:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: product
    not availableat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
    at xin.codedream.java8.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
    ... 5 more
Caused by: java.lang.RuntimeException: product not available
    at xin.codedream.java8.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
    atxin.codedream.java8.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)
    at xin.codedream.java8.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:744)           

使用工廠方法supplyAsync建立CompletableFuture

目前為止我們已經了解了如何通過程式設計建立CompletableFuture對象以及如何擷取傳回值,雖然看起來這些操作已經比較友善,但還有進一步提升的空間,CompletableFuture類自身提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個流程,還不用擔心實作的細節。比如,采用supplyAsync方法後,你可以用一行語句重寫getPriceAsync方法,如下所示。

public Future<Double> getPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}           

太棒了!七八行才能實作的功能,我們現在隻需要一行就可以搞定了!supplyAsync方法接受一個生産者(Supplier)作為參數,傳回一個CompletableFuture對象,該對象完成異步執行後會讀取調用生産者方法的傳回值。生産者方法會交由ForkJoinPool池中的某個執行線程(Executor)運作,但是你也可以使用supplyAsync方法的重載版本,傳遞第二個參數指定不同的執行線程執行生産者方法。一般而言,向CompletableFuture的工廠方法傳遞可選參數,指定生産者方法的執行線程是可行的,在後面,你會使用這一能力,後面我們将使用适合你應用特性的執行線程改善程式的性能。

接下來剩餘部分中,我們會假設你非常不幸,無法控制Shop類提供API的具體實作,最終提供給你的API都是同步阻塞式的方法。這也是當你試圖使用服務提供的HTTP API時最常發生的情況。你會學到如何以異步的方式查詢多個商店,避免被單一的請求所阻塞,并由此提升你的“最佳價格查詢器”的性能和吞吐量。

讓你的代碼免受阻塞之苦

是以,你已經被要求進行“最佳價格查詢器”應用的開發了,不過你需要查詢的所有商店都如上面開始時介紹的那樣,隻提供了同步API。換句話說,你有一個商家的清單,如下所示:

public class BestPriceFinder {
    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"));
    ...
}           

你需要使用下面這樣的簽名實作一個方法,它接受産品名作為參數,傳回一個字元串清單,這個字元串清單中包括商店的名稱、該商店中指定商品的價格:

public List<String> findPrices(String product);           

你的第一個想法可能是使用我們在前面的章節中學習的Stream特性。你可能試圖寫出類似下面這個代碼(是的,作為第一個方案,如果你想到這些已經相當棒了!)。

好吧,這段代碼看起來非常直白。現在試着用該方法去查詢你最近這些天瘋狂着迷的唯一産品(是的,你已經猜到了,它就是Old-Mi-Mix3)。此外,也請記錄下方法的執行時間,通過這些資料,我們可以比較優化之後的方法會帶來多大的性能提升,具體的代碼如下。

public class BestPriceFinder {
    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"));

    public static void main(String[] args) {
        BestPriceFinder finder = new BestPriceFinder();
        finder.testFindPrices();
    }

    public void testFindPrices() {
        long start = System.nanoTime();
        System.out.println(findPrices("Old-Mi-Mix3"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("完成時間 " + duration);
    }

    public List<String> findPrices(String product) {
        return shops.stream()
                .map(shop -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }
}           

輸出結果:

[BestPrice 價格 109.64, LetsSaveBig 價格 143.13, MyFavoriteShop 價格 175.50, BuyItAll 價格 154.20]
完成時間 4184           

正如你預期的,findPrices方法的執行時間僅比4秒鐘多了那麼幾百毫秒,因為對這4個商店的查詢是順序進行的,并且一個查詢操作會阻塞另一個,每一個操作都要花費大約1秒左右的時間計算請求商品的價格。你怎樣才能改進這個結果呢?

使用并行流對請求進行并行操作

如果你看了第七章的筆記,那麼你應該想到的第一個,可能也是最快的改善方法是使用并行流來避免順序計算,如下所示。

public List<String> findPricesParallel(String product) {
    return shops.parallelStream()
            .map(shop -> String.format("%s 價格 %.2f",
                    shop.getName(), shop.getPrice(product)))
            .collect(toList());
}           

運作代碼,與最初的代碼執行結果相比較,你發現了新版findPrices的改進了吧。

[BestPrice 價格 109.64, LetsSaveBig 價格 143.13, MyFavoriteShop 價格 175.50, BuyItAll 價格 154.20]
完成時間 1248           

相當不錯啊!看起來這是個簡單但有效的主意:現在對四個不同商店的查詢實作了并行,是以完成所有操作的總耗時隻有1秒多一點兒。你能做得更好嗎?讓我們嘗試使用剛學過的CompletableFuture,将findPrices方法中對不同商店的同步調用替換為異步調用。

使用CompletableFuture 發起異步請求

你已經知道我們可以使用工廠方法supplyAsync建立CompletableFuture對象。讓我們把它利用起來:

public List<CompletableFuture<String>> findPricesFuture(String product) {
    return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
                    shop.getName(), shop.getPrice(product))))
            .collect(toList());
}           

使用這種方式,你會得到一個List>,清單中的每個CompletableFuture對象在計算完成後都包含商店的String類型的名稱。但是,由于你用CompletableFutures實作的findPrices方法要求傳回一個List,你需要等待所有的future執行完畢,将其包含的值抽取出來,填充到清單中才能傳回。

為了實作這個效果,你可以向最初的List>施加第二個map操作,對List中的所有future對象執行join操作,一個接一個地等待它們運作結束。注意CompletableFuture類中的join方法和Future接口中的get有相同的含義,并且也聲明在Future接口中,它們唯一的不同是join不會抛出任何檢測到的異常。使用它你不再需要使用try/catch語句塊讓你傳遞給第二個map方法的Lambda表達式變得過于臃腫。所有這些整合在一起,你就可以重新實作findPrices了,具體代碼如下。

public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
                    shop.getName(), shop.getPrice(product))))
            .collect(toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}           

運作下代碼了解下第三個版本findPrices方法的性能,你會得到下面這幾行輸出:

[BestPrice 價格 109.64, LetsSaveBig 價格 143.13, MyFavoriteShop 價格 175.50, BuyItAll 價格 154.20]
完成時間 2207           

這個結果讓人相當失望,不是嗎?超過2秒意味着利用CompletableFuture實作的版本,比剛開始的代碼中的原生順序執行且會發生阻塞的版本快。但是它的用時也差不多是使用并行流的前一個版本的兩倍。尤其是,考慮到從順序執行的版本轉換到并行流的版本隻做了非常小的改動,就讓人更加沮喪。

與此形成鮮明對比的是,我們為采用CompletableFutures完成的新版方法做了大量的工作!但,這就是全部的真相嗎?這種場景下使用CompletableFutures真的是浪費時間嗎?或者我們可能漏掉了某些重要的東西?繼續往下探究之前,讓我們休息幾分鐘,尤其是想想你測試代碼的機器是否足以以并行方式運作四個線程。

尋找更好的方案

并行流的版本工作得非常好,那是因為它能并行地執行四個任務,是以它幾乎能為每個商家配置設定一個線程。但是,如果你想要增加第五個商家到商店清單中,讓你的“最佳價格查詢”應用對其進行處理,這時會發生什麼情況?

public class BestPriceFinder {
    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"),
            new Shop("ShopEasy"));
    ...

    public List<String> findPricesParallel(String product) {
        return shops.parallelStream()
                .map(shop -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

    public List<String> findPricesSequential(String product) {
        return shops.stream()
                .map(shop -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }


    public List<String> findPricesFuture(String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product))))
                .collect(toList());

        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}

public class BestPriceFinderMain {

    private static BestPriceFinder bestPriceFinder = new BestPriceFinder();

    public static void main(String[] args) {
        execute("sequential", () -> bestPriceFinder.findPricesSequential("Old-Mi-Mix3"));
    }

    private static void execute(String msg, Supplier<List<String>> s) {
        long start = System.nanoTime();
        System.out.println(s.get());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println(msg + " 完成時間 " + duration);
    }
}           

毫不意外,順序執行版本的執行還是需要大約5秒多鐘的時間,下面是執行的輸出:

[BestPrice 價格 109.64, LetsSaveBig 價格 143.13, MyFavoriteShop 價格 175.50, BuyItAll 價格 154.20, ShopEasy 價格 147.92]
sequential 完成時間 5139           

非常不幸,并行流版本的程式這次比之前也多消耗了差不多1秒鐘的時間,因為可以并行運作(通用線程池中處于可用狀态的)的四個線程現在都處于繁忙狀态,都在對前4個商店進行查詢。第五個查詢隻能等到前面某一個操作完成釋放出空閑線程才能繼續,它的運作結果如下:

[BestPrice 價格 163.19, LetsSaveBig 價格 141.77, MyFavoriteShop 價格 159.81, BuyItAll 價格 165.02, ShopEasy 價格 165.81]
parallel 完成時間 2106           

CompletableFuture版本的程式結果如何呢?我們也試着添加第5個商店對其進行了測試,結果如下:

[BestPrice 價格 144.31, LetsSaveBig 價格 142.49, MyFavoriteShop 價格 146.99, BuyItAll 價格 132.52, ShopEasy 價格 139.15]
composed CompletableFuture 完成時間 2004           

CompletableFuture版本的程式似乎比并行流版本的程式還快那麼一點兒。但是最後這個版本也不太令人滿意。比如,如果你試圖讓你的代碼處理9個商店,并行流版本耗時3143毫秒,而CompletableFuture版本耗時3009毫秒。它們看起來不相伯仲,究其原因都一樣:它們内部采用的是同樣的通用線程池,預設都使用固定數目的線程,具體線程數取決于Runtime.getRuntime().availableProcessors()的傳回值。然而,CompletableFuture具有一定的優勢,因為它允許你對執行器(Executor)進行配置,尤其是線程池的大小,讓它以更适合應用需求的方式進行配置,滿足程式的要求,而這是并行流API無法提供的。讓我們看看你怎樣利用這種配置上的靈活性帶來實際應用程式性能上的提升。

使用定制的執行器

就這個主題而言,明智的選擇似乎是建立一個配有線程池的執行器,線程池中線程的數目取決于你預計你的應用需要處理的負荷,但是你該如何選擇合适的線程數目呢?

《Java8實戰》-第十一章筆記(CompletableFuture:組合式異步程式設計)CompletableFuture:組合式異步程式設計

你的應用99%的時間都在等待商店的響應,是以估算出的W/C比率為100。這意味着如果你期望的CPU使用率是100%,你需要建立一個擁有400個線程的線程池。實際操作中,如果你建立的線程數比商店的數目更多,反而是一種浪費,因為這樣做之後,你線程池中的有些線程根本沒有機會被使用。出于這種考慮,我們建議你将執行器使用的線程數,與你需要查詢的商店數目設定為同一個值,這樣每個商店都應該對應一個服務線程。不過,為了避免發生由于商店的數目過多導緻伺服器超負荷而崩潰,你還是需要設定一個上限,比如100個線程。代碼清單如下所示。

private final Executor executor = Executors.newFixedThreadPool(100, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });           

注意,你現在正建立的是一個由守護線程構成的線程池。Java程式無法終止或者退出一個正在運作中的線程,是以最後剩下的那個線程會由于一直等待無法發生的事件而引發問題。與此相反,如果将線程标記為守護程序,意味着程式退出時它也會被回收。這二者之間沒有性能上的差異。現在,你可以将執行器作為第二個參數傳遞給supplyAsync工廠方法了。比如,你現在可以按照下面的方式建立一個可查詢指定商品價格的CompletableFuture對象:

CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)), executor)           

改進之後,使用CompletableFuture方案的程式處理5個商店結果:

[BestPrice 價格 144.31, LetsSaveBig 價格 142.49, MyFavoriteShop 價格 146.99, BuyItAll 價格 132.52, ShopEasy 價格 139.15]
composed CompletableFuture 完成時間 1004           

這個例子證明了要建立更适合你的應用特性的執行器,利用CompletableFutures向其送出任務執行是個不錯的主意。處理需大量使用異步操作的情況時,這幾乎是最有效的政策。

并行——使用流還是CompletableFutures?

目前為止,你已經知道對集合進行并行計算有兩種方式:要麼将其轉化為并行流,利用map這樣的操作開展工作,要麼枚舉出集合中的每一個元素,建立新的線程,在CompletableFuture内對其進行操作。後者提供了更多的靈活性,你可以調整線程池的大小,而這能幫助你確定整體的計算不會因為線程都在等待I/O而發生阻塞。書中使用這些API的建議如下。

  • 如果你進行的是計算密集型的操作,并且沒有I/O,那麼推薦使用Stream接口,因為實作簡單,同時效率也可能是最高的(如果所有的線程都是計算密集型的,那就沒有必要建立比處理器核數更多的線程)。
  • 反之,如果你并行的工作單元還涉及等待I/O的操作(包括網絡連接配接等待),那麼使用CompletableFuture靈活性更好,你可以像前文讨論的那樣,依據等待/計算,或者W/C的比率設定需要使用的線程數。這種情況不使用并行流的另一個原因是,處理流的流水線中如果發生I/O等待,流的延遲特性會讓我們很難判斷到底什麼時候觸發了等待。

現在你已經了解了如何利用CompletableFuture為你的使用者提供異步API,以及如何将一個同步又緩慢的服務轉換為異步的服務。不過到目前為止,我們每個Future中進行的都是單次的操作。

代碼

Github:

chap11

Gitee: