天天看点

定制并发类(七)实现ThreadFactory接口生成自定义的线程给Fork/Join框架

实现threadfactory接口生成自定义的线程给fork/join框架

fork/join框架是java7中最有趣的特征之一。它是executor和executorservice接口的一个实现,允许你执行callable和runnable任务而不用管理这些执行线程。

这个执行者面向执行能被拆分成更小部分的任务。主要组件如下:

一个特殊任务,实现forkjointask类

两种操作,将任务划分成子任务的fork操作和等待这些子任务结束的join操作

一个算法,优化池中线程的使用的work-stealing算法。当一个任务正在等待它的子任务(结束)时,它的执行线程将执行其他任务(等待执行的任务)。

forkjoinpool类是fork/join的主要类。在它的内部实现,有如下两种元素:

一个存储等待执行任务的列队。

一个执行任务的线程池

在这个指南中,你将学习如何实现一个在forkjoinpool类中使用的自定义的工作者线程,及如何使用一个工厂来使用它。

准备工作…

这个指南的例子使用eclipse ide实现。如果你使用eclipse或其他ide,如netbeans,打开它并创建一个新的java项目。

如何做…

按以下步骤来实现的这个例子:

1.创建一个继承forkjoinworkerthread类的myworkerthread类。

<code>1</code>

