天天看点

011Java并发包011异步调用

本文主要学习了Java中异步调用涉及到的Future和CompletableFuture。

1 Future

1.1 说明

在执行多个任务的时候,可以使用Java标准库提供的线程池,提交的任务只需要实现Runnable接口,就可以交给线程池去执行。

但是Runnable接口有个问题,那就是没有返回值。为此,Java标准库还提供了Callable接口,比Runnable接口多了返回值。

JDK在1.5之后增加了一个Future接口,用于获取异步执行的结果。

在Executor中提供的execute方法之外,在Executor的子接口ExecutorService中还提供了submit方法,submit方法的返回值是Future类型的对象。

1.2 使用

1.2.1 通过Callable获得返回值

使用Callable作为入参,获取Future结果:

1 <T> Future<T> submit(Callable<T> task);      

这种方式是最常用的一种,通过Callable对象存储结果,通过Future对象返回获取结果。

举例如下:

1 public static void main(String[] args) {
 2     ExecutorService executor = Executors.newSingleThreadExecutor();
 3     Future<String> future = executor.submit(
 4             new Callable<String>() {
 5                 @Override
 6                 public String call() throws Exception {
 7                     return "Hello World";
 8                 }
 9             }
10     );
11     try {
12         System.out.print(future.get());
13     } catch (Exception e) {
14         e.printStackTrace();
15     } finally {
16         executor.shutdown();
17     }
18 }      

1.2.2 通过传入指定对象获取返回值

使用Runnable和返回对象作为入参,获取Future结果:

1 <T> Future<T> submit(Runnable task, T result);      

通过传入的对象存储返回值,通过Future对象返回获取结果。

1 public static void main(String[] args) {
 2     final String[] result = new String[1];
 3     ExecutorService executor = Executors.newSingleThreadExecutor();
 4     Future<String[]> future = executor.submit(
 5             new Runnable() {
 6                 @Override
 7                 public void run() {
 8                     result[0] = "Hello World";
 9                 }
10             }, result
11     );
12     try {
13         System.out.print(future.get()[0]);
14     } catch (Exception e) {
15         e.printStackTrace();
16     } finally {
17         executor.shutdown();
18     }
19 }      

1.2.3 无需返回值

使用Runnable作为入参:

1 Future<?> submit(Runnable task);      

这种方式无需获取返回值。

1 public static void main(String[] args) {
 2     ExecutorService executor = Executors.newSingleThreadExecutor();
 3     Future future = executor.submit(
 4             new Runnable() {
 5                 @Override
 6                 public void run() {
 7                 }
 8             }
 9     );
10     try {
11         System.out.print(future.get());
12     } catch (Exception e) {
13         e.printStackTrace();
14     } finally {
15         executor.shutdown();
16     }
17 }      

2 CompletableFuture

2.1 说明

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果。

在JDK1.8中,CompletableFuture提供了非常强大的Future的扩展功能,简化了异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。

CompletableFuture实现了CompletionStage接口和Future接口,CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

2.2 使用

2.2.1 新建

CompletableFuture提供了四个静态方法来创建一个异步操作:

1 public static CompletableFuture<Void> runAsync(Runnable runnable)
2 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
3 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
4 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)      

runAsync表示创建无返回值的异步任务,相当于Runnable作为参数的submit方法:

1 public static void main(String[] args) {
 2     CompletableFuture<Void> cf = CompletableFuture.runAsync(()->{
 3         System.out.println(Thread.currentThread().getName() + "-runAsync");
 4     });
 5     try {
 6         System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
 7     } catch (Exception e) {
 8         e.printStackTrace();
 9     }
10 }      

结果如下:

1 ForkJoinPool.commonPool-worker-9-runAsync
2 main-result-null      

supplyAsync表示创建带返回值的异步任务的,相当于Callable作为参数的submit方法:

1 public static void main(String[] args) {
 2     CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{
 3         System.out.println(Thread.currentThread().getName() + "-supplyAsync");
 4         return "test";
 5     });
 6     try {
 7         System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
 8     } catch (Exception e) {
 9         e.printStackTrace();
10     }
11 }      
1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 main-result-test      

2.2.2 完成时回调

当CompletableFuture的计算结果完成,可以执行特定的方法:

1 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
2 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
3 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)      

whenComplete会在当前执行的线程继续执行CompletableFuture定义的任务:

1 public static void main(String[] args) {
 2     CompletableFuture<Integer> future = new CompletableFuture<Integer>();
 3     new Thread(() -> {
 4         System.out.println(Thread.currentThread().getName() + "-启动");
 5         try {
 6             Thread.sleep(1000);
 7         } catch (Exception e) {
 8             e.printStackTrace();
 9         }
10         System.out.println(Thread.currentThread().getName() + "-完成");
11         future.complete(100);
12         System.out.println(Thread.currentThread().getName() + "-结束");
13     }).start();
14     future.whenComplete((t, u) -> {
15         try {
16             Thread.sleep(1000);
17         } catch (Exception e) {
18             e.printStackTrace();
19         }
20         System.out.println(Thread.currentThread().getName() + "-t-" + t);
21         System.out.println(Thread.currentThread().getName() + "-u-" + u);
22     });
23     try {
24         System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
25     } catch (Exception e) {
26         e.printStackTrace();
27     }
28 }      

