天天看点

Java7中的ForkJoin并发框架初探(中)

根据前文描述的doug lea的理论基础,在jdk1.7中已经给出了fork join的实现。在java se 7的api中,多了forkjointask、forkjoinpool、forkjoinworkerthread、recursiveaction、recursivetask这样5个类。本文就对jdk1.7中增加这5个工具类实现做简要分析。

0. jdk中forkjoin实现概述

在javase7的api和jdk1.7中,分别集成了支持forkjoin的五个类:

forkjoinpool 实现forkjoin的线程池

forkjoinworkerthread  实现forkjoin的线程

forkjointask<v> 一个描述forkjoin的抽象类

recursiveaction 无返回结果的forkjointask实现

recursivetask<v> 有返回结果的forkjointask实现

forkjoinpool维护了多个线程构成的数组,维护了任务提交队列,给出了多个线程之间工作窃取的实现。给出了任务类型适配,和提交任务逻辑的实现。需要和线程紧密配合。

而forkjoinworkerthread则继承了java.lang.thread类,维护了线程自己的队列,同一个任务fork()操作原则上会添加到同一个线程队列中。而这个线程类需要和forkjoinpool紧密合作,有指向对应forkjoinpool对象的引用。

forkjointask则实现了future接口,除了对接口的实现外,主要是fork()和join()操作。注意,貌似fork()只有forkjoinworkerthread 中才能执行。

两个子类recursiveaction和recursivetask则实现比较简单,区别就在于返回值的处理不同。

1. forkjoinpool

forkjoinpool是实现了 fork join 的线程池。看jdk源码我们知道forkjoinpool是extends abstractexecutorservice的,也就是说间接地实现了executor和executorservice接口。实际上也就意味着forkjoinpool是继threadpoolexecutor后的又一个executor(service)的具体实现。

1.1. 构建初始化

我们先看forkjoinpool的构造方法,一共有3个重载的实现。有一个单参数的默认实现,通常我们使用这个就足够了,这最终会以默认的参数调用3参数的构造方法。我们再来看3个参数的构造方法实现。其中:

int parallelism 第一个参数是并行度,这个参数简介影响着(会额外做一些运算)这个forkjoinpool的forkjoinworkerthread 线程数。默认情况下,这个参数是任务运行环境的处理器个数,比如系统提供的处理器数目为4,初始化线程池会开启16个线程。

forkjoinworkerthreadfactory factory 这个是forkjoinpool构建新线程forkjoinworkerthread 对象的工厂,类似于threadpoolexecutor中用到的threadfactory。

thread.uncaughtexceptionhandler handler 这个前面并发的文章页提到过,是线程异常处理器,这里不多说了。

1.2. 任务提交

前面已经提到,forkjoinpool也是executor(service)的实现,那么execute()和submit()这样向threadpoolexecutor提交任务的方法对于forkjoinpool来说也是一样有效的。

需要说明的是,除了增加支持forkjointask对象参数的重载实现外,还在runnable和callable参数的方法中对原始的runnable和callable对象做了到forkjointask的适配,使用的分别是forkjointask的静态内部类adaptedrunnable和adaptedcallable的对象。而这两个类型参数对应的方法最终都会调用forkjointask参数的方法:

1

2

3

4

5

6

<code>public</code> <code>&lt;t&gt; forkjointask&lt;t&gt; submit(forkjointask&lt;t&gt; task) {</code>

<code>    </code><code>if</code> <code>(task == </code><code>null</code><code>)</code>

<code>        </code><code>throw</code> <code>new</code> <code>nullpointerexception();</code>

<code>    </code><code>forkorsubmit(task);</code>

<code>    </code><code>return</code> <code>task;</code>

<code>}</code>

我们接下来再看下任务提交中被调用到的forkorsubmit()方法:

<code></code>

7

8

9

10

11

