ForkJoin是Java7提供的原生多線程并行處理架構,其基本思想是将大人物分割成小任務,最後将小任務聚合起來得到結果。它非常類似于HADOOP提供的MapReduce架構,隻是MapReduce的任務可以針對叢集内的所有計算節點,可以充分利用叢集的能力完成計算任務。ForkJoin更加類似于單機版的MapReduce。
我們要使用ForkJoin架構,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join的操作機制,通常我們不直接繼承ForkjoinTask類,隻需要直接繼承其子類。
1. RecursiveAction,用于沒有傳回結果的任務
2. RecursiveTask,用于有傳回值的任務
· ForkJoinPool:task要通過ForkJoinPool來執行,分割的子任務也會添加到目前工作線程的雙端隊列中,進入隊列的頭部。當一個工作線程中沒有任務時,會從其他工作線程的隊列尾部擷取一個任務。
ForkJoin架構使用了工作竊取的思想(work-stealing),算法從其他隊列中竊取任務來執行。
下面代碼,使我模拟的一個場景,即進行數組求和,假如我們的數組大小為400,計算機每次求一百個數字之和需要花費一秒鐘(使用線程睡眠模拟)。那麼單線程情況下大概需要4秒鐘,而我們使用4個線程分别進行100個數字的求和,其并行計算,隻需要1秒鐘。
代碼如下:
package com.zt.thread;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
class CountTask extends RecursiveTask {
private static final long serialVersionUID = 1L;
// 100個數組進行一次計算
private static final int SIZE = 100;
// 存儲本次計算的數字
private int[] array;
// 計算起始位置
private int start;
// 計算結束位置
private int end;
CountTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 如果任務小于我們規定的閥值,則直接進行計算
if (end - start <= SIZE) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return sum;
}
// 任務太大,将任務一分為二
int middle = (end + start) / 2;
CountTask ct1 = new CountTask(array, start, middle);
CountTask ct2 = new CountTask(array, middle, end);
invokeAll(ct1, ct2);
long res1 = ct1.join();
long res2 = ct2.join();
return res1 + res2;
}
}
public class ForkAndJoin {
private static void fillRandomArray(int[] array) {
Random rd=new Random();
for (int i = 0; i < array.length; i++) {
array[i] = rd.nextInt(100);
}
}
// 單線程進行數組求和計算
private static long computeArray(int[] array) {
long sum = 0;
for (int i = 0; i < array.length; i++) {
sum += array[i];
if ((i + 1) % 100 == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
return sum;
}
public static void main(String[] args) {
// 擷取cpu核心數
// Runtime.getRuntime().availableProcessors()
int[] test = new int[400];
fillRandomArray(test);
// 模拟單個線程進行計算,假設,我們每計算100個數字,需要消耗一秒鐘(這裡我們使用線程睡眠1秒鐘)
long startTime1 = System.currentTimeMillis();
Long result1 = computeArray(test);
long endTime1 = System.currentTimeMillis();
System.out.println("single Thread sum: " + result1 + " in "
+ (endTime1 - startTime1) + " ms.");
// 使用fork/join 進行并行計算,我的電腦四核 可以使用
// Runtime.getRuntime().availableProcessors()
ForkJoinPool fp = new ForkJoinPool(Runtime.getRuntime()
.availableProcessors());
ForkJoinTask ft = new CountTask(test, 0, test.length);
long startTime = System.currentTimeMillis();
Long result = fp.invoke(ft);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in "
+ (endTime - startTime) + " ms.");
}
}
執行結果:
image.png
注意,這裡有個特别需要注意的地方,在劃分任務時,很多人會這樣寫。
int middle = (end + start) / 2;
CountTask ct1 = new CountTask(array, start, middle);
CountTask ct2 = new CountTask(array, middle, end);
ct1.fork();
ct2.fork();
long res1 = ct1.join();
long res2 = ct2.join();
因為compute()方法其實本身就是一個線程,這樣寫,就讓compute()所在的線程閑置了。是以應當使用invokeAll()方法,invokeAll的N個任務中,其中N-1個任務會使用fork()交給其它線程執行,但是,它還會留一個任務自己執行,這樣,就充分利用了線程池,保證沒有空閑的不幹活的線程。(廖學峰大大的教程真的不錯,強烈推薦)。