天天看点

Java 是如何实现 Future 模式的?万字详解!

Java 是如何实现 Future 模式的?万字详解!
Java 是如何实现 Future 模式的?万字详解!

《java工程师面试突击(第3季)》重磅升级,由原来的70讲增至150讲,内容扩充一倍多,

https://github.com/yuanmabiji/jdk1.8-sourcecode-blogs

1 future是什么?

先举个例子,我们平时网购买东西,下单后会生成一个订单号,然后商家会根据这个订单号发货,发货后又有一个快递单号,然后快递公司就会根据这个快递单号将网购东西快递给我们。在这一过程中,这一系列的单号都是我们收货的重要凭证。因此,jdk的future就类似于我们网购买东西的单号,当我们执行某一耗时的任务时,我们可以另起一个线程异步去执行这个耗时的任务,同时我们可以干点其他事情。当事情干完后我们再根据future这个"单号"去提取耗时任务的执行结果即可。因此future也是多线程中的一种应用模式。

扩展: 说起多线程,那么future又与thread有什么区别呢?最重要的区别就是thread是没有返回结果的,而future模式是有返回结果的。

2 如何使用future

前面搞明白了什么是future,下面我们再来举个简单的例子看看如何使用future。假如现在我们要打火锅,首先我们要准备两样东西:把水烧开和准备食材。因为烧开水是一个比较漫长的过程(相当于耗时的业务逻辑),因此我们可以一边烧开水(相当于另起一个线程),一边准备火锅食材(主线程),等两者都准备好了我们就可以开始打火锅了。

执行结果如下截图,符合我们的预期:

Java 是如何实现 Future 模式的?万字详解!

从以上代码中可以看到,我们使用future主要有以下步骤:

新建一个callable匿名函数实现类对象,我们的业务逻辑在callable的call方法中实现,其中callable的泛型是返回结果类型;

然后把callable匿名函数对象作为futuretask的构造参数传入,构建一个futuretask对象;

然后再把futuretask对象作为thread构造参数传入并开启这个线程执行去执行业务逻辑;

最后我们调用futuretask对象的get方法得到业务逻辑执行结果。

可以看到跟future使用有关的jdk类主要有futuretask和callable两个,下面主要对futuretask进行源码分析。

扩展:还有一种使用future的方式是将callable实现类提交给线程池执行的方式,这里不再介绍,自行百度即可。

3 futuretask类结构分析

我们先来看下futuretask的类结构:

Java 是如何实现 Future 模式的?万字详解!

可以看到futuretask实现了runnablefuture接口,而runnablefuture接口又继承了future和runnable接口。因为futuretask间接实现了runnable接口,因此可以作为任务被线程thread执行;此外,最重要的一点就是futuretask还间接实现了future接口,因此还可以获得任务执行的结果。下面我们就来简单看看这几个接口的相关api。

runnable没啥好说的,相信大家都已经很熟悉了。

future接口象征着异步执行任务的结果即执行一个耗时任务完全可以另起一个线程执行,然后此时我们可以去做其他事情,做完其他事情我们再调用future.get()方法获取结果即可,此时若异步任务还没结束,此时会一直阻塞等待,直到异步任务执行完获取到结果。

runnablefuture是future和runnable接口的组合,即这个接口表示又可以被线程异步执行,因为实现了runnable接口,又可以获得线程异步任务的执行结果,因为实现了future接口。因此解决了runnable异步任务没有返回结果的缺陷。接下来我们来看下futuretask,futuretask实现了runnablefuture接口,因此是future和runnable接口的具体实现类,是一个可被取消的异步线程任务,提供了future的基本实现,即异步任务执行后我们能够获取到异步任务的执行结果,是我们接下来分析的重中之重。futuretask可以包装一个callable和runnable对象,此外,futuretask除了可以被线程执行外,还可以被提交给线程池执行。我们先看下futuretask类的api,其中重点方法已经红框框出。

Java 是如何实现 Future 模式的?万字详解!

上图中futuretask的run方法是被线程异步执行的方法,get方法即是取得异步任务执行结果的方法,还有cancel方法是取消任务执行的方法。接下来我们主要对这三个方法进行重点分析。

思考:

futuretask覆写的run方法的返回类型依然是void,表示没有返回值,那么futuretask的get方法又是如何获得返回值的呢?

futuretask的cancel方法能真正取消线程异步任务的执行么?什么情况下能取消?

因为futuretask异步任务执行结果还跟callable接口有关,因此我们再来看下callable接口:

我们都知道,callable<v>接口和runnable接口都可以被提交给线程池执行,唯一不同的就是callable<v>接口是有返回结果的,其中的泛型v就是返回结果,而runnable接口是没有返回结果的。

