本文主要学习了Java中异步调用涉及到的Future和CompletableFuture。
1 Future
1.1 说明
在执行多个任务的时候,可以使用Java标准库提供的线程池,提交的任务只需要实现Runnable接口,就可以交给线程池去执行。
但是Runnable接口有个问题,那就是没有返回值。为此,Java标准库还提供了Callable接口,比Runnable接口多了返回值。
JDK在1.5之后增加了一个Future接口,用于获取异步执行的结果。
在Executor中提供的execute方法之外,在Executor的子接口ExecutorService中还提供了submit方法,submit方法的返回值是Future类型的对象。
1.2 使用
1.2.1 通过Callable获得返回值
使用Callable作为入参,获取Future结果:
1 <T> Future<T> submit(Callable<T> task);
这种方式是最常用的一种,通过Callable对象存储结果,通过Future对象返回获取结果。
举例如下:
1 public static void main(String[] args) {
2 ExecutorService executor = Executors.newSingleThreadExecutor();
3 Future<String> future = executor.submit(
4 new Callable<String>() {
5 @Override
6 public String call() throws Exception {
7 return "Hello World";
8 }
9 }
10 );
11 try {
12 System.out.print(future.get());
13 } catch (Exception e) {
14 e.printStackTrace();
15 } finally {
16 executor.shutdown();
17 }
18 }
1.2.2 通过传入指定对象获取返回值
使用Runnable和返回对象作为入参,获取Future结果:
1 <T> Future<T> submit(Runnable task, T result);
通过传入的对象存储返回值,通过Future对象返回获取结果。
1 public static void main(String[] args) {
2 final String[] result = new String[1];
3 ExecutorService executor = Executors.newSingleThreadExecutor();
4 Future<String[]> future = executor.submit(
5 new Runnable() {
6 @Override
7 public void run() {
8 result[0] = "Hello World";
9 }
10 }, result
11 );
12 try {
13 System.out.print(future.get()[0]);
14 } catch (Exception e) {
15 e.printStackTrace();
16 } finally {
17 executor.shutdown();
18 }
19 }
1.2.3 无需返回值
使用Runnable作为入参:
1 Future<?> submit(Runnable task);
这种方式无需获取返回值。
1 public static void main(String[] args) {
2 ExecutorService executor = Executors.newSingleThreadExecutor();
3 Future future = executor.submit(
4 new Runnable() {
5 @Override
6 public void run() {
7 }
8 }
9 );
10 try {
11 System.out.print(future.get());
12 } catch (Exception e) {
13 e.printStackTrace();
14 } finally {
15 executor.shutdown();
16 }
17 }
2 CompletableFuture
2.1 说明
虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果。
在JDK1.8中,CompletableFuture提供了非常强大的Future的扩展功能,简化了异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。
CompletableFuture实现了CompletionStage接口和Future接口,CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
2.2 使用
2.2.1 新建
CompletableFuture提供了四个静态方法来创建一个异步操作:
1 public static CompletableFuture<Void> runAsync(Runnable runnable)
2 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
3 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
4 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync表示创建无返回值的异步任务,相当于Runnable作为参数的submit方法:
1 public static void main(String[] args) {
2 CompletableFuture<Void> cf = CompletableFuture.runAsync(()->{
3 System.out.println(Thread.currentThread().getName() + "-runAsync");
4 });
5 try {
6 System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
7 } catch (Exception e) {
8 e.printStackTrace();
9 }
10 }
结果如下:
1 ForkJoinPool.commonPool-worker-9-runAsync
2 main-result-null
supplyAsync表示创建带返回值的异步任务的,相当于Callable作为参数的submit方法:
1 public static void main(String[] args) {
2 CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{
3 System.out.println(Thread.currentThread().getName() + "-supplyAsync");
4 return "test";
5 });
6 try {
7 System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
8 } catch (Exception e) {
9 e.printStackTrace();
10 }
11 }
1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 main-result-test
2.2.2 完成时回调
当CompletableFuture的计算结果完成,可以执行特定的方法:
1 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
2 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
3 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
whenComplete会在当前执行的线程继续执行CompletableFuture定义的任务:
1 public static void main(String[] args) {
2 CompletableFuture<Integer> future = new CompletableFuture<Integer>();
3 new Thread(() -> {
4 System.out.println(Thread.currentThread().getName() + "-启动");
5 try {
6 Thread.sleep(1000);
7 } catch (Exception e) {
8 e.printStackTrace();
9 }
10 System.out.println(Thread.currentThread().getName() + "-完成");
11 future.complete(100);
12 System.out.println(Thread.currentThread().getName() + "-结束");
13 }).start();
14 future.whenComplete((t, u) -> {
15 try {
16 Thread.sleep(1000);
17 } catch (Exception e) {
18 e.printStackTrace();
19 }
20 System.out.println(Thread.currentThread().getName() + "-t-" + t);
21 System.out.println(Thread.currentThread().getName() + "-u-" + u);
22 });
23 try {
24 System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
25 } catch (Exception e) {
26 e.printStackTrace();
27 }
28 }
执行结果:
1 Thread-0-启动
2 //等待1s
3 Thread-0-完成
4 main-return-100
5 //等待1s
6 Thread-0-v-100
7 Thread-0-ex-null
8 Thread-0-结束
whenCompleteAsync会在线程池起一个新的线程执行CompletableFuture定义的任务:
1 public static void main(String[] args) {
2 CompletableFuture<Integer> future = new CompletableFuture<Integer>();
3 new Thread(() -> {
4 System.out.println(Thread.currentThread().getName() + "-启动");
5 try {
6 Thread.sleep(1000);
7 } catch (Exception e) {
8 e.printStackTrace();
9 }
10 System.out.println(Thread.currentThread().getName() + "-完成");
11 future.complete(100);
12 System.out.println(Thread.currentThread().getName() + "-结束");
13 }).start();
14 future.whenCompleteAsync((t, u) -> {
15 try {
16 Thread.sleep(1000);
17 } catch (Exception e) {
18 e.printStackTrace();
19 }
20 System.out.println(Thread.currentThread().getName() + "-t-" + t);
21 System.out.println(Thread.currentThread().getName() + "-u-" + u);
22 });
23 try {
24 System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
25 // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
26 Thread.sleep(2000);
27 } catch (Exception e) {
28 e.printStackTrace();
29 }
30 }
1 Thread-0-启动
2 //等待1s
3 Thread-0-完成
4 main-return-100
5 Thread-0-结束
6 //等待1s
7 ForkJoinPool.commonPool-worker-9-v-100
8 ForkJoinPool.commonPool-worker-9-ex-null
9 //等待1s
2.2.3 异常处理
当CompletableFuture的计算抛出异常的时候,可以执行特定的方法:
1 public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
exceptionally会在抛出异常的时候执行:
1 public static void main(String[] args) {
2 CompletableFuture<Integer> future = new CompletableFuture<Integer>();
3 new Thread(() -> {
4 future.completeExceptionally(new NullPointerException());
5 }).start();
6 future.exceptionally(e -> {
7 e.printStackTrace();
8 return 100;
9 });
10 }
1 java.lang.NullPointerException
2.2.4 线程依赖
当一个线程依赖另一个线程时,可以使用方法来把这两个线程串行化:
1 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
2 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
3 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
串行化执行:
1 public static void main(String[] args) {
2 CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
3 System.out.println(Thread.currentThread().getName() + "-supplyAsync");
4 return "test";
5 }).thenApply(t->{
6 System.out.println(Thread.currentThread().getName() + "-t-" + t);
7 return t + "-apply";
8 });
9 try {
10 System.out.println(Thread.currentThread().getName() + "-" + future.get());
11 } catch (Exception e) {
12 e.printStackTrace();
13 }
14 }
1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 ForkJoinPool.commonPool-worker-9-t-test
3 main-test-apply
2.2.5 消费处理
接收任务的处理结果,并消费处理,无返回结果:
1 public CompletionStage<Void> thenAccept(Consumer<? super T> action)
2 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action)
3 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
消费处理结果:
1 public static void main(String[] args) {
2 CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
3 System.out.println(Thread.currentThread().getName() + "-supplyAsync");
4 return "test";
5 }).thenAccept(t->{
6 System.out.println(Thread.currentThread().getName() + "-t-" + t);
7 });
8 try {
9 System.out.println(Thread.currentThread().getName() + "-" + future.get());
10 } catch (Exception e) {
11 e.printStackTrace();
12 }
13 }
1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 main-t-test
3 main-null
2.2.6 取消返回
不接收任务的处理结果,取消返回:
1 public CompletionStage<Void> thenRun(Runnable action)
2 public CompletionStage<Void> thenRunAsync(Runnable action)
3 public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor)
1 public static void main(String[] args) {
2 CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
3 System.out.println(Thread.currentThread().getName() + "-supplyAsync");
4 return "test";
5 }).thenRun(()->{
6 System.out.println(Thread.currentThread().getName() + "-t-null");
7 });
8 try {
9 System.out.println(Thread.currentThread().getName() + "-" + future.get());
10 } catch (Exception e) {
11 e.printStackTrace();
12 }
13 }
1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 main-t-null
3 main-null