天天看點

java fork hadoop,java并發程式設計之Fork/Join 架構

ForkJoin是Java7提供的原生多線程并行處理架構,其基本思想是将大人物分割成小任務,最後将小任務聚合起來得到結果。它非常類似于HADOOP提供的MapReduce架構,隻是MapReduce的任務可以針對叢集内的所有計算節點,可以充分利用叢集的能力完成計算任務。ForkJoin更加類似于單機版的MapReduce。

我們要使用ForkJoin架構,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join的操作機制,通常我們不直接繼承ForkjoinTask類,隻需要直接繼承其子類。

1. RecursiveAction,用于沒有傳回結果的任務

2. RecursiveTask,用于有傳回值的任務

· ForkJoinPool:task要通過ForkJoinPool來執行,分割的子任務也會添加到目前工作線程的雙端隊列中,進入隊列的頭部。當一個工作線程中沒有任務時,會從其他工作線程的隊列尾部擷取一個任務。

ForkJoin架構使用了工作竊取的思想(work-stealing),算法從其他隊列中竊取任務來執行。

下面代碼,使我模拟的一個場景,即進行數組求和,假如我們的數組大小為400,計算機每次求一百個數字之和需要花費一秒鐘(使用線程睡眠模拟)。那麼單線程情況下大概需要4秒鐘,而我們使用4個線程分别進行100個數字的求和,其并行計算,隻需要1秒鐘。

代碼如下:

package com.zt.thread;

import java.util.Random;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.ForkJoinTask;

import java.util.concurrent.RecursiveTask;

class CountTask extends RecursiveTask {

private static final long serialVersionUID = 1L;

// 100個數組進行一次計算

private static final int SIZE = 100;

// 存儲本次計算的數字

private int[] array;

// 計算起始位置

private int start;

// 計算結束位置

private int end;

CountTask(int[] array, int start, int end) {

this.array = array;

this.start = start;

this.end = end;

}

@Override

protected Long compute() {

// 如果任務小于我們規定的閥值,則直接進行計算

if (end - start <= SIZE) {

long sum = 0;

for (int i = start; i < end; i++) {

sum += array[i];

}

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

return sum;

}

// 任務太大,将任務一分為二

int middle = (end + start) / 2;

CountTask ct1 = new CountTask(array, start, middle);

CountTask ct2 = new CountTask(array, middle, end);

invokeAll(ct1, ct2);

long res1 = ct1.join();

long res2 = ct2.join();

return res1 + res2;

}

}

public class ForkAndJoin {

private static void fillRandomArray(int[] array) {

Random rd=new Random();

for (int i = 0; i < array.length; i++) {

array[i] = rd.nextInt(100);

}

}

// 單線程進行數組求和計算

private static long computeArray(int[] array) {

long sum = 0;

for (int i = 0; i < array.length; i++) {

sum += array[i];

if ((i + 1) % 100 == 0) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

}

}

return sum;

}

public static void main(String[] args) {

// 擷取cpu核心數

// Runtime.getRuntime().availableProcessors()

int[] test = new int[400];

fillRandomArray(test);

// 模拟單個線程進行計算,假設,我們每計算100個數字,需要消耗一秒鐘(這裡我們使用線程睡眠1秒鐘)

long startTime1 = System.currentTimeMillis();

Long result1 = computeArray(test);

long endTime1 = System.currentTimeMillis();

System.out.println("single Thread sum: " + result1 + " in "

+ (endTime1 - startTime1) + " ms.");

// 使用fork/join 進行并行計算,我的電腦四核 可以使用

// Runtime.getRuntime().availableProcessors()

ForkJoinPool fp = new ForkJoinPool(Runtime.getRuntime()

.availableProcessors());

ForkJoinTask ft = new CountTask(test, 0, test.length);

long startTime = System.currentTimeMillis();

Long result = fp.invoke(ft);

long endTime = System.currentTimeMillis();

System.out.println("Fork/join sum: " + result + " in "

+ (endTime - startTime) + " ms.");

}

}

執行結果:

java fork hadoop,java并發程式設計之Fork/Join 架構

image.png

注意,這裡有個特别需要注意的地方,在劃分任務時,很多人會這樣寫。

int middle = (end + start) / 2;

CountTask ct1 = new CountTask(array, start, middle);

CountTask ct2 = new CountTask(array, middle, end);

ct1.fork();

ct2.fork();

long res1 = ct1.join();

long res2 = ct2.join();

因為compute()方法其實本身就是一個線程,這樣寫,就讓compute()所在的線程閑置了。是以應當使用invokeAll()方法,invokeAll的N個任務中,其中N-1個任務會使用fork()交給其它線程執行,但是,它還會留一個任務自己執行,這樣,就充分利用了線程池,保證沒有空閑的不幹活的線程。(廖學峰大大的教程真的不錯,強烈推薦)。