思考:一般情况下,runnable接口实现类才能被提交给线程池执行,为何callable接口实现类也可以被提交给线程池执行?想想线程池的submit方法内部有对callable做适配么?

4 futuretask源码分析

我们首先来看下futuretask的成员变量有哪些,理解这些成员变量对后面的源码分析非常重要。

这里我们要重点关注下futuretask的callable成员变量,因为futuretask的异步任务最终是委托给callable去实现的。

futuretask的成员变量runner,waiters和state都被volatile修饰,我们可以思考下为什么这三个成员变量需要被volatile修饰,而其他成员变量又不用呢?volatile关键字的作用又是什么呢?

既然已经定义了成员变量runner,waiters和state了,此时又定义了stateoffset,runneroffset和waitersoffset变量分别对应runner,waiters和state的偏移地址,为何要多此一举呢?

我们再来看看stateoffset,runneroffset和waitersoffset变量这三个变量的初始化过程:

前面讲了futuretask的成员变量,有一个表示状态的成员变量state我们要重点关注下,state变量表示任务执行的状态。

可以看到任务状态变量state有以上7种状态,0-6分别对应着每一种状态。任务状态一开始是new,然后由futuretask的三个方法set,setexception和cancel来设置状态的变化,其中状态变化有以下四种情况:

new -> completing -> normal:这个状态变化表示异步任务的正常结束,其中completing是一个瞬间临时的过渡状态,由set方法设置状态的变化;

new -> completing -> exceptional:这个状态变化表示异步任务执行过程中抛出异常,由setexception方法设置状态的变化;

new -> cancelled:这个状态变化表示被取消,即调用了cancel(false),由cancel方法来设置状态变化;

new -> interrupting -> interrupted:这个状态变化表示被中断,即调用了cancel(true),由cancel方法来设置状态变化。

futuretask有两个构造函数,我们分别来看看:

可以看到,这个构造函数在我们前面举的“打火锅”的例子代码中有用到,就是callable成员变量赋值,在异步执行任务时再调用callable.call方法执行异步任务逻辑。此外,此时给任务状态state赋值为new,表示任务新建状态。我们再来看下futuretask的另外一个构造函数:

这个构造函数在执行executors.callable(runnable, result)时是通过适配器runnableadapter来将runnable对象runnable转换成callable对象,然后再分别给callable和state变量赋值。注意,这里我们需要记住的是futuretask新建时,此时的任务状态state是new就好了。

前面我们有讲到futuretask间接实现了runnable接口,覆写了runnable接口的run方法,因此该覆写的run方法是提交给线程来执行的,同时,该run方法正是执行异步任务逻辑的方法,那么,执行完run方法又是如何保存异步任务执行的结果的呢?我们现在着重来分析下run方法:

可以看到执行异步任务的run方法主要分为以下四步来执行:

判断线程是否满足执行异步任务的条件:为了防止多线程并发执行异步任务,这里需要判断线程满不满足执行异步任务的条件;

若满足条件,执行异步任务:因为异步任务逻辑封装在callable.call方法中,此时直接调用callable.call方法执行异步任务,然后返回执行结果;

根据异步任务的执行情况做不同的处理:1) 若异步任务执行正常结束,此时调用set(result);来设置任务执行结果;2)若异步任务执行抛出异常,此时调用setexception(ex);来设置异常,详细分析请见4.4.1小节;

异步任务执行完后的善后处理工作:不管异步任务执行成功还是失败,若其他线程有调用futuretask.cancel(true),此时需要调用handlepossiblecancellationinterrupt方法处理中断,详细分析请见4.4.2小节。

这里值得注意的是判断线程满不满足执行异步任务条件时,runner是否为null是调用unsafe的cas方法compareandswapobject来判断和设置的,同时compareandswapobject是通过成员变量runner的偏移地址runneroffset来给runner赋值的,此外,成员变量runner被修饰为volatile是在多线程的情况下, 一个线程的volatile修饰变量的设值能够立即刷进主存,因此值便可被其他线程可见。

下面我们来看下当异步任务执行正常结束时,此时会调用set(result);方法:

可以看到当异步任务正常执行结束后,且异步任务没有被cancel的情况下,此时会做以下事情:将任务执行结果保存到futuretask的成员变量outcome中的,赋值结束后会调用finishcompletion方法来唤醒阻塞的线程(哪里来的阻塞线程?后面会分析),值得注意的是这里对应的任务状态变化是new -> completing -> normal。我们继续来看下当异步任务执行过程中抛出异常,此时会调用setexception(ex);方法。

