天天看点

线程池-Future和Callable什么是Callable和Future?什么是FutureTask?使用FutureTask源码分析

文章目录

  • 什么是Callable和Future?
    • Callable
    • Future
      • 5个方法详解
  • 什么是FutureTask?
  • 使用
    • 使用`Callable+Future`获取执行结果
    • 使用`Callable+FutureTask`获取执行结果
  • FutureTask源码分析
    • get方法执行流程
    • run()的执行流程

什么是Callable和Future?

以前的创建线程的两种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。

这2种方式都有一个缺陷就是:在执行完任务之后

无法获取执行结果

如果需要获取执行结果,就必须通过

共享变量

或者使用

线程通信

的方式来达到效果,这样使用起来就比较麻烦。

而自从Java 1.5开始就提供了Callable和Future,通过它们可以在任务执行完毕之后

得到任务执行结果

总结:

Callable是类似Runnable的创建任务的接口、

Future就是对发来的

Runnable或者Callable任务

执行结果

进行

取消、查询是否完成、获取结果

等等操作。

二者结合可以获取执行结果,弥补Thread和Runnable的缺陷、

Callable

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个call()方法

public interface Callable<V> {
    V call() throws Exception;
}
           

可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

那么怎么使用Callable呢?一般情况下是配合

ExecutorService

来使用的,在

ExecutorService

接口中声明了若干个

submit

方法的重载版本

<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
           

第一个submit方法里面的参数类型就是

Callable

Future

5个方法详解

Future类位于java.util.concurrent包下,它是一个接口:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() 
    V get(long timeout, TimeUnit unit)      
}
           

在Future接口中声明了5个方法,下面依次解释每个方法的作用:

  • get()方法

    用来

    获取执行结果

    ,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • get(long timeout, TimeUnit unit)

    用来

    获取执行结果

    ,如果在指定时间内,还没获取到结果,就直接返回null。
  • cancel方法

    用来

    取消任务

    ,如果取消任务成功则返回true,如果取消任务失败则返回false。

    参数

    mayInterruptIfRunning

    表示是否允许取消正在执行却没有执行完毕的任务,

    如果设置true,则表示可以取消正在执行过程中的任务。

    如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;

    如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;

    如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

  • isCancelled方法

    表示任务

    是否被取消成功

    ,如果在任务正常完成前被取消成功,则返回 true。
  • isDone方法

    表示任务是否已经完成,若任务完成,则返回true;

也就是说Future提供了三种功能

  • 判断任务是否完成;
  • 中断任务;
  • 获取任务执行结果。

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

什么是FutureTask?

FutureTask是Future接口的唯一实现类。

继承关系

线程池-Future和Callable什么是Callable和Future?什么是FutureTask?使用FutureTask源码分析

RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。

所以它既可以作为

Runnable

被线程执行,又可以作为Future得到Callable的返回值。

FutureTask的构造器:

public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
           

FutureTask执行期的3种状态

FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。根据

FutureTask.run()

方法被执行的时机,FutureTask可以处于下面3种状态:

  • 未启动。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
  • 已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
  • 已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。

使用

使用

Callable+Future

获取执行结果

ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();      
           

代码中创建了一个线程池,调用线程池的submit方法执行任务,submit参数为

Callable

接口:表示需要执行的任务有返回值,submit方法返回一个Future对象(就是FutureTask对象),Future相当于一个凭证,可以在任意时间拿着这个凭证去获取对应任务的执行结果(调用其get方法),代码中调用了result.get()方法之后,此方法会阻塞当前线程直到任务执行结束。

使用

Callable+FutureTask

获取执行结果

Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<>(task);
ExecutorService executorService = Executors.newCachedThreadPool();
//法一,给到Thread 
Thread thread = new Thread(futureTask);
//法二,给到线程池executorService
executorService.submit(futureTask);
           

FutureTask源码分析

FutureTask的实现基于

AbstractQueuedSynchronizer(AQS)

每一个基于AQS实现的同步器都会包含两种类型的操作:

1、至少一个acquire操作。这个操作阻塞调用线程,直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get(),Condition条件变量为awite()。

2、至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法,Condition条件变量为signal()。

基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync(jdk8之前),对FutureTask所有公有方法的调用都会委托给这个内部子类。

AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态。

线程池-Future和Callable什么是Callable和Future?什么是FutureTask?使用FutureTask源码分析

如图所示,Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,FutureTask所有的的公有方法都直接委托给了内部私有的Sync。

get方法执行流程

FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法的执行过程如下。

1)调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null。

2)如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。

3)当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程(这里会产生级联唤醒的效果,后面会介绍)。

4)最后返回计算的结果或抛出异常。

run()的执行流程

1) 执行在构造函数中指定的任务(Callable.call())。

2) 以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。

3)AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然会返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程。

4)调用FutureTask.done()。结束

总结流程:

当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态ran或已取消状态cancelled ,当前执行线程将到AQS的线程等待队列中等待,当某个线程执行FutureTask.run()方法或FutureTask.cancel(…)方法时,会唤醒线程等待队列的第一个线程.

线程池-Future和Callable什么是Callable和Future?什么是FutureTask?使用FutureTask源码分析

如上图,当线程E执行run()方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从队列中删除,然后唤醒它的后继线程B,最后线程A从get()方法返回。线程B、C和D重复A线程的处理流程。最终,在队列中等待的所有线程都被级联唤醒并从get()方法返回。

继续阅读