天天看點

Java多線程ForkJoin并行架構詳細解析

1.引言:

     現在,硬體的處理能力,基本處于水準發展,為了更好的提高性能,現在都是采用的多核CPU,盡可能的提升并發性能,但是有一個問題是程式的本身并發處理能力不強,就會造成不能夠合理的利用多核心資源,例如,多個線程運作時,一個CPU配置設定的任務較少,當該CPU完成其任務後,處于空閑狀态,這就浪費了資源。

     在JDK7時,引入了ForkJoinPool,是一個線程池,其核心的思想是将一個大任務拆分成小任務,然後将每個小任務的結果彙總到一個結果上。它本身是一個線程池,其所支援的最大線程數,就是硬體的CPU的數量,是AbstractExecutorService的子類,并引入了“工作竊取算法”,使其在多線程并行處理的效果上更佳,如下圖所示:

Java多線程ForkJoin并行架構詳細解析

1.1 工作竊取算法

    work-stealing(工作竊取),ForkJoinPool提供了一個更有效的利用線程的機制,當ThreadPoolExecutor還在用單個隊列存放任務時,ForkJoinPool已經配置設定了與線程數相等的隊列,當有任務加入線程池時,會被平均配置設定到對應的隊列上,各線程進行正常工作,當有線程提前完成時,會從隊列的末端“竊取”其他線程未執行完的任務,當任務量特别大時,CPU多的計算機會表現出更好的性能。

2.常用方法:

   ForkJoin架構,必須先建立一個ForkJoin任務。它提供了分而治之的操作機制,即在提供的任務中執行fork()和join()的操作機制,通常我們不直接繼承ForkJoinTask類,而是繼承其兩個子類:

   1. RecursiveTask<T>:用于有傳回值結果的任務。

   2. RecursiveAction:用于沒有傳回結果的任務。

   ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到目前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列裡暫時沒有任務時,它會随機從其他工作線程的隊列的尾部擷取一個任務。

3.執行個體:

求一個數組中的最大值

比較了“正常算法”和“利用ForkJoin架構所需的時間”

package com.it.thread;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestForkJoinMax {

	public static void main(String[] args) {
		 int[] data = new int[40000000];
		 Random random = new Random();
		 for(int i = 0 ;i<data.length;i++) {
			 data[i]=random.nextInt(100000000);
		 }
		 
		 System.out.println(findMax1(data));
		 System.out.println(findMax2(data));
	}
	
	static int findMax1(int data[]) {
		int max = data[0];
		long t1 = System.nanoTime();
		for(int i = 1;i<data.length;i++) {
			if (max<data[i]) {
				max=data[i];
			}
		}	
		long t2 = System.nanoTime();
		System.out.println(t2-t1);
		return max;	
	}
	
	
	static int findMax2(int data[]) {
		class MyFindMax extends RecursiveTask<Integer>{
			int data[];
			int start;//包含數組的起始下标
			int end;//不包含結束下标
			//設定一個門檻值,當作條件進行大任務的拆分條件
			static final int THRESHOLD=100000;

			public MyFindMax(int[] data, int start, int end) {
				super();
				this.data = data;
				this.start = start;
				this.end = end;
			}



			@Override
			protected Integer compute() {
                                //小于門檻值,直接利用正常算法,進行最大值尋找
				if (end-start<=THRESHOLD) {
					int max = data[start];
					for(int i = start+1;i<end;i++) {
						if (max<data[i]) {
							max=data[i];
						}
					}
					return max;
				}else {
                                        //如果大于門檻值,進分裂成兩個子任務處理
					int middle = (start+end)/2;
					MyFindMax task1 = new MyFindMax(data, start, middle);
					MyFindMax task2 = new MyFindMax(data, middle, end);                                   //執行兩個子任務
					invokeAll(task1, task2);
                                        //擷取子任務的傳回值
					int result1 =task1.join();
					int result2 =task2.join();
					if (result2>result1) {
						return result2;				
					}else {
						return result1;
					}
				}
				
			}
			
		}
		long t1 = System.nanoTime();
                //建立ForkJoinPool 線程池
		ForkJoinPool pool = new ForkJoinPool();
		MyFindMax main = new MyFindMax(data, 0, data.length);
                //将大任務送出給線程池
		int result =  pool.invoke(main);
		long t2 = System.nanoTime();
		System.out.println(t2-t1);
		return result;
	}
}
           

 運作結果:

31780336

99999996

29180490

99999996

第一行:正常算法所需的時間(納秒為機關)已經是最優的算法了

第二行:正常算法所計算的最大值

第三行:ForkJoin架構運算所需時間(納秒為機關)

第四行:ForkJoin架構所計算的最大值

很明顯可以看出ForkJoin并行架構使用的時間少,說明其并行運作的效率很高。

4.對ForkJoin架構的常見優化方法:

1.盡量選取一個适合的門檻值。

2.分裂為子任務時,都需要建立對象,可以在此位置進行優化。

繼續閱讀