可以看到setexception(throwable t)的代码逻辑跟前面的set(v v)几乎一样,不同的是任务执行过程中抛出异常,此时是将异常保存到futuretask的成员变量outcome中,还有,值得注意的是这里对应的任务状态变化是new -> completing -> exceptional。因为异步任务不管正常还是异常结束,此时都会调用futuretask的finishcompletion方法来唤醒唤醒阻塞的线程,这里阻塞的线程是指我们调用future.get方法时若异步任务还未执行完,此时该线程会阻塞。

finishcompletion方法的作用就是不管异步任务正常还是异常结束,此时都要唤醒且移除线程等待链表的等待线程节点,这个链表实现的是一个是treiber stack,因此唤醒(移除)的顺序是"后进先出"即后面先来的线程先被先唤醒(移除),关于这个线程等待链表是如何成链的,后面再继续分析。

在4.4小节分析的run方法里的最后有一个finally块,此时若任务状态state >= interrupting,此时说明有其他线程执行了cancel(true)方法,此时需要让出cpu执行的时间片段给其他线程执行,我们来看下具体的源码:

思考:为啥任务状态是interrupting时,此时就要让出cpu执行的时间片段呢?还有为什么要在义务任务执行后才调用handlepossiblecancellationinterrupt方法呢?

可以看到,如果任务状态state<=completing,说明异步任务正在执行过程中,此时会调用awaitdone方法阻塞等待;当任务执行完后,此时再调用report方法来报告任务结果,此时有三种情况:1)任务正常执行;2)任务执行异常;3)任务被取消。

futuretask.awaitdone方法会阻塞获取异步任务执行结果的当前线程,直到异步任务执行完成。

futuretask.awaitdone方法主要做的事情总结如下:

首先awaitdone方法里面是一个死循环;

若获取结果的当前线程被其他线程中断,此时移除该线程waitnode链表节点,并抛出interruptedexception;

如果任务状态state>completing,此时返回任务执行结果;

若任务状态为completing,此时获取任务结果的线程需让出cpu执行时间片段;

若q == null,说明当前线程还未设置到waitnode节点,此时新建waitnode节点并设置其thread属性为当前线程;

若queued==false,说明当前线程waitnode节点还未加入线程等待链表,此时加入该链表的头部;

当timed设置为true时,此时该方法具有超时功能,关于超时的逻辑这里不详细分析;

当前面6个条件都不满足时,此时阻塞当前线程。

我们分析到这里,可以直到执行异步任务只能有一个线程来执行,而获取异步任务结果可以多线程来获取,当异步任务还未执行完时,此时获取异步任务结果的线程会加入线程等待链表中,然后调用调用locksupport.park(this);方法阻塞当前线程。直到异步任务执行完成,此时会调用finishcompletion方法来唤醒并移除线程等待链表的每个waitnode节点,这里这里唤醒(移除)waitnode节点的线程是从链表头部开始的,前面我们也已经分析过。还有一个特别需要注意的就是awaitdone方法里面是一个死循环,当一个获取异步任务的线程进来后可能会多次进入多个条件分支执行不同的业务逻辑,也可能只进入一个条件分支。下面分别举两种可能的情况进行说明:情况1:当获取异步任务结果的线程进来时,此时异步任务还未执行完即state=new且没有超时设置时:

第一次循环:此时q = null,此时进入上面代码标号【1】的判断分支,即为当前线程新建一个waitnode节点;

第二次循环:此时queued = false,此时进入上面代码标号【2】的判断分支,即将之前新建的waitnode节点加入线程等待链表中;

第三次循环:此时进入上面代码标号【3】的判断分支,即阻塞当前线程;

第四次循环:加入此时异步任务已经执行完,此时进入上面代码标号【5】的判断分支,即返回异步任务执行结果。

情况2:当获取异步任务结果的线程进来时,此时异步任务已经执行完即state>completing且没有超时设置时,此时直接进入上面代码标号【5】的判断分支,即直接返回异步任务执行结果即可,也不用加入线程等待链表了。

在get方法中,当异步任务执行结束后即不管异步任务正常还是异常结束,亦或是被cancel,此时获取异步任务结果的线程都会被唤醒,因此会继续执行futuretask.report方法报告异步任务的执行情况,此时可能会返回结果,也可能会抛出异常。

我们最后再来看下futuretask.cancel方法,我们一看到futuretask.cancel方法,肯定一开始就天真的认为这是一个可以取消异步任务执行的方法,如果我们这样认为的话,只能说我们猜对了一半。

以上代码中,当异步任务状态state != new时,说明异步任务已经正常执行完或已经异常结束亦或已经被cancel,此时直接返回false;当异步任务状态state = new时,此时又根据mayinterruptifrunning参数是否为true分为以下两种情况:

