系列目錄:
- Spring WebFlux運用中的思考與對比
- CompletableFuture與Spring的Sleuth結合工具類
- CommpetableFuture使用anyOf過程中的一些優化思考
- 結合CompletableFuture與Spring的Sleuth結合工具類與allOf以及anyOf
CompletableFuture的allOf
首先我們看看
allOf
的定義:
public static CompletableFuture allOf(CompletableFuture... cfs) {
// ...
}
複制
這個方法接受若幹個傳回不同類型的
CompletableFuture
為參數, 傳回一個傳回為空(Void)的
CompletableFuture
。也就是說,這個方法其實就是傳回一個在所有參數完成之後也完成的傳回為空(Void)的
CompletableFuture
,也就是充當一個
signaling device
這個方法很好,尤其是并發擷取多種io的結果的時候。但是用這個方法,帶來了很多不便,最大的不便就是,傳回是Void,而不是所有的參數的傳回。這樣導緻我們,需要在聚合這些結果的那個服務方法裡面,把最終結果封裝好,否則,擷取的就是一個Void。舉個例子:
假設我的一個服務方法的傳回是多個接口在使用,這個方法需要同時調用三個io等待他們都傳回時,利用這三個io的傳回,拼裝成接口需要的字段。對于這個場景,我們可以有兩種寫法,第一種是基于回調的寫法,第二種是基于傳回的寫法,兩種都OK,看個人習慣,我個人傾向于基于傳回的寫法,這樣代碼是瀑布式的,基于回調的會導緻多層嵌套,導緻代碼可讀性降低。
** 結果類:**
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class Result {
private String string;
private List strings;
private List integers;
}
複制
** 基于回調:**
public static void baseOnCallBack(CompletableFuture resultCompletableFuture) {
CompletableFuture> result1 = CompletableFuture.supplyAsync(() -> {
//模拟io
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return Lists.newArrayList("a", "b", "c");
});
CompletableFuture> result2 = CompletableFuture.supplyAsync(() -> {
//模拟io
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return Lists.newArrayList(1, 2, 3);
});
CompletableFuture result3 = CompletableFuture.supplyAsync(() -> {
//模拟io
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hash-test";
});
CompletableFuture.allOf(result1, result2, result3).thenAcceptAsync(v -> {
resultCompletableFuture.complete(Result.builder()
//一定存在的,因為已經完成了
.string(result3.join())
.strings(result1.join())
.integers(result2.join())
.build());
});
}
複制
** 基于傳回:**
public static CompletableFuture baseOnReturn() {
CompletableFuture completableFuture = new CompletableFuture();
CompletableFuture> result1 = CompletableFuture.supplyAsync(() -> {
//模拟io
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return Lists.newArrayList("a", "b", "c");
});
CompletableFuture> result2 = CompletableFuture.supplyAsync(() -> {
//模拟io
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return Lists.newArrayList(1, 2, 3);
});
CompletableFuture result3 = CompletableFuture.supplyAsync(() -> {
//模拟io
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hash-test";
});
CompletableFuture.allOf(result1, result2, result3).thenAcceptAsync(v -> {
completableFuture.complete(Result.builder()
//一定存在的,因為已經完成了
.string(result3.join())
.strings(result1.join())
.integers(result2.join())
.build());
});
return completableFuture;
}
複制
基于回調的接口使用結果:
CompletableFuture completableFuture = new CompletableFuture();
baseOnCallBack(completableFuture);
completableFuture = completableFuture.thenAcceptAsync(result -> {
System.out.println("baseOnCallback: " + result);
});
複制
基于傳回的接口使用結果:
CompletableFuture voidCompletableFuture = baseOnReturn().thenAcceptAsync(result -> {
System.out.println("baseOnReturn: " + result);
});
複制
可以看出,一層嵌套也是基于傳回的代碼看上去更優雅。
我們再來思考下,如果allOf中的所有
CompletableFuture
都傳回的是同一個類型的結果,例如String,那麼可不可以讓allOf直接傳回
List
呢?
我們可以将一個allOf變成多個allOf這麼實作:
public static CompletableFuture> allOf(Collection> futures) {
return futures.stream().collect(Collectors.collectingAndThen(
Collectors.toList(),
l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))
.thenApply(v -> l.stream().map(CompletableFuture::join).collect(Collectors.toList()))
)
);
}
複制