天天看點

java.util.concurrent包下的Fork/Join架構1.概念介紹2.建立一個Fork/join池3.加入任務的結果(用RecursiveTask傳回任務結果)4.異步運作任務5.在任務中抛出異常6.取消任務

Fork/Join是JDK1.7的新特性。這個架構被設計用來解決可以使用分而治之技術将任務分解成更小的問題。

1.概念介紹

參考連結:http://ifeve.com/fork-join-1/

1)fork和join如何了解呢?

fork就是建立分支的意思:如果任務大小小于我們能接受的大小,那線程直接執行;否則,我們會建立分支,由兩個子線程來執行原任務,依次遞歸;

join就是線程等待的意思:父任務線程建立了子任務線程,他需要等待子任務線程執行完畢,傳回最終結果。

2)主要的幾個類:

ForkJoinPool:執行ForkJoin的線程池,實作了ExecutorService接口;

ForkJoinTask:抽象類,是ForkJoinPool池的任務基類。其有兩個實作的子類:

RecursiveAction:針對沒有傳回結果的子任務集;

RecursiveTask:針對有傳回結果的子任務集。

2.建立一個Fork/join池

參考連結:http://ifeve.com/fork-join-2/

3)Fork/join代碼:在RecursiveAction中實作compute()方法,方法中的邏輯如下:

If (problem size < default size){

execute(tasks);

} else {

tasks=divide(task);

//一般會用invokeAll(task1,task2..)

}

RecursiveAction主要屬性:

1)要操作的大對象。比如一個數組。

2)自定義線程不細分最小執行量(threshold)。比如子數組長度<100執行,100就是一個自定義的threshold。

3)要操作對象的指針,首指針+尾指針。因為每個子任務不可能将原大對象複制一遍去處理,是以每個子線程操作的同一個大對象,那就需要兩個指針,指明操作的段

4)對一個産品數組進行價格更新的示例,來自上一個連接配接中的示例。下面是手敲:

(ForkJoinPool這個池可用被監視,但本機測試效果不理想。還沒搞清那些方法的真正含義,搞不動了,等實際用時再看)

@SuppressWarnings("serial")
public class ForkJoinModTest extends RecursiveAction{
	
	private List<Producer> ps;
	private int startP;
	private int endP;
	
	public ForkJoinModTest(){}
	public ForkJoinModTest(List<Producer> ps,int startP,int endP){
		this.ps = ps;
		this.startP = startP;
		this.endP = endP;
	}
	
	private int threshold = 100;	//線程細分處理的最小量

	@Override
	protected void compute() {
		if(endP-startP < threshold)	//可以直接處理
		{
			this.updatePrice();
		}else	//繼續細分處理
		{
			int middle = (startP + endP) >>> 1;		//除以2
			//分成兩個子任務
			ForkJoinModTest task1 = new ForkJoinModTest(ps,startP,middle+1);
			ForkJoinModTest task2 = new ForkJoinModTest(ps,middle+1,endP);
			//用invokeAll()調用執行兩個子任務
			super.invokeAll(task1,task2);			
		}
	}
	
	private void updatePrice(){
		for(;startP<endP;startP++){
			Producer p = ps.get(startP);
			p.setPrice(12);	//價格統一調高2元
		}
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		List<Producer> ps = genarateProducers(1000000);	//1w個待更新的産品
		//ForkJoinPool,預設線程池數量為處理器核數
		ForkJoinPool fjPool = new ForkJoinPool();	
		//建立Fork/join任務
		ForkJoinModTest fjTask = new ForkJoinModTest(ps,0,ps.size());
		//将任務交給線程池執行
		fjPool.execute(fjTask);
		
		//上面的代碼已經完成了工作任務。下面的代碼用于監視ForkJoinPool工作情況
		do{
			System.out.printf("池中活動線程數量: %d\n",fjPool.getActiveThreadCount());
			System.out.printf("偷竊算法數量: %d\n",fjPool.getStealCount());
			System.out.printf("并行數量: %d\n",fjPool.getParallelism());
			System.out.println();
			try {
				Thread.sleep(5);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}while(!fjTask.isDone());
		
		fjPool.shutdown();	//實際中使用完,也記得關閉線程池
		
		if(fjTask.isCompletedNormally()){
			System.out.println("價格更新成功!");
		}
		for (int i=0; i<ps.size(); i++){
			Producer product=ps.get(i);
			if (product.getPrice()!=12) {
				System.out.printf("Product %s: %f\n",product.getName(),product.getPrice());
			}
		}
		System.out.println("--------the end--------");
	}
	
	public static List<Producer> genarateProducers(long x){
		List<Producer> l = new ArrayList<Producer>();
		for(int i=0;i<x;i++){
			Producer p = new ForkJoinModTest().new Producer();
			p.setName("p"+i);
			p.setPrice(10);
		}
		return l;
	}
	
	class Producer{
		private String name;
		private int price;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public int getPrice() {
			return price;
		}
		public void setPrice(int price) {
			this.price = price;
		}
	}
}
           

3.加入任務的結果(用RecursiveTask傳回任務結果)

參考連結: http://ifeve.com/fork-join-3/

問題:

1)報錯:java.util.concurrent.ExecutionException: java.lang.StackOverflowError

