天天看点

Java Fork Join框架 (三) 设计

原文 http://gee.cs.oswego.edu/dl/papers/fj.pdf

作者:Doug Lea

译者:Alex

Fork/Join程序可以在任何支持以下特性的框架之上运行:框架能够让构建的子任务并行执行,并且拥有一种等待子任务运行结束的机制。然而,java.lang.Thread类(同时也包括POSIX pthreads, 这些也是Java线程所基于的基础)对Fork/Join程序来说并不是最优的选择:

Fork/Join 任务对同步和管理有简单的和常规的需求。相对于常规的线程来说,fork/join任务所展示的计算布局将会带来更加灵活的调度策略。例如,fork/join任务除了等待子任务外,其他情况下是不需要阻塞的。因此传统的用于跟踪记录阻塞线程的代价在这种情况下实际上是一种浪费。

对于一个合理的基础任务粒度来说,构建和管理一个线程的代价甚至可以比任务执行本身所花费的代价更大。尽管粒度是应该随着应用程序在不同特定平台上运行而做出相应调整的。但是超过线程开销的极端粗粒度会限制并行的发挥。

简而言之,Java标准的线程框架对fork/join程序而言太笨重了。但是既然线程构成了很多其他的并发和并行编程的基础,完全消除这种代价或者为了这种方式而调整线程调度是不可能(或者说不切实际的)。

尽管这种思想已经存在了很长时间了,但是第一个发布的能系统解决这些问题的框架是Cilk[5]。Cilk和其他轻量级的框架是基于操作系统的基本的线程和进程机制来支持特殊用途的fork/join程序。这种策略同样适用于Java,尽管Java线程是基于低级别的操作系统的能力来实现的。创造这样一个轻量级的执行框架的主要优势是能够让fork/join程序以一种更直观的方式编写,进而能够在各种支持JVM的系统上运行。

Java Fork Join框架 (三) 设计

FJTask框架是基于Cilk设计的一种演变。其他的类似框架有Hood[4], Filaments[8],stackthreads[10], 以及一些依赖于轻量级执行任务的相关系统。所有这些框架都采用和操作系统把线程映射到CPU上相同的方式来把任务映射到线程上。只是他们会使用fork/join程序的简单性、常规性以及一致性来执行这种映射。尽管这些框架都能适应不能形式的并行程序,他们优化了fork/join的设计:

一组工作者线程池是准备好的。每个工作线程都是标准的(“重量级”)处理存放在队列中任务的线程(这地方指的是Thread类的子类FJTaskRunner的实例对象)。通常情况下,工作线程应该与系统的处理器数量一致。对于一些原生的框架例如说Cilk,他们首先将映射成内核线程或者是轻量级的进程,然后再在处理器上面运行。在Java中,虚拟机和操作系统需要相互结合来完成线程到处理器的映射。然后对于计算密集型的运算来说,这种映射对于操作系统来说是一种相对简单的任务。任何合理的映射策略都会导致线程映射到不同的处理器。

所有的fork/join任务都是轻量级执行类的实例,而不是线程实例。在Java中,独立的可执行任务必须要实现Runnable接口并重写run方法。在FJTask框架中,这些任务将作为子类继承FJTask而不是Thread,它们都实现了Runnable接口。(对于上面两种情况来说,一个类也可以选择实现Runnable接口,类的实例对象既可以在任务中执行也可以在线程中执行。因为任务执行受到来自FJTask方法严厉规则的制约,子类化FJTask相对来说更加方便,也能够直接调用它们。)

我们将采用一个特殊的队列和调度原则来管理任务并通过工作线程来执行任务。这些机制是由任务类中提供的相关方式实现的:主要是由fork,join,isDone(一个结束状态的标示符),和一些其他方便的方法,例如调用coInvoke来分解合并两个或两个以上的任务。

一个简单的控制和管理类(这里指的是FJTaskRunnerGroup)来启动工作线程池,并初始化执行一个由正常的线程调用所触发的fork/join任务(就类似于Java程序中的main方法)。

作为一个给程序员演示这个框架如何运行的标准实例,这是一个计算法斐波那契函数的类。

<a href="http://ifeve.com/a-java-fork-join-framework-3-2/#viewSource">查看源代码</a>

<code>01</code>