<code>public</code> <code>class</code> <code>myworkerthread</code><code>extends</code> <code>forkjoinworkerthread {</code>

2.声明和创建一个参数化为integer类的threadlocal属性,名为taskcounter。

<code>private</code> <code>static</code> <code>threadlocal&lt;integer&gt; taskcounter=</code><code>new</code> <code>threadlocal&lt;integer&gt;();</code>

3.实现这个类的构造器。

<code>protected</code> <code>myworkerthread(forkjoinpool pool) {</code>

<code>2</code>

<code>super</code><code>(pool);</code>

<code>3</code>

<code>}</code>

4.重写onstart()方法。调用父类的这个方法,写入一条信息到控制台。设置当前线程的taskcounter属性值为0。

<code>@override</code>

<code>protected</code> <code>void</code> <code>onstart() {</code>

<code>super</code><code>.onstart();</code>

<code>4</code>

<code>system.out.printf("myworkerthread %d: initializing task</code>

<code>5</code>

<code>counter.\n",getid());</code>

<code>6</code>

<code>taskcounter.set(</code><code>0</code><code>);</code>

<code>7</code>

5.重写ontermination()方法。写入当前线程的taskcounter属性值到控制台。

<code>protected</code> <code>void</code> <code>ontermination(throwable exception) {</code>

<code>system.out.printf("myworkerthread %d:</code>

<code>%d\n",getid(),taskcounter.get());</code>

<code>super</code><code>.ontermination(exception);</code>

6.实现addtask()方法。递增taskcounter属性值。

<code>public</code> <code>void</code> <code>addtask(){</code>

<code>int</code> <code>counter=taskcounter.get().intvalue();</code>

<code>counter++;</code>

<code>taskcounter.set(counter);</code>

7.创建一个实现forkjoinworkerthreadfactory接口的myworkerthreadfactory类。实现newthread()方法,创建和返回一个myworkerthread对象。

<code>public</code> <code>forkjoinworkerthread newthread(forkjoinpool pool) {</code>

<code>return</code> <code>new</code> <code>myworkerthread(pool);</code>

8.创建myrecursivetask类,它继承一个参数化为integer类的recursivetask类。

<code>public</code> <code>class</code> <code>myrecursivetask</code><code>extends</code> <code>recursivetask&lt;integer&gt; {</code>

9.声明一个私有的、int类型的属性array。

<code>private</code> <code>int</code> <code>array[];</code>

10.声明两个私有的、int类型的属性start和end。

<code>private</code> <code>int</code> <code>start, end;</code>

11.实现这个类的构造器,初始化它的属性。

<code>public</code> <code>myrecursivetask(</code><code>int</code> <code>array[],</code><code>int</code> <code>start,</code><code>int</code> <code>end) {</code>

<code>this</code><code>.array=array;</code>

<code>this</code><code>.start=start;</code>

<code>this</code><code>.end=end;</code>

12.实现compute()方法,用来合计数组中在start和end位置之间的所有元素。首先,将执行这个任务的线程转换成一个myworkerthread对象,然后使用addtask()方法来增长这个线程的任务计数器。

<code>protected</code> <code>integer compute() {</code>

<code>integer ret;</code>

<code>myworkerthread thread=(myworkerthread)thread.currentthread();</code>

<code>thread.addtask();</code>

13.实现addresults()方法。计算和返回两个任务(接收参数)的结果的总和。

<code>01</code>

<code>private</code> <code>integer addresults(task task1, task task2) {</code>

<code>02</code>

<code>int</code> <code>value;</code>

<code>03</code>

<code>try</code> <code>{</code>

<code>04</code>

<code>value = task1.get().intvalue()+task2.get().intvalue();</code>

<code>05</code>

<code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>06</code>

<code>e.printstacktrace();</code>

<code>07</code>

<code>value=</code><code>0</code><code>;</code>

<code>08</code>

<code>}</code><code>catch</code> <code>(executionexception e) {</code>

<code>09</code>

<code>10</code>

<code>11</code>

14.令这个线程睡眠10毫秒,然后返回任务的结果。

<code>timeunit.milliseconds.sleep(</code><code>10</code><code>);</code>

<code>return</code> <code>value;</code>

15.实现这个例子的主类,通过创建main类,并实现main()方法。

<code>public</code> <code>class</code> <code>main {</code>

<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>exception {</code>

16.创建一个名为factory的myworkerthreadfactory对象。

<code>myworkerthreadfactory factory=</code><code>new</code> <code>myworkerthreadfactory();</code>

17.创建一个名为pool的forkjoinpool对象,将前面创建的factory对象作为参数传给它的构造器。

<code>forkjoinpool pool=</code><code>new</code> <code>forkjoinpool(</code><code>4</code><code>, factory,</code><code>null</code><code>,</code><code>false</code><code>);</code>

18.创建一个大小为100000的整数数组,将所有元素初始化为值1。

<code>int</code> <code>array[]=</code><code>new</code> <code>int</code><code>[</code><code>100000</code><code>];</code>

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;array.length; i++){</code>

<code>array[i]=</code><code>1</code><code>;</code>

19.创建一个新的task对象,用来合计数组中的所有元素。

<code>myrecursivetask task=</code><code>new</code> <code>myrecursivetask(array,</code><code>0</code><code>,array.length);</code>

20.使用execute()方法,将这个任务提交给池。

<code>pool.execute(task);</code>

21.使用join()方法,等待这个任务的结束。

<code>task.join();</code>

22.使用shutdown()方法,关闭这个池。

<code>pool.shutdown();</code>

23.使用awaittermination()方法,等待这个执行者的结束。

<code>pool.awaittermination(</code><code>1</code><code>, timeunit.days);</code>

24.使用get()方法,将任务的结束写入到控制台。

<code>system.out.printf(</code><code>"main: result: %d\n"</code><code>,task.get());</code>

25.写入一条信息到控制台,表明程序的结束。

<code>system.out.printf(</code><code>"main: end of the program\n"</code><code>);</code>

它是如何工作的…

fork/join框架使用的线程叫工作者线程。java包含继承thread类的forkjoinworkerthread类和使用fork/join框架实现工作者线程。

在这个指南中,你已实现了继承forkjoinworkerthread类的myworkerthread类,并重写这个类的两个方法。你的目标是实现每个工作者线程的任务计数器,以至于你可以知道每个工作者线程执行多少个任务。你已经通过一个threadlocal属性实现计数器。这样,每个线程都拥有它自己的计数器,对于来你说是透明的。

你已重写forkjoinworkerthread类的onstart()方法来实现任务的计数器。当工作者线程开始它的执行时,这个方法将被调用。你也重写了ontermination()方法,将任务计数器的值写入到控制台。当工作者线程结束它的执行时,这个方法将被调用。你也在myworkerthread类中实现addtask()方法,用来增加每个线程的任务计数器。

对于forkjoinpool类,与java并发api中的所有执行者一样,使用工厂来创建它。所以,如果你想在forkjoinpool类中使用myworkerthread线程,你必须实现自己的线程工厂。对于fork/join框架,这个工厂必须实现forkjoinpool.forkjoinworkerthreadfactory类。为此,你已实现myworkerthreadfactory类。这个类只有一个用来创建一个新的myworkerthread对象的方法。

最后,你只要使用已创建的工厂来初始化forkjoinpool类。你已在main类中通过使用forkjoinpool的构造器实现了。

以下截图显示了这个程序的部分输出:

定制并发类(七)实现ThreadFactory接口生成自定义的线程给Fork/Join框架

你可以看出forkjoinpool对象如何执行4个工作者线程及每个工作者线程执行多少个任务。

不止这些…

考虑一下,当一个线程正常结束或抛出一个exception异常时,调用的forkjoinworkerthread提供的ontermination()方法。这个方法接收一个throwable对象作为参数。如果这个参数值为null时,表明这个工作者线程正常结束。但是,如果这个参数的值不为null,表明这个线程抛出一个异常。你必须包含必要的代码来处理这种情况。