ForkJoin不會對堆棧進行控制,編寫代碼時注意方法遞歸不能超過jvm的記憶體,如果必要需要調整jvm的記憶體:在Eclipse中JDK的配置中加上   -XX:MaxDirectMemorySize=128(預設是64M)。改為128後不報棧溢出,但是報下一個錯。

2)java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node

最終問題解決:

編碼錯誤。在DocumentTask和LineTask兩個類的compute()方法中,int midd = (start + end) >>> 1;。但開始時寫錯了,寫成int midd = (start + end) >>> 2;造成子任務的處理長度不平衡。是以報上面的錯。修改後,上面的1)、2)的錯都消失了。

手寫代碼參考:test\src\com\sitech\concurrent\forkjoin\recursivetask包

4.異步運作任務

參考連結:http://ifeve.com/fork-join-4/

異步運作任務是指:建立任務後,送出給ForkJoinPool異步執行,不等待子任務執行傳回結果。

在ForkJoinPool中執行ForkJoinTask時,你可以使用同步或異步方式來實作。當你使用同步方式時,送出任務給池的方法直到送出的任務完成它的執行,才會傳回結果。當你使用異步方式時,送出任務給執行者的方法将立即傳回,是以這個任務可以繼續執行。

當你使用同步方法,調用這些方法(比如:invokeAll()方法)的任務将被阻塞,直到送出給池的任務完成它的執行。這允許ForkJoinPool類使用work-stealing算法,配置設定一個新的任務給正在執行睡眠任務的工作線程。反之,當你使用異步方法(比如:fork()方法),這個任務将繼續它的執行,是以ForkJoinPool類不能使用work-stealing算法來提高應用程式的性能。在這種情況下,隻有當你調用join()或get()方法來等待任務的完成時,ForkJoinPool才能使用work-stealing算法。

連接配接例子:搜尋一個目錄下,包括子目錄中,檔案為特定字尾的檔案名,并列印。

例子中用的就是task.fork(),這個語句不會阻塞。是以當需要擷取list結果集時,有可能搜尋還為結束,需要用task.join()來等待并擷取結果。

5.在任務中抛出異常

參考連結:http://ifeve.com/fork-join-5/

這章将的是在RecursiveTask的compute()方法中不能抛出已檢測異常,如IOException或ClassNotFoundException等。即便實際運作時抛出了這些異常,程式也不會停止。是以在程式運作結束後,可以用:task.isCompletedAbnormally()來判斷任務是否抛出異常(true表示抛出過異常),如果為true,則task.getException()來擷取異常資訊。

用ForkJoinTask.completeExceptionally()方法可完成Fork/Join異常處理。

6.取消任務

參考連結: http://ifeve.com/fork-join-6/ 當你在一個ForkJoinPool類中執行ForkJoinTask對象,在它們開始執行之前,你可以取消執行它們。

但是在ForkJoinTask類中并不知道何時執行cancle()方法,而且每個任務都可能有喝多子任務,如果我們需要準确的取消某些還未送出的任務,就需要一個自定義的任務容器,将任務關聯起來,然後試着執行某些任務的cancle()方法來取消任務執行。

關于Fork/Join的使用說明還有很多,可以參考上面連結頁面中的擴充連結。暫時記錄這麼多,後續用的在檢視。