当mayinterruptifrunning = false时,此时任务状态state直接被赋值为cancelled,此时不会对执行异步任务的线程发出中断信号,值得注意的是这里对应的任务状态变化是new -> cancelled。

当mayinterruptifrunning = true时,此时会对执行异步任务的线程发出中断信号,值得注意的是这里对应的任务状态变化是new -> interrupting -> interrupted。

最后不管mayinterruptifrunning为true还是false,此时都要调用finishcompletion方法唤醒阻塞的获取异步任务结果的线程并移除线程等待链表节点。从futuretask.cancel源码中我们可以得出答案,该方法并不能真正中断正在执行异步任务的线程,只能对执行异步任务的线程发出中断信号。如果执行异步任务的线程处于sleep、wait或join的状态中,此时会抛出interruptedexception异常,该线程可以被中断;此外,如果异步任务需要在while循环执行的话,此时可以结合以下代码来结束异步任务线程,即执行异步任务的线程被中断时,此时thread.currentthread().isinterrupted()返回true,不满足while循环条件因此退出循环,结束异步任务执行线程,如下代码:

注意:调用了futuretask.cancel方法,只要返回结果是true,假如异步任务线程虽然不能被中断,即使异步任务线程正常执行完毕,返回了执行结果,此时调用futuretask.get方法也不能够获取异步任务执行结果,此时会抛出cancellationexception异常。请问知道这是为什么吗?因为调用了futuretask.cancel方法,只要返回结果是true,此时的任务状态为cancelled或interrupted,同时必然会执行finishcompletion方法,而finishcompletion方法会唤醒获取异步任务结果的线程等待列表的线程,而获取异步任务结果的线程唤醒后发现状态s >= cancelled,此时就会抛出cancellationexception异常了。

5 总结

好了,本篇文章对futuretask的源码分析就到此结束了,下面我们再总结下futuretask的实现逻辑:

我们实现callable接口,在覆写的call方法中定义需要执行的业务逻辑;

然后把我们实现的callable接口实现对象传给futuretask,然后futuretask作为异步任务提交给线程执行;

最重要的是futuretask内部维护了一个状态state,任何操作(异步任务正常结束与否还是被取消)都是围绕着这个状态进行,并随时更新state任务的状态;

只能有一个线程执行异步任务,当异步任务执行结束后,此时可能正常结束,异常结束或被取消。

可以多个线程并发获取异步任务执行结果,当异步任务还未执行完,此时获取异步任务的线程将加入线程等待列表进行等待;

当异步任务线程执行结束后,此时会唤醒获取异步任务执行结果的线程,注意唤醒顺序是"后进先出"即后面加入的阻塞线程先被唤醒。

当我们调用futuretask.cancel方法时并不能真正停止执行异步任务的线程,只是发出中断线程的信号。但是只要cancel方法返回true,此时即使异步任务能正常执行完,此时我们调用get方法获取结果时依然会抛出cancellationexception异常。

扩展:前面我们提到了futuretask的runner,waiters和state都是用volatile关键字修饰,说明这三个变量都是多线程共享的对象(成员变量),会被多线程操作,此时用volatile关键字修饰是为了一个线程操作volatile属性变量值后,能够及时对其他线程可见。此时多线程操作成员变量仅仅用了volatile关键字仍然会有线程安全问题的,而此时doug lea老爷子没有引入任何线程锁,而是采用了unsafe的cas方法来代替锁操作,确保线程安全性。

6 分析futuretask源码,我们能学到什么?

我们分析源码的目的是什么?除了弄懂futuretask的内部实现原理外,我们还要借鉴大佬写写框架源码的各种技巧,只有这样,我们才能成长。分析了futuretask源码,我们可以从中学到:

利用locksupport来实现线程的阻塞\唤醒机制;

利用volatile和unsafe的cas方法来实现线程共享变量的无锁化操作;

若要编写超时异常的逻辑可以参考futuretask的get(long timeout, timeunit unit)的实现逻辑;

多线程获取某一成员变量结果时若需要等待时的线程等待链表的逻辑实现;

某一异步任务在某一时刻只能由单一线程执行的逻辑实现;

futuretask中的任务状态state的变化处理的逻辑实现。 以上列举的几点都是我们可以学以致用的地方。

【源码笔记】github地址:

https://github.com/yuanmabiji/java-sourcecode-blogs

《java工程师面试突击第三季》加餐部分大纲

Java 是如何实现 Future 模式的?万字详解!
Java 是如何实现 Future 模式的?万字详解!
Java 是如何实现 Future 模式的?万字详解!