天天看點

CommpetableFuture使用anyOf過程中的一些優化思考

系列目錄:

  1. Spring WebFlux運用中的思考與對比
  2. CompletableFuture與Spring的Sleuth結合工具類
  3. CommpetableFuture使用anyOf過程中的一些優化思考
  4. 結合CompletableFuture與Spring的Sleuth結合工具類與allOf以及anyOf

CompletableFuture的allOf

首先我們看看

allOf

的定義:

public static CompletableFuture allOf(CompletableFuture... cfs) {
    // ...
}           

複制

這個方法接受若幹個傳回不同類型的

CompletableFuture

為參數, 傳回一個傳回為空(Void)的

CompletableFuture

。也就是說,這個方法其實就是傳回一個在所有參數完成之後也完成的傳回為空(Void)的

CompletableFuture

,也就是充當一個

signaling device

這個方法很好,尤其是并發擷取多種io的結果的時候。但是用這個方法,帶來了很多不便,最大的不便就是,傳回是Void,而不是所有的參數的傳回。這樣導緻我們,需要在聚合這些結果的那個服務方法裡面,把最終結果封裝好,否則,擷取的就是一個Void。舉個例子:

假設我的一個服務方法的傳回是多個接口在使用,這個方法需要同時調用三個io等待他們都傳回時,利用這三個io的傳回,拼裝成接口需要的字段。對于這個場景,我們可以有兩種寫法,第一種是基于回調的寫法,第二種是基于傳回的寫法,兩種都OK,看個人習慣,我個人傾向于基于傳回的寫法,這樣代碼是瀑布式的,基于回調的會導緻多層嵌套,導緻代碼可讀性降低。

** 結果類:**

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class Result {
   private String string;
   private List strings;
   private List integers;
}           

複制

** 基于回調:**

public static void baseOnCallBack(CompletableFuture resultCompletableFuture) {
   CompletableFuture> result1 = CompletableFuture.supplyAsync(() -> {
       //模拟io
       try {
           TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       return Lists.newArrayList("a", "b", "c");
   });
   CompletableFuture> result2 = CompletableFuture.supplyAsync(() -> {
       //模拟io
       try {
           TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       return Lists.newArrayList(1, 2, 3);
   });
   CompletableFuture result3 = CompletableFuture.supplyAsync(() -> {
       //模拟io
       try {
           TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       return "hash-test";
   });

   CompletableFuture.allOf(result1, result2, result3).thenAcceptAsync(v -> {
       resultCompletableFuture.complete(Result.builder()
               //一定存在的,因為已經完成了
               .string(result3.join())
               .strings(result1.join())
               .integers(result2.join())
               .build());
   });
}           

複制

** 基于傳回:**

public static CompletableFuture baseOnReturn() {
   CompletableFuture completableFuture = new CompletableFuture();
   CompletableFuture> result1 = CompletableFuture.supplyAsync(() -> {
       //模拟io
       try {
           TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       return Lists.newArrayList("a", "b", "c");
   });
   CompletableFuture> result2 = CompletableFuture.supplyAsync(() -> {
       //模拟io
       try {
           TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       return Lists.newArrayList(1, 2, 3);
   });
   CompletableFuture result3 = CompletableFuture.supplyAsync(() -> {
       //模拟io
       try {
           TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       return "hash-test";
   });
   CompletableFuture.allOf(result1, result2, result3).thenAcceptAsync(v -> {
       completableFuture.complete(Result.builder()
               //一定存在的,因為已經完成了
               .string(result3.join())
               .strings(result1.join())
               .integers(result2.join())
               .build());
   });
   return completableFuture;
}           

複制

基于回調的接口使用結果:

CompletableFuture completableFuture = new CompletableFuture();
baseOnCallBack(completableFuture);
completableFuture = completableFuture.thenAcceptAsync(result -> {
    System.out.println("baseOnCallback: " + result);
});           

複制

基于傳回的接口使用結果:

CompletableFuture voidCompletableFuture = baseOnReturn().thenAcceptAsync(result -> {
    System.out.println("baseOnReturn: " + result);
});           

複制

可以看出,一層嵌套也是基于傳回的代碼看上去更優雅。

我們再來思考下,如果allOf中的所有

CompletableFuture

都傳回的是同一個類型的結果,例如String,那麼可不可以讓allOf直接傳回

List

呢?

我們可以将一個allOf變成多個allOf這麼實作:

public static  CompletableFuture> allOf(Collection> futures) {
    return futures.stream().collect(Collectors.collectingAndThen(
                    Collectors.toList(),
                    l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))
                            .thenApply(v -> l.stream().map(CompletableFuture::join).collect(Collectors.toList()))
            )
    );
}           

複制