文章目录
- 什么是Callable和Future?
-
- Callable
- Future
-
- 5个方法详解
- 什么是FutureTask?
- 使用
-
- 使用`Callable+Future`获取执行结果
- 使用`Callable+FutureTask`获取执行结果
- FutureTask源码分析
-
- get方法执行流程
- run()的执行流程
什么是Callable和Future?
以前的创建线程的两种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。
这2种方式都有一个缺陷就是:在执行完任务之后
无法获取执行结果
。
如果需要获取执行结果,就必须通过
共享变量
或者使用
线程通信
的方式来达到效果,这样使用起来就比较麻烦。
而自从Java 1.5开始就提供了Callable和Future,通过它们可以在任务执行完毕之后
得到任务执行结果
。
总结:
Callable是类似Runnable的创建任务的接口、
Future就是对发来的
Runnable或者Callable任务
的
执行结果
进行
取消、查询是否完成、获取结果
等等操作。
二者结合可以获取执行结果,弥补Thread和Runnable的缺陷、
Callable
Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个call()方法
public interface Callable<V> {
V call() throws Exception;
}
可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。
那么怎么使用Callable呢?一般情况下是配合
ExecutorService
来使用的,在
ExecutorService
接口中声明了若干个
submit
方法的重载版本
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
第一个submit方法里面的参数类型就是
Callable
。
Future
5个方法详解
Future类位于java.util.concurrent包下,它是一个接口:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get()
V get(long timeout, TimeUnit unit)
}
在Future接口中声明了5个方法,下面依次解释每个方法的作用:
-
get()方法
用来
,这个方法会产生阻塞,会一直等到任务执行完毕才返回;获取执行结果
-
get(long timeout, TimeUnit unit)
用来
,如果在指定时间内,还没获取到结果,就直接返回null。获取执行结果
-
cancel方法
用来
取消任务
,如果取消任务成功则返回true,如果取消任务失败则返回false。
参数
mayInterruptIfRunning
表示是否允许取消正在执行却没有执行完毕的任务,
如果设置true,则表示可以取消正在执行过程中的任务。
如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;
如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;
如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
-
isCancelled方法
表示任务
,如果在任务正常完成前被取消成功,则返回 true。是否被取消成功
-
isDone方法
表示任务是否已经完成,若任务完成,则返回true;
也就是说Future提供了三种功能
- 判断任务是否完成;
- 中断任务;
- 获取任务执行结果。
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。
什么是FutureTask?
FutureTask是Future接口的唯一实现类。
继承关系
RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。
所以它既可以作为
Runnable
被线程执行,又可以作为Future得到Callable的返回值。
FutureTask的构造器:
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
FutureTask执行期的3种状态
FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。根据
FutureTask.run()
方法被执行的时机,FutureTask可以处于下面3种状态:
- 未启动。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
- 已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
- 已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。
使用
使用 Callable+Future
获取执行结果
Callable+Future
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();
代码中创建了一个线程池,调用线程池的submit方法执行任务,submit参数为
Callable
接口:表示需要执行的任务有返回值,submit方法返回一个Future对象(就是FutureTask对象),Future相当于一个凭证,可以在任意时间拿着这个凭证去获取对应任务的执行结果(调用其get方法),代码中调用了result.get()方法之后,此方法会阻塞当前线程直到任务执行结束。
使用 Callable+FutureTask
获取执行结果
Callable+FutureTask
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<>(task);
ExecutorService executorService = Executors.newCachedThreadPool();
//法一,给到Thread
Thread thread = new Thread(futureTask);
//法二,给到线程池executorService
executorService.submit(futureTask);
FutureTask源码分析
FutureTask的实现基于
AbstractQueuedSynchronizer(AQS)
每一个基于AQS实现的同步器都会包含两种类型的操作:
1、至少一个acquire操作。这个操作阻塞调用线程,直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get(),Condition条件变量为awite()。
2、至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法,Condition条件变量为signal()。
基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync(jdk8之前),对FutureTask所有公有方法的调用都会委托给这个内部子类。
AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态。
如图所示,Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,FutureTask所有的的公有方法都直接委托给了内部私有的Sync。
get方法执行流程
FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法的执行过程如下。
1)调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null。
2)如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。
3)当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程(这里会产生级联唤醒的效果,后面会介绍)。
4)最后返回计算的结果或抛出异常。
run()的执行流程
1) 执行在构造函数中指定的任务(Callable.call())。
2) 以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。
3)AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然会返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程。
4)调用FutureTask.done()。结束
总结流程:
当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态ran或已取消状态cancelled ,当前执行线程将到AQS的线程等待队列中等待,当某个线程执行FutureTask.run()方法或FutureTask.cancel(…)方法时,会唤醒线程等待队列的第一个线程.
如上图,当线程E执行run()方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从队列中删除,然后唤醒它的后继线程B,最后线程A从get()方法返回。线程B、C和D重复A线程的处理流程。最终,在队列中等待的所有线程都被级联唤醒并从get()方法返回。