天天看點

ForkJoinPool 學習示例

     在JAVA7之前,并行處理資料非常麻煩。第一,你得明确把包含資料的資料結構分成若幹份。第二,你要将每個子部分配置設定給一個獨立的線程。第三,你要在恰當的時候對它們進行同步避免不希望的競争條件,等待所有線程完成,最後把這些部分結果合并起來。在Java 7引入了分支/合并架構,讓這些操作更穩定、更不容易出錯。

     分支/合并架構的目的是以遞歸的方式将可以并行的任務拆分為更小的任務,然後将每個子任務的結果合并起來生成整體結果。要把子任務送出到ForkJoinPool必須建立RecursiveTask<R>的子類。需要實作它唯一的抽象方法 protected abstract R compute();  在這個方法中定義了将任務拆分成子任務的邏輯,以及無法拆分時生成單個子任務結果的邏輯。

  計算1到10000000的和

/**
 * Desc:Fork/Join架構的目的是以遞歸方式将可以并行的任務拆分為更小的任務,然後将每個子任務的結果合并起來生成一個整體結果。
 * 要把任務送出到ForkJoinPool必須建立RecursiveTask<T> 的一個子類
 * 
 * @author wei.zw
 * @since 2016年7月6日 下午9:27:56
 * @version v 0.1
 */
public class ForkJoinSumCalculator extends RecursiveTask<Long> {

	/**  */
	private static final long serialVersionUID = -8013303660374621470L;

	private final long[] numbers;

	private final int start;

	private final int end;

	private static final long THRESHOLD = 1000;

	/**
	 * @param numbers
	 * @param start
	 * @param end
	 */
	public ForkJoinSumCalculator(long[] numbers, int start, int end) {
		super();
		this.numbers = numbers;
		this.start = start;
		this.end = end;
	}

	/**
	 * @param numbers
	 */
	public ForkJoinSumCalculator(long[] numbers) {
		super();
		this.numbers = numbers;
		this.start = 0;
		this.end = numbers.length;
	}

	/**
	 * @see java.util.concurrent.RecursiveTask#compute()
	 */
	@Override
	protected Long compute() {
		int length = end - start;

		if (length <= THRESHOLD) {
			return computeSequentially();
		}
		ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
		leftTask.fork();
		ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
		rightTask.fork();
		Long rightResult = 0L;
		try {
			rightResult = rightTask.get();
		} catch (Exception e) {

		}

		Long leftResult = leftTask.join();
		return leftResult + rightResult;
	}

	/**
	 * 
	 * @return
	 * @author wei.zw
	 */
	private Long computeSequentially() {
		long sum = 0;
		for (int i = start; i < end; i++) {
			sum += numbers[i];
		}
		return sum;
	}

	public static void main(String[] args) {
		long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();
		long start = System.currentTimeMillis();
		System.out.println(new ForkJoinPool().invoke(new ForkJoinSumCalculator(numbers)) + " 耗時:"
				+ (System.currentTimeMillis() - start));

	}

}
           

 結果是:50000005000000 耗時:37

優化後的

/**
	 * @see java.util.concurrent.RecursiveTask#compute()
	 */
	@Override
	protected Long compute() {
		int length = end - start;

		if (length <= THRESHOLD) {
			return computeSequentially();
		}
		ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
		leftTask.fork();
		ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
		
		Long rightResult = rightTask.compute();
		

		Long leftResult = leftTask.join();
		return leftResult + rightResult;
	}
           

 計算結果是:50000005000000 耗時:25

使用Fork/Join架構的最佳做法:

  • 對一個任務調用join方法會阻塞調用方,直到該任務作出結果。是以,又必須要在兩個子任務的計算都開始之後再調用它。
  • 不應該在RecursiveTask内部使用ForkJoinPool的invoke方法,應該直接調用compute或者fork方法
  • 對子任務調用fork方法可以将這個子任務排進ForkJoinPool。同時對左右兩邊的子任務都調用似乎很自然,但是這樣做的效率比直接對其中一個調用compute方法低。這樣做可以為其中一個子任務重用同一線程,進而避免線上程池中多配置設定一個任務造成的開銷。

繼續閱讀