1.引言:
現在,硬體的處理能力,基本處于水準發展,為了更好的提高性能,現在都是采用的多核CPU,盡可能的提升并發性能,但是有一個問題是程式的本身并發處理能力不強,就會造成不能夠合理的利用多核心資源,例如,多個線程運作時,一個CPU配置設定的任務較少,當該CPU完成其任務後,處于空閑狀态,這就浪費了資源。
在JDK7時,引入了ForkJoinPool,是一個線程池,其核心的思想是将一個大任務拆分成小任務,然後将每個小任務的結果彙總到一個結果上。它本身是一個線程池,其所支援的最大線程數,就是硬體的CPU的數量,是AbstractExecutorService的子類,并引入了“工作竊取算法”,使其在多線程并行處理的效果上更佳,如下圖所示:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczX0xiRGZkRGZ0Xy9GbvNGL2EzXlpXazxSPrdVWrljVaFDazgFcGdFZv50MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zROBlL3gzMzADM1kTM5ADNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
1.1 工作竊取算法
work-stealing(工作竊取),ForkJoinPool提供了一個更有效的利用線程的機制,當ThreadPoolExecutor還在用單個隊列存放任務時,ForkJoinPool已經配置設定了與線程數相等的隊列,當有任務加入線程池時,會被平均配置設定到對應的隊列上,各線程進行正常工作,當有線程提前完成時,會從隊列的末端“竊取”其他線程未執行完的任務,當任務量特别大時,CPU多的計算機會表現出更好的性能。
2.常用方法:
ForkJoin架構,必須先建立一個ForkJoin任務。它提供了分而治之的操作機制,即在提供的任務中執行fork()和join()的操作機制,通常我們不直接繼承ForkJoinTask類,而是繼承其兩個子類:
1. RecursiveTask<T>:用于有傳回值結果的任務。
2. RecursiveAction:用于沒有傳回結果的任務。
ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到目前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列裡暫時沒有任務時,它會随機從其他工作線程的隊列的尾部擷取一個任務。
3.執行個體:
求一個數組中的最大值
比較了“正常算法”和“利用ForkJoin架構所需的時間”
package com.it.thread;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestForkJoinMax {
public static void main(String[] args) {
int[] data = new int[40000000];
Random random = new Random();
for(int i = 0 ;i<data.length;i++) {
data[i]=random.nextInt(100000000);
}
System.out.println(findMax1(data));
System.out.println(findMax2(data));
}
static int findMax1(int data[]) {
int max = data[0];
long t1 = System.nanoTime();
for(int i = 1;i<data.length;i++) {
if (max<data[i]) {
max=data[i];
}
}
long t2 = System.nanoTime();
System.out.println(t2-t1);
return max;
}
static int findMax2(int data[]) {
class MyFindMax extends RecursiveTask<Integer>{
int data[];
int start;//包含數組的起始下标
int end;//不包含結束下标
//設定一個門檻值,當作條件進行大任務的拆分條件
static final int THRESHOLD=100000;
public MyFindMax(int[] data, int start, int end) {
super();
this.data = data;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
//小于門檻值,直接利用正常算法,進行最大值尋找
if (end-start<=THRESHOLD) {
int max = data[start];
for(int i = start+1;i<end;i++) {
if (max<data[i]) {
max=data[i];
}
}
return max;
}else {
//如果大于門檻值,進分裂成兩個子任務處理
int middle = (start+end)/2;
MyFindMax task1 = new MyFindMax(data, start, middle);
MyFindMax task2 = new MyFindMax(data, middle, end); //執行兩個子任務
invokeAll(task1, task2);
//擷取子任務的傳回值
int result1 =task1.join();
int result2 =task2.join();
if (result2>result1) {
return result2;
}else {
return result1;
}
}
}
}
long t1 = System.nanoTime();
//建立ForkJoinPool 線程池
ForkJoinPool pool = new ForkJoinPool();
MyFindMax main = new MyFindMax(data, 0, data.length);
//将大任務送出給線程池
int result = pool.invoke(main);
long t2 = System.nanoTime();
System.out.println(t2-t1);
return result;
}
}
運作結果:
31780336
99999996
29180490
99999996
第一行:正常算法所需的時間(納秒為機關)已經是最優的算法了
第二行:正常算法所計算的最大值
第三行:ForkJoin架構運算所需時間(納秒為機關)
第四行:ForkJoin架構所計算的最大值
很明顯可以看出ForkJoin并行架構使用的時間少,說明其并行運作的效率很高。
4.對ForkJoin架構的常見優化方法:
1.盡量選取一個适合的門檻值。
2.分裂為子任務時,都需要建立對象,可以在此位置進行優化。