<code>private</code> <code>&lt;t&gt; </code><code>void</code> <code>forkorsubmit(forkjointask&lt;t&gt; task) {</code>

<code>    </code><code>forkjoinworkerthread w;</code>

<code>    </code><code>thread t = thread.currentthread();</code>

<code>    </code><code>if</code> <code>(shutdown)</code>

<code>        </code><code>throw</code> <code>new</code> <code>rejectedexecutionexception();</code>

<code>    </code><code>if</code> <code>((t </code><code>instanceof</code> <code>forkjoinworkerthread) &amp;&amp;</code>

<code>        </code><code>(w = (forkjoinworkerthread)t).pool == </code><code>this</code><code>)</code>

<code>        </code><code>w.pushtask(task);</code>

<code>    </code><code>else</code>

<code>        </code><code>addsubmission(task);</code>

逻辑很容易理解,先判断forkjoinpool的状态,若已停止,则抛异常返回。之后如果当前线程是forkjoinworkerthread类型的,则将任务追加到forkjoinworkerthread对象中维护的队列上,否则将新的任务放入forkjoinpool的提交队列中,并通知线程工作。

1.3. 线程的启动和工作

前面已经强调过,forkjoinpool和forkjoinworkerthread是紧密相关,耦合在一起的。thread的start()会调用run(),而forkjoinworkerthread类重写了run()方法,会调用对应的线程池forkjoinpool对象的work()方法。

我们来看一下work()方法的实现。

<code>final</code> <code>void</code> <code>work(forkjoinworkerthread w) {</code>

<code>    </code><code>boolean</code> <code>swept = </code><code>false</code><code>;                </code><code>// true on empty scans</code>

<code>    </code><code>long</code> <code>c;</code>

<code>    </code><code>while</code> <code>(!w.terminate &amp;&amp; (</code><code>int</code><code>)(c = ctl) &gt;= </code><code>0</code><code>) {</code>

<code>        </code><code>int</code> <code>a;                            </code><code>// active count</code>

<code>        </code><code>if</code> <code>(!swept &amp;&amp; (a = (</code><code>int</code><code>)(c &gt;&gt; ac_shift)) &lt;= </code><code>0</code><code>)</code>

<code>            </code><code>swept = scan(w, a);</code>

<code>        </code><code>else</code> <code>if</code> <code>(tryawaitwork(w, c))</code>

<code>            </code><code>swept = </code><code>false</code><code>;</code>

<code>    </code><code>}</code>

里面主要是一个while循环体,只要当前的线程和线程池不是处于终止状态,则这个循环一直执行。执行的内容则是这样的,如果能够根据scan()方法得到任务,并执行,否则进入阻塞状态。

我们来看一下scan()方法的实现。

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

<code>private</code> <code>boolean</code> <code>scan(forkjoinworkerthread w, </code><code>int</code> <code>a) {</code>

<code>    </code><code>int</code> <code>g = scanguard; </code><code>// mask 0 avoids useless scans if only one active</code>

<code>    </code><code>int</code> <code>m = (parallelism == </code><code>1</code> <code>- a &amp;&amp; blockedcount == </code><code>0</code><code>) ? </code><code>0</code> <code>: g &amp; smask;</code>

<code>    </code><code>forkjoinworkerthread[] ws = workers;</code>

<code>    </code><code>if</code> <code>(ws == </code><code>null</code> <code>|| ws.length &lt;= m)         </code><code>// staleness check</code>

<code>        </code><code>return</code> <code>false</code><code>;</code>

<code>    </code><code>for</code> <code>(</code><code>int</code> <code>r = w.seed, k = r, j = -(m + m); j &lt;= m + m; ++j) {</code>

<code>        </code><code>forkjointask&lt;?&gt; t; forkjointask&lt;?&gt;[] q; </code><code>int</code> <code>b, i;</code>

<code>        </code><code>forkjoinworkerthread v = ws[k &amp; m];</code>

<code>        </code><code>if</code> <code>(v != </code><code>null</code> <code>&amp;&amp; (b = v.queuebase) != v.queuetop &amp;&amp;</code>

<code>            </code><code>(q = v.queue) != </code><code>null</code> <code>&amp;&amp; (i = (q.length - </code><code>1</code><code>) &amp; b) &gt;= </code><code>0</code><code>) {</code>

<code>            </code><code>long</code> <code>u = (i &lt;&lt; ashift) + abase;</code>

<code>            </code><code>if</code> <code>((t = q[i]) != </code><code>null</code> <code>&amp;&amp; v.queuebase == b &amp;&amp;</code>

<code>                </code><code>unsafe.compareandswapobject(q, u, t, </code><code>null</code><code>)) {</code>

<code>                </code><code>int</code> <code>d = (v.queuebase = b + </code><code>1</code><code>) - v.queuetop;</code>

<code>                </code><code>v.stealhint = w.poolindex;</code>

<code>                </code><code>if</code> <code>(d != </code><code>0</code><code>)</code>

<code>                    </code><code>signalwork();             </code><code>// propagate if nonempty</code>

<code>                </code><code>w.exectask(t);</code>

<code>            </code><code>}</code>

<code>            </code><code>r ^= r &lt;&lt; </code><code>13</code><code>; r ^= r &gt;&gt;&gt; </code><code>17</code><code>; w.seed = r ^ (r &lt;&lt; </code><code>5</code><code>);</code>

<code>            </code><code>return</code> <code>false</code><code>;                     </code><code>// store next seed</code>

<code>        </code><code>}</code>

<code>        </code><code>else</code> <code>if</code> <code>(j &lt; </code><code>0</code><code>) {                     </code><code>// xorshift</code>

<code>            </code><code>r ^= r &lt;&lt; </code><code>13</code><code>; r ^= r &gt;&gt;&gt; </code><code>17</code><code>; k = r ^= r &lt;&lt; </code><code>5</code><code>;</code>

<code>        </code><code>else</code>

<code>            </code><code>++k;</code>

<code>    </code><code>if</code> <code>(scanguard != g)                       </code><code>// staleness check</code>

<code>    </code><code>else</code> <code>{                                    </code><code>// try to take submission</code>

<code>        </code><code>if</code> <code>((b = queuebase) != queuetop &amp;&amp;</code>

<code>            </code><code>(q = submissionqueue) != </code><code>null</code> <code>&amp;&amp;</code>

<code>            </code><code>(i = (q.length - </code><code>1</code><code>) &amp; b) &gt;= </code><code>0</code><code>) {</code>

<code>            </code><code>if</code> <code>((t = q[i]) != </code><code>null</code> <code>&amp;&amp; queuebase == b &amp;&amp;</code>

<code>                </code><code>queuebase = b + </code><code>1</code><code>;</code>

<code>            </code><code>return</code> <code>false</code><code>;</code>

<code>        </code><code>return</code> <code>true</code><code>;                         </code><code>// all queues empty</code>

看起来很复杂,实际的原理则很简单,就是先尝试做任务窃取( work stealing ),如果不满足条件则到提交队列中获取任务。而forkjoinworkerthread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:

线程会先执行forkjoinworkerthread对象内维护的任务队列中的任务,即forkjoinworkerthread的exectask()方法中的循环实现。通常是lifo,即去最新的任务。也有特殊情况,这个根据变量locallyfifo的值来判断。

之后会尝试做任务窃取,尝试从其他线程中获取任务

任务窃取条件不满足时,到提交队列中获取提交的任务

1.4. forkjoinpool的其它属性

除了上述提到的操作,forkjoin中还维护了

线程数组和提交任务的队列,这是最基本的

操作相关的锁和条件对象

volatile long ctl; 等线程池forkjoinpool状态的属性

static final random workerseedgenerator; 等和任务窃取策略相关的一系列属性

 private volatile long stealcount; 等数据统计相关属性

等数据属性。

2. forkjoinworkerthread

forkjoinworkerthread扩展于thread类,但提供了很多支持forkjoin的特性。

上文在介绍forkjoinpool的时候已经对这个类做了很多描述,也强调过线程类forkjoinworkerthread和forkjoinpool相互依赖,放在一起才有意义。实际上,还要提到描述fork join任务的类forkjointask。

除了上面提到的以外,对于forkjoinworkerthread这个类,再稍微提一下这样几个点:

forkjointask&lt;?&gt;[] queue; 这是维护和forkjoin相关的(子)任务队列,还有queuetop和queuebase属性,分别标记队列的尾部和头部

final forkjoinpool pool; 指向线程池的引用,需要注意的是,这个属性被final修饰

和forkjointask的fork()和join()方法相关的方法——pushtask()和unpushtask(),分别负责在当前forkjoinworkerthread对象维护的队列中新增和取回任务

其它与状态和统计相关的属性

3. forkjointask及两个抽象子类

forkjointask是forkjoin框架中的主体,是forkjoin中任务的体现。这个类实现了future和serializable接口。除了futrue接口要满足的方法外,我想有这样3个方法是有必要知道的,分别是fork()、join()和exec()。

对于fork(),这个也许大家都很熟悉了,在这里也就是分解出子任务的执行。这个在实现上很简单那,就是在当前线程forkjoinworkerthread对象维护的队列中加入新的子任务。实现如下:

public final forkjointask fork() {

    ((forkjoinworkerthread) thread.currentthread())

        .pushtask(this);

    return this;

}

需要注意的是fork()方法的调用是在当前线程对象为forkjoinworkerthread的条件下。

我们再来看看对应的join()实现:

<code>public</code> <code>final</code> <code>v join() {</code>

<code>    </code><code>if</code> <code>(dojoin() != normal)</code>

<code>        </code><code>return</code> <code>reportresult();</code>

<code>        </code><code>return</code> <code>getrawresult();</code>

显然,它有调用了dojoin()方法,我们再来深入了解下。

<code>private</code> <code>int</code> <code>dojoin() {</code>

<code>    </code><code>thread t; forkjoinworkerthread w; </code><code>int</code> <code>s; </code><code>boolean</code> <code>completed;</code>

<code>    </code><code>if</code> <code>((t = thread.currentthread()) </code><code>instanceof</code> <code>forkjoinworkerthread) {</code>

<code>        </code><code>if</code> <code>((s = status) &lt; </code><code>0</code><code>)</code>

<code>            </code><code>return</code> <code>s;</code>

<code>        </code><code>if</code> <code>((w = (forkjoinworkerthread)t).unpushtask(</code><code>this</code><code>)) {</code>

<code>            </code><code>try</code> <code>{</code>

<code>                </code><code>completed = exec();</code>

<code>            </code><code>} </code><code>catch</code> <code>(throwable rex) {</code>

<code>                </code><code>return</code> <code>setexceptionalcompletion(rex);</code>

<code>            </code><code>if</code> <code>(completed)</code>

<code>                </code><code>return</code> <code>setcompletion(normal);</code>

<code>        </code><code>return</code> <code>w.jointask(</code><code>this</code><code>);</code>

<code>        </code><code>return</code> <code>externalawaitdone();</code>

大概的逻辑是这样的,在当前线程对象为forkjoinworkerthread的条件下,从队列中取回当前任务forkjointask对象,并尝试在调用线程对其直接执行,否则当前线程调用wait()阻塞等待。更深入的理解可续继续查阅源码。

最后,我们再来看看exec()方法,这个是在forkjointask中是没有给出实现的。

在jdk中,有forkjointask的两个抽象子类recursiveaction和recursivetask,他们分别给出了exec()的实现,这也是这两个子类主要做的事情,实际上是调用了各自的compute()方法,而在recursiveaction和recursivetask中compute()又是未给出实现的。

实际上,compute()方法就是fork join要执行的内容,是fork join任务的实质,需要开发者给出。

而recursiveaction和recursivetask就是方便开发者使用fork join的,recursiveaction和recursivetask这两个类的区别仅仅是返回结果的情况不同。而这个compute()方法就是留给开发者继承扩展使用的。这个会在下篇文章详细讲述。

特别说明:尊重作者的劳动成果,转载请注明出处哦~~~http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp85