天天看點

java 實戰8 第十一章讀書筆記并行和并發CompletableFuture 和 Future接口

并行和并發

  • 第七章的fork-join 以及并行stream 是實作并行的工具
  • future接口實作在一個cpu上執行松耦合任務,充分利用cpu的周期(實作并發!)

CompletableFuture 和 Future接口

  • 錯誤處理的時候,盡量使用帶有逾時判斷的邏輯,避免發生類似問題,避免程式一直等待下去
  • 為了讓用戶端知道發生的異常,需要使用completableFuture 的 completeExceptionally方法包裝異常,否則異步任務的異常會被吞掉
  • 使用工廠方法supplyAsync建立 CompletableFuture
//可以避免異步任務的異常丢失
public Future<Double> getPriceAsync(String product) { 
    return CompletableFuture.supplyAsync(() -> calculatePrice(product)); 
}
           

代碼樣例

package com.example.demo.async;

import lombok.*;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toList;

@NoArgsConstructor
@Getter
@Setter
public class Shop {

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    //同步
    private double calculatePrice(String product) {
        delay();
        return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
    }

    //異步
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        });
        return futurePrice;
    }


    public Future<Double> getPriceAsyncMod(String product) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }


    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public List<String> findPrices(String product) {
        return shops.stream()
                .map(shop -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }


    public List<String> findPricesParallel(String product) {
        return shops.parallelStream()
                .map(shop -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

    public List<String> findPricesFuture(String product) {
        List<CompletableFuture<String>> collect = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
                        shop.getName(), shop.getPrice(product))))
                .collect(toList());

        return collect.stream().map(CompletableFuture::join).collect(toList());
    }



    public void testFindPrices() {
        long start = System.nanoTime();
        System.out.println(findPrices("Mi9"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("完成時間 " + duration);
    }

    public void testFindPriceParallel() {
        long start = System.nanoTime();
        System.out.println(findPricesParallel("Mi9"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("并行流完成時間 " + duration);

    }

    public void testFindPriceFuture() {
        long start = System.nanoTime();
        System.out.println(findPricesFuture("Mi9"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("并行流完成時間 " + duration);

    }

    static final List<Shop> shops = Arrays.asList(new Shop("萬達"), new Shop("蘇甯"), new Shop("永輝"), new Shop("蘇果")
            , new Shop("萬達1"), new Shop("蘇甯1"), new Shop("永輝1"), new Shop("蘇果1")
            , new Shop("萬達2"), new Shop("蘇甯2"), new Shop("永輝2"), new Shop("蘇果2"));


    public static void main(String[] args) {

        Shop shop = new Shop();
        shop.testFindPrices();//同步
        shop.testFindPriceParallel();//并行流
        shop.testFindPriceFuture();//future實作
    }

}

           

parallelStream 和 CompletableFuture 差別

parallelStream 底層用的是forkJoin架構,預設使用固定數目的線程,線程數取決于 Runtime().getRuntime().availableProcessors()

CompletableFuture 則比較靈活。可以對線程池大小進行調整

目前為止,你已經知道對集合進行并行計算有兩種方式:要麼将其轉化為并行流,利用map這樣的操作開展工作,要麼枚舉出集合中的每一個元素,建立新的線程,在CompletableFuture内對其進行操作。後者提供了更多的靈活性,你可以調整線程池的大小,而這能幫助你確定整體的計算不會因為線程都在等待I/O而發生阻塞。

我們對使用這些API的建議如下。

  • 如果你進行的是計算密集型的操作,并且沒有I/O,那麼推薦使用Stream接口,因為實作簡單,同時效率也可能是最高的(如果所有的線程都是計算密集型的,那就沒有必要建立比處理器核數更多的線程)。
  • 反之,如果你并行的工作單元還涉及等待I/O的操作(包括網絡連接配接等待),那麼使用CompletableFuture靈活性更好,你可以像前文讨論的那樣,依據等待/計算,或者W/C的比率設定需要使用的線程數。這種情況不使用并行流的另一個原因是,處理流的流水線中如果發生I/O等待,流的延遲特性會讓我們很難判斷到底什麼時候觸發了等待。

對異步任務進行流水線操作

  • thenCompose ,主線程仍然不會被阻塞
public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures =
     shops.stream() 
     .map(shop -> CompletableFuture.supplyAsync( 
     () -> shop.getPrice(product), executor))//getPrice耗時操作,擷取商品的價格字元串,使用異步方式
     .map(future -> future.thenApply(Quote::parse)) //将價格字元串解析成Quote對象(包裝了價格,折扣率等)
     .map(future -> future.thenCompose(quote -> 
     CompletableFuture.supplyAsync( 
     () -> Discount.applyDiscount(quote), executor))) //異步計算商品最終價格
     .collect(toList());
    return priceFutures.stream() 
     .map(CompletableFuture::join) //等待流中的所有Future執行完畢,并提取各自的傳回值
     .collect(toList());
}
           

thenapply()是傳回的是非CompletableFuture類型:它的功能相當于将CompletableFuture轉換成CompletableFuture。

thenCompose()用來連接配接兩個CompletableFuture,傳回值是新的CompletableFuture:

thenCompose方法允許你對兩個異步操作進行流水線,第一個操作完成時,将其結果作為參數傳遞給第二個操作。

  • 用thenCombine将兩個 CompletableFuture 對象整合起來,無論它們是否存在依賴
thenCombine方法,它接收名為BiFunction的第二參數,這個參數定義了當兩個CompletableFuture對象完成計算後,結果如何合并。同thenCompose方法一樣,thenCombine方法也提供有一個Async的版本。這裡,如果使用thenCombineAsync會導緻BiFunction中定義的合并操作被送出到線程池中,由另一個任務以異步的方式執行。
eg:有一家商店提供的價格是以歐元(EUR)計價的,但是你希望以美元的方式提供給你的客戶:

Future<double> futurePriceInUSD = 
 CompletableFuture.supplyAsync(() -> shop.getPrice(product)) 
 .thenCombine( 
 CompletableFuture.supplyAsync( 
 () -> exchangeService.getRate(Money.EUR, Money.USD)), 
 (price, rate) -> price * rate 
 );
           
  • 響應 CompletableFuture 的 completion 事件
隻要有商店傳回商品價格就在第一時間顯示傳回值,不再等待那些還未傳回的商店(有些甚至會發生逾時)。Java 8的CompletableFuture通 過thenAccept方法提供了這一功能,它接收CompletableFuture執行完畢後的傳回值做參數。
重構findPrices方法傳回一個由Future構成的流
public Stream<CompletableFuture<String>> findPricesStream(String product) { 
 return shops.stream() 
 .map(shop -> CompletableFuture.supplyAsync( 
 () -> shop.getPrice(product), executor)) 
 .map(future -> future.thenApply(Quote::parse)) 
 .map(future -> future.thenCompose(quote -> 
 CompletableFuture.supplyAsync( 
 () -> Discount.applyDiscount(quote), executor))); 
}

findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));

           

由于thenAccept方法已經定義了如何處理CompletableFuture傳回的結果,一旦CompletableFuture計算得到結果,它就傳回一個CompletableFuture。對這個<CompletableFuture>對象,你能做的事非常有限,隻能等待其運作結束。

繼續閱讀