并行和并發
- 第七章的fork-join 以及并行stream 是實作并行的工具
- future接口實作在一個cpu上執行松耦合任務,充分利用cpu的周期(實作并發!)
CompletableFuture 和 Future接口
- 錯誤處理的時候,盡量使用帶有逾時判斷的邏輯,避免發生類似問題,避免程式一直等待下去
- 為了讓用戶端知道發生的異常,需要使用completableFuture 的 completeExceptionally方法包裝異常,否則異步任務的異常會被吞掉
- 使用工廠方法supplyAsync建立 CompletableFuture
//可以避免異步任務的異常丢失
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
代碼樣例
package com.example.demo.async;
import lombok.*;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
@NoArgsConstructor
@Getter
@Setter
public class Shop {
public double getPrice(String product) {
return calculatePrice(product);
}
//同步
private double calculatePrice(String product) {
delay();
return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
}
//異步
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
});
return futurePrice;
}
public Future<Double> getPriceAsyncMod(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
public static void delay() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private String name;
public Shop(String name) {
this.name = name;
}
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s 價格 %.2f",
shop.getName(), shop.getPrice(product)))
.collect(toList());
}
public List<String> findPricesParallel(String product) {
return shops.parallelStream()
.map(shop -> String.format("%s 價格 %.2f",
shop.getName(), shop.getPrice(product)))
.collect(toList());
}
public List<String> findPricesFuture(String product) {
List<CompletableFuture<String>> collect = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價格 %.2f",
shop.getName(), shop.getPrice(product))))
.collect(toList());
return collect.stream().map(CompletableFuture::join).collect(toList());
}
public void testFindPrices() {
long start = System.nanoTime();
System.out.println(findPrices("Mi9"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("完成時間 " + duration);
}
public void testFindPriceParallel() {
long start = System.nanoTime();
System.out.println(findPricesParallel("Mi9"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("并行流完成時間 " + duration);
}
public void testFindPriceFuture() {
long start = System.nanoTime();
System.out.println(findPricesFuture("Mi9"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("并行流完成時間 " + duration);
}
static final List<Shop> shops = Arrays.asList(new Shop("萬達"), new Shop("蘇甯"), new Shop("永輝"), new Shop("蘇果")
, new Shop("萬達1"), new Shop("蘇甯1"), new Shop("永輝1"), new Shop("蘇果1")
, new Shop("萬達2"), new Shop("蘇甯2"), new Shop("永輝2"), new Shop("蘇果2"));
public static void main(String[] args) {
Shop shop = new Shop();
shop.testFindPrices();//同步
shop.testFindPriceParallel();//并行流
shop.testFindPriceFuture();//future實作
}
}
parallelStream 和 CompletableFuture 差別
parallelStream 底層用的是forkJoin架構,預設使用固定數目的線程,線程數取決于 Runtime().getRuntime().availableProcessors()
CompletableFuture 則比較靈活。可以對線程池大小進行調整
目前為止,你已經知道對集合進行并行計算有兩種方式:要麼将其轉化為并行流,利用map這樣的操作開展工作,要麼枚舉出集合中的每一個元素,建立新的線程,在CompletableFuture内對其進行操作。後者提供了更多的靈活性,你可以調整線程池的大小,而這能幫助你確定整體的計算不會因為線程都在等待I/O而發生阻塞。
我們對使用這些API的建議如下。
- 如果你進行的是計算密集型的操作,并且沒有I/O,那麼推薦使用Stream接口,因為實作簡單,同時效率也可能是最高的(如果所有的線程都是計算密集型的,那就沒有必要建立比處理器核數更多的線程)。
- 反之,如果你并行的工作單元還涉及等待I/O的操作(包括網絡連接配接等待),那麼使用CompletableFuture靈活性更好,你可以像前文讨論的那樣,依據等待/計算,或者W/C的比率設定需要使用的線程數。這種情況不使用并行流的另一個原因是,處理流的流水線中如果發生I/O等待,流的延遲特性會讓我們很難判斷到底什麼時候觸發了等待。
對異步任務進行流水線操作
- thenCompose ,主線程仍然不會被阻塞
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))//getPrice耗時操作,擷取商品的價格字元串,使用異步方式
.map(future -> future.thenApply(Quote::parse)) //将價格字元串解析成Quote對象(包裝了價格,折扣率等)
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor))) //異步計算商品最終價格
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join) //等待流中的所有Future執行完畢,并提取各自的傳回值
.collect(toList());
}
thenapply()是傳回的是非CompletableFuture類型:它的功能相當于将CompletableFuture轉換成CompletableFuture。
thenCompose()用來連接配接兩個CompletableFuture,傳回值是新的CompletableFuture:
thenCompose方法允許你對兩個異步操作進行流水線,第一個操作完成時,将其結果作為參數傳遞給第二個操作。
- 用thenCombine将兩個 CompletableFuture 對象整合起來,無論它們是否存在依賴
thenCombine方法,它接收名為BiFunction的第二參數,這個參數定義了當兩個CompletableFuture對象完成計算後,結果如何合并。同thenCompose方法一樣,thenCombine方法也提供有一個Async的版本。這裡,如果使用thenCombineAsync會導緻BiFunction中定義的合并操作被送出到線程池中,由另一個任務以異步的方式執行。
eg:有一家商店提供的價格是以歐元(EUR)計價的,但是你希望以美元的方式提供給你的客戶:
Future<double> futurePriceInUSD =
CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(
() -> exchangeService.getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate
);
- 響應 CompletableFuture 的 completion 事件
隻要有商店傳回商品價格就在第一時間顯示傳回值,不再等待那些還未傳回的商店(有些甚至會發生逾時)。Java 8的CompletableFuture通 過thenAccept方法提供了這一功能,它接收CompletableFuture執行完畢後的傳回值做參數。
重構findPrices方法傳回一個由Future構成的流
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)));
}
findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));
由于thenAccept方法已經定義了如何處理CompletableFuture傳回的結果,一旦CompletableFuture計算得到結果,它就傳回一個CompletableFuture。對這個<CompletableFuture>對象,你能做的事非常有限,隻能等待其運作結束。