分而治之政策
當我們要處理很大的資料,一個重要的思想就是把問題劃分成若幹個小問題處理,然後把小問題的結果進行整合,得到最終的結果。在JDK中有一個ForkJoin線程池,使用fork/join方法處理資料。Fork/Join 模式有自己的适用範圍。如果一個應用能被分解成多個子任務,并且組合多個子任務的結果就能夠獲得最終的答案,那麼這個應用就适合用 Fork/Join 模式來解決
ForkJoinPool線程池
java.util.concurrent.ForkJoinPool 繼承了 AbstractExecutorService類,這個線程池提供了下面幾種執行任務的方法,有些有傳回值有些沒有傳回值。有些要求任務是Runnable對象,有些要求任務是Callable對象
它的無參數構造方法,傳回跟系統CPU數量一樣的線程
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
其中一個重要的方法是,送出ForkJoinTask對象的任務,傳回ForkJoinTask對象的結果
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
ForkJoinTask任務
ForkJoinTask任務就是支援fork()分解和join()等待的任務。
ForkJoinTask是一個J.U.C下的一個抽象類
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
這個類中有兩個方法
fork()方法,當ForkJoinTask任務執行fork()方法,會把ForkJoinTask任務送出給ForkJoinPool
join方法,ForkJoinTask任務執行join()方法,會把結果傳回。
它有兩個實作類
① public abstract class RecursiveTask<V> extends ForkJoinTask<V> 帶有傳回結果的任務
② public abstract class RecursiveAction extends ForkJoinTask<Void> 沒有傳回結果的任務
通常我們并不直接繼承 ForkJoinTask,它包含了太多的抽象方法。針對特定的問題,我們可以選擇 ForkJoinTask 的不同子類來完成任務。RecursiveAction 是 ForkJoinTask 的一個子類,它代表了一類最簡單的 ForkJoinTask:不需要傳回值,當子任務都執行完畢之後,不需要進行中間結果的組合。如果我們從 RecursiveAction 開始繼承,那麼我們隻需要重載
protected void compute()
方法
RecursiveTask 是 ForkJoinTask 的一個子類,它是需要傳回值的任務,當子任務都執行完畢之後,進行中間結果的組合。如果我們從RecursiveTask 開始繼承,那麼我們隻需要重載
protected void compute()
方法,它可使用泛型指定一個傳回值的類型
下面這個例子解決這樣一個問題就用到帶有傳回結果的任務。
如何充分利用多核CPU,計算很大List中所有整數的和?
那一個大的任務分成幾個小任務來執行。而且在ForkJoinPool線程池中,每一個線程對應一個任務隊列,如果線程A已經完成了自己的任務,發現B線程的任務隊列還有很多,它會去B的任務隊列從後面拿任務來執行,如果任務時ForkJoinTask類型的,就支援join(),一起傳回。
package xidian.lili.reentrantlock;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Long>{
public int start;
public int end;
public final int THROLDSOLD=1000;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
protected Long compute() {
Long sum=0L;
if((end-start)<THROLDSOLD){
for(int i=start;i<=end;i++){
sum+=i;
}
}else{
int step=(start+end)/100;
ArrayList<CountTask> subtasks=new ArrayList();
int pos=start;
for(int i=0;i<100;i++){
int lastone=pos+step;
if(lastone>end)lastone=end;
CountTask subtask=new CountTask(pos,lastone);
subtasks.add(subtask);
pos=lastone+1;
subtask.fork();
}
for(CountTask subtask:subtasks){
sum+=subtask.join();
}
}
return sum;
}
public static void main(String[] args) {
CountTask task=new CountTask(0,10000);
ForkJoinPool pool=new ForkJoinPool();
ForkJoinTask<Long> result=pool.submit(task);
try {
long res=result.get();
System.out.println(res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
問題
使用Fork/Join模式,如果任務的層次劃分的很深,一直得不到傳回值,一可能就是任務太多,系統内的線程數多,導緻系統性能下降。二可能調用層次太深導緻棧溢出
當我們上述任務的子任務劃分更小,也就是調用層次更深,出現了OOM異常