<code>class</code> <code>Fib</code><code>extends</code> <code>FJTask {</code>

<code>02</code>

<code>    </code><code>static</code> <code>final</code> <code>int</code> <code>threshold =</code><code>13</code><code>;</code>

<code>03</code>

<code>    </code><code>volatile</code> <code>int</code> <code>number;</code><code>// arg/result</code>

<code>04</code>

<code>05</code>

<code>    </code><code>Fib(</code><code>int</code> <code>n) {</code>

<code>06</code>

<code>        </code><code>number = n;</code>

<code>07</code>

<code>    </code><code>}</code>

<code>08</code>

<code>09</code>

<code>    </code><code>int</code> <code>getAnswer() {</code>

<code>10</code>

<code>        </code><code>if</code> <code>(!isDone())</code>

<code>11</code>

<code>            </code><code>throw</code> <code>new</code> <code>IllegalStateException();</code>

<code>12</code>

<code>        </code><code>return</code> <code>number;</code>

<code>13</code>

<code>14</code>

<code>15</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>16</code>

<code>        </code><code>int</code> <code>n = number;</code>

<code>17</code>

<code>        </code><code>if</code> <code>(n &lt;= threshold)</code><code>// granularity ctl</code>

<code>18</code>

<code>            </code><code>number = seqFib(n);</code>

<code>19</code>

<code>        </code><code>else</code> <code>{</code>

<code>20</code>

<code>            </code><code>Fib f1 =</code><code>new</code> <code>Fib(n ?</code><code>1</code><code>);</code>

<code>21</code>

<code>            </code><code>Fib f2 =</code><code>new</code> <code>Fib(n ?</code><code>2</code><code>);</code>

<code>22</code>

<code>            </code><code>coInvoke(f1, f2);</code>

<code>23</code>

<code>            </code><code>number = f1.number + f2.number;</code>

<code>24</code>

<code>        </code><code>}</code>

<code>25</code>

<code>26</code>

<code>27</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>

<code>28</code>

<code>        </code><code>try</code> <code>{</code>

<code>29</code>

<code>            </code><code>int</code> <code>groupSize =</code><code>2</code><code>;</code><code>// for example</code>

<code>30</code>

<code>            </code><code>FJTaskRunnerGroup group =</code><code>new</code> <code>FJTaskRunnerGroup(groupSize);</code>

<code>31</code>

<code>            </code><code>Fib f =</code><code>new</code> <code>Fib(</code><code>35</code><code>);</code><code>// for example</code>

<code>32</code>

<code>            </code><code>group.invoke(f);</code>

<code>33</code>

<code>            </code><code>int</code> <code>result = f.getAnswer();</code>

<code>34</code>

<code>            </code><code>System.out.println(</code><code>"Answer: "</code> <code>+ result);</code>

<code>35</code>

<code>        </code><code>}</code><code>catch</code> <code>(InterruptedException ex) {</code>

<code>36</code>

<code>37</code>

<code>38</code>

<code>39</code>

<code>    </code><code>int</code> <code>seqFib(</code><code>int</code> <code>n) {</code>

<code>40</code>

<code>        </code><code>if</code> <code>(n &lt;=</code><code>1</code><code>)</code><code>return</code> <code>n;</code>

<code>41</code>

<code>        </code><code>else</code> <code>return</code> <code>seqFib(n?</code><code>1</code><code>) + seqFib(n?</code><code>2</code><code>);</code>

<code>42</code>

<code>43</code>

<code>}</code>

这个版本在第4节中所提到的平台上的运行速度至少比每个任务都在Thread类中运行快30倍。在保持性能的同时这个程序仍然维持着Java多线程程序的可移植性。对程序员来说通常有两个参数值的他们关注:

对于工作线程的创建数量,通常情况下可以与平台所拥有的处理器数量保持一致(或者更少,用于处理其他相关的任务,或者有些情况下更多,来提升非计算密集型任务的性能)。

一个粒度参数代表了创建任务的代价会大于并行化所带来的潜在的性能提升的临界点。这个参数更多的是取决于算法而不是平台。通常在单处理器上运行良好的临界点,在多处理器平台上也会发挥很好的效果。作为一种附带的效益,这种方式能够与Java虚拟机的动态编译机制很好的结合,而这种机制在对小块方法的优化方面相对于单块的程序来说要好。这样,加上数据本地化的优势,fork/join算法的性能即使在单处理器上面的性能都较其他算法要好。

Fork/jion框架的核心在于轻量级调度机制。FJTask采用了Cilk的 work-stealing 所采用的基本调度策略:

每一个工作线程维护自己的调度队列中的可运行任务。

队列以双端队列的形式被维护(注:deques通常读作“decks”),不仅支持后进先出——LIFO的push和pop操作,还支持先进先出——FIFO的take操作。

对于一个给定的工作线程来说,任务所产生的子任务将会被放入到工作者自己的双端队列中。

工作线程使用后进先出——LIFO(最早的优先)的顺序,通过弹出任务来处理队列中的任务。

当一个工作线程的本地没有任务去运行的时候,它将使用先进先出——FIFO的规则尝试随机的从别的工作线程中拿(“偷窃”)一个任务去运行。

当一个工作线程触及了join操作,如果可能的话它将处理其他任务,直到目标任务被告知已经结束(通过isDone方法)。所有的任务都会无阻塞的完成。

当一个工作线程无法再从其他线程中获取任务和失败处理的时候,它就会退出(通过yields, sleeps, 和/或者优先级调整,参考第3节)并经过一段时间之后再度尝试直到所有的工作线程都被告知他们都处于空闲的状态。在这种情况下,他们都会阻塞直到其他的任务再度被上层调用。

Java Fork Join框架 (三) 设计

使用后进先出——LIFO用来处理每个工作线程的自己任务,但是使用先进先出——FIFO规则用于获取别的任务,这是一种被广泛使用的进行递归fork/join设计的一种调优手段。引用[5]讨论了详细讨论了里面的细节。

让偷取任务的线程从队列拥有者相反的方向进行操作会减少线程竞争。同样体现了递归分治算法的大任务优先策略。因此,更早期被偷取的任务有可能会提供一个更大的单元任务,从而使得偷取线程能够在将来进行递归分解。

作为上述规则的一个后果,对于一些基础的操作而言,使用相对较小粒度的任务比那些仅仅使用粗粒度划分的任务以及那些没有使用递归分解的任务的运行速度要快。尽管相关的少数任务在大多数的fork/join框架中会被其他工作线程偷取,但是创建许多组织良好的任务意味着只要有一个工作线程处于可运行的状态,那么这个任务就有可能被执行。