天天看點

并發程式設計:Executors(二、自定義線程池)

        前面介紹到,Executor的線程池工廠通過ThreadPoolExecutor來建構帶有特定功能的線程池,同樣,也可以使用ThreadPoolExecutor來建構自定義線程池。

        通過ThreadPoolExecutor建構自定義線程池比較關鍵的是其構造函數中所傳入的隊列類型。

        使用有界隊列時:若有新的任務需要執行時,如果線程池實際線程數小于corePoolSize,則優先建立線程,若大于corePoolSize,則會将任務加入隊列,若隊列已滿,則在總線程數不大于maximumPoolSize的前提下建立新的線程,若線程數大于maximumPoolSize,則執行拒絕政策或其他自定義處理政策。下面舉一段小例子:

類:有界隊列的線程池

public static void main(String[] args) {
		//自定義線程池:核心線程數1、最大線程數2、時間組合:數量6、時間組合:機關秒、任務隊列(有界隊列):數量3、自定義拒絕政策
		ThreadPoolExecutor myPool = new ThreadPoolExecutor(1,2,6,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),new 自定義拒絕政策());
		//任務
		MyTask task1 = new MyTask(1,"張三送出的任務:吃飯");
		MyTask task2 = new MyTask(2,"李四送出的任務:睡覺");
		MyTask task3 = new MyTask(3,"王五送出的任務:寫BUG");
		MyTask task4 = new MyTask(4,"趙六送出的任務:錢七,你今天帶藥了嗎?");
		MyTask task5 = new MyTask(5,"錢七送出的任務:今天我帶的是速效救心丸!");
		MyTask task6 = new MyTask(6,"呂八送出的任務:我是秀才,我要去找芙妹!");
		//線程池執行任務
		myPool.execute(task1);
		myPool.execute(task2);
		myPool.execute(task3);
		myPool.execute(task4);
		myPool.execute(task5);
		//myPool.execute(task6);
		//優雅關機
		myPool.shutdown();
                //myPool.shutdownNow();
	}
           

運作效果如圖:

MyTask [taskId=1, taskName=張三送出的任務:吃飯]
MyTask [taskId=5, taskName=錢七送出的任務:今天我帶的是速效救心丸!]
MyTask [taskId=2, taskName=李四送出的任務:睡覺]
MyTask [taskId=3, taskName=王五送出的任務:寫BUG]
MyTask [taskId=4, taskName=趙六送出的任務:錢七,你今天帶藥了嗎?]
           

        顯然,任務12345的執行順序為:1先到且立即執行,2、3、4任務到達後存放進ArrayBlockingQueue隊列,任務5到達後線程池開啟最大線程數(2),且立即執行了任務5,之後再依次執行了ArrayBlockingQueue隊列裡邊的任務2、3、4。

将任務六放行,運作結果如圖:

MyTask [taskId=1, taskName=張三送出的任務:吃飯]
嘀 ___好人卡
MyTask [taskId=5, taskName=錢七送出的任務:今天我帶的是速效救心丸!]
MyTask [taskId=2, taskName=李四送出的任務:睡覺]
MyTask [taskId=3, taskName=王五送出的任務:寫BUG]
MyTask [taskId=4, taskName=趙六送出的任務:錢七,你今天帶藥了嗎?]
           

       顯然,在任務5到達後(此時任務1正在執行中,任務2、3、4已經放入隊列),任務5被立即執行,此時任務6到達,但此時線程池線程個數已經達到最大且任務隊列也已經滿了,是以執行了JDK提供的預設的拒絕政策,任務6不被執行,而在任務隊列中的任務2、3、4則繼續依次執行。

        使用無界隊列(LinkedBlockingQueue)時:與有界隊列相比,除非系統資源耗盡,否則不會出現任務入隊失敗的情況。當有新任務到來時,系統的線程數小于corePoolSize時,則建立線程以執行任務,當線程數達到核心線程數個數之後就不會再增加線程,如果此時有新的任務到達,則将任務放到任務隊列裡邊。當任務建立和處理的速度差異很大時,任務隊列就會持續增加,知道系統記憶體被耗盡。下面舉一段小例子:

public class 無界隊列的線程池 implements Runnable {
	private static AtomicInteger count = new AtomicInteger(0);
	@Override
	public void run() {
		try {
			System.err.println("這是任務"+count.incrementAndGet());
			Thread.sleep(2*1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws Exception {
		BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
		ExecutorService executor = new ThreadPoolExecutor(5,10,120l,TimeUnit.SECONDS,queue);
		for(int index = 0;index<50;index++) {
			executor.execute(new 無界隊列的線程池());
		}
		
		int size = queue.size();
		while(size > 0 ) {
			Thread.sleep(1*1000);
			System.err.println("queue size is :"+size);
			size = queue.size();
		}
	}

}
           

        JDK提供的拒絕政策有:AbortPolicy(直接抛出一個異常,阻止系統正常工作)、CallerRunsPolicy(隻要線程池未關閉,直接在調用者線程中運作目前被丢棄的任務)、DiscardOldestPolicy(丢棄最老的一個請求,嘗試再次送出目前任務)、DiscardPolicy(丢棄無法處理的任務,不給予任何處理)。

        自定義拒絕政策可以通過實作RejectedExecutionHandler接口來建構。

繼續閱讀