执行结果:

1 Thread-0-启动
2 //等待1s
3 Thread-0-完成
4 main-return-100
5 //等待1s
6 Thread-0-v-100
7 Thread-0-ex-null
8 Thread-0-结束      

whenCompleteAsync会在线程池起一个新的线程执行CompletableFuture定义的任务:

1 public static void main(String[] args) {
 2     CompletableFuture<Integer> future = new CompletableFuture<Integer>();
 3     new Thread(() -> {
 4         System.out.println(Thread.currentThread().getName() + "-启动");
 5         try {
 6             Thread.sleep(1000);
 7         } catch (Exception e) {
 8             e.printStackTrace();
 9         }
10         System.out.println(Thread.currentThread().getName() + "-完成");
11         future.complete(100);
12         System.out.println(Thread.currentThread().getName() + "-结束");
13     }).start();
14     future.whenCompleteAsync((t, u) -> {
15         try {
16             Thread.sleep(1000);
17         } catch (Exception e) {
18             e.printStackTrace();
19         }
20         System.out.println(Thread.currentThread().getName() + "-t-" + t);
21         System.out.println(Thread.currentThread().getName() + "-u-" + u);
22     });
23     try {
24         System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
25         // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
26         Thread.sleep(2000);
27     } catch (Exception e) {
28         e.printStackTrace();
29     }
30 }      
1 Thread-0-启动
2 //等待1s
3 Thread-0-完成
4 main-return-100
5 Thread-0-结束
6 //等待1s
7 ForkJoinPool.commonPool-worker-9-v-100
8 ForkJoinPool.commonPool-worker-9-ex-null
9 //等待1s      

2.2.3 异常处理

当CompletableFuture的计算抛出异常的时候,可以执行特定的方法:

1 public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)      

exceptionally会在抛出异常的时候执行:

1 public static void main(String[] args) {
 2     CompletableFuture<Integer> future = new CompletableFuture<Integer>();
 3     new Thread(() -> {
 4         future.completeExceptionally(new NullPointerException());
 5     }).start();
 6     future.exceptionally(e -> {
 7         e.printStackTrace();
 8         return 100;
 9     });
10 }      
1 java.lang.NullPointerException      

2.2.4 线程依赖

当一个线程依赖另一个线程时,可以使用方法来把这两个线程串行化:

1 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
2 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
3 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)      

串行化执行:

1 public static void main(String[] args) {
 2     CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
 3         System.out.println(Thread.currentThread().getName() + "-supplyAsync");
 4         return "test";
 5     }).thenApply(t->{
 6         System.out.println(Thread.currentThread().getName() + "-t-" + t);
 7         return t + "-apply";
 8     });
 9     try {
10         System.out.println(Thread.currentThread().getName() + "-" + future.get());
11     } catch (Exception e) {
12         e.printStackTrace();
13     }
14 }      
1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 ForkJoinPool.commonPool-worker-9-t-test
3 main-test-apply      

2.2.5 消费处理

接收任务的处理结果,并消费处理,无返回结果:

1 public CompletionStage<Void> thenAccept(Consumer<? super T> action)
2 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action)
3 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)      

消费处理结果:

1 public static void main(String[] args) {
 2     CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
 3         System.out.println(Thread.currentThread().getName() + "-supplyAsync");
 4         return "test";
 5     }).thenAccept(t->{
 6         System.out.println(Thread.currentThread().getName() + "-t-" + t);
 7     });
 8     try {
 9         System.out.println(Thread.currentThread().getName() + "-" + future.get());
10     } catch (Exception e) {
11         e.printStackTrace();
12     }
13 }      
1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 main-t-test
3 main-null      

2.2.6 取消返回

不接收任务的处理结果,取消返回:

1 public CompletionStage<Void> thenRun(Runnable action)
2 public CompletionStage<Void> thenRunAsync(Runnable action)
3 public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor)      
1 public static void main(String[] args) {
 2     CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
 3         System.out.println(Thread.currentThread().getName() + "-supplyAsync");
 4         return "test";
 5     }).thenRun(()->{
 6         System.out.println(Thread.currentThread().getName() + "-t-null");
 7     });
 8     try {
 9         System.out.println(Thread.currentThread().getName() + "-" + future.get());
10     } catch (Exception e) {
11         e.printStackTrace();
12     }
13 }      
1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 main-t-null
3 main-null