天天看點

Java的ThreadPoolExecutor使用幾點建議

背景

前段時間一個項目中因為涉及大量的線程開發,把jdk cocurrent的代碼重新再過了一遍。這篇文章中主要是記錄一下學習ThreadPoolExecutor過程中容易被人忽略的點,Doug Lea的整個類設計還是非常nice的

正文

先看一副圖,描述了ThreadPoolExecutor的工作機制:

整個ThreadPoolExecutor的任務處理有4步操作:

第一步,初始的poolSize < corePoolSize,送出的runnable任務,會直接做為new一個Thread的參數,立馬執行

第二步,當送出的任務數超過了corePoolSize,就進入了第二步操作。會将目前的runable送出到一個block queue中

第三步,如果block queue是個有界隊列,當隊列滿了之後就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務

第四步,如果第三步救急方案也無法處理了,就會走到第四步執行reject操作。

幾點說明:(相信這些網上一搜一大把,我這裡簡單介紹下,為後面做一下鋪墊) block queue有以下幾種實作:

1. ArrayBlockingQueue : 有界的數組隊列

2. LinkedBlockingQueue : 可支援有界/無界的隊列,使用連結清單實作

3. PriorityBlockingQueue : 優先隊列,可以針對任務排序

4. SynchronousQueue : 隊列長度為1的隊列,和Array有點差別就是:client thread送出到block queue會是一個阻塞過程,直到有一個worker thread連接配接上來poll task。

RejectExecutionHandler是針對任務無法處理時的一些自保護處理:

1. Reject 直接抛出Reject exception

2. Discard 直接忽略該runnable,不可取

3. DiscardOldest 丢棄最早入隊列的的任務

4. CallsRun 直接讓原先的client thread做為worker線程,進行執行

容易被人忽略的點: 1. pool threads啟動後,以後的任務擷取都會通過block queue中,擷取堆積的runnable task.

是以建議: block size >= corePoolSize ,不然線程池就沒任何意義2. corePoolSize 和 maximumPoolSize的差別, 和大家正常了解的資料庫連接配接池不太一樣。 * 據dbcp pool為例,會有minIdle , maxActive配置。minIdle代表是常駐記憶體中的threads數量,maxActive代表是工作的最大線程數。 * 這裡的corePoolSize就是連接配接池的maxActive的概念,它沒有minIdle的概念(每個線程可以設定keepAliveTime,超過多少時間多有任務後銷毀線程,但不會固定保持一定數量的threads)。 * 這裡的maximumPoolSize,是一種救急措施的第一層。當threadPoolExecutor的工作threads存在滿負荷,并且block queue隊列也滿了,這時代表接近崩潰邊緣。這時允許臨時起一批threads,用來處理runnable,處理完後立馬退出。

是以建議: maximumPoolSize >= corePoolSize =期望的最大線程數。 (我曾經配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊列,最後就成了單線程工作的pool。典型的配置錯誤)

3. 善用blockqueue和reject組合. 這裡要重點推薦下CallsRun的Rejected Handler,從字面意思就是讓調用者自己來運作。 我們經常會線上上使用一些線程池做異步處理,比如我前面做的 (業務層)異步并行加載技術分析和設計, 将原本串行的請求都變為了并行操作,但過多的并行會增加系統的負載(比如軟中斷,上下文切換)。是以肯定需要對線程池做一個size限制。但是為了引入異步操作後,避免因在block queue的等待時間過長,是以需要在隊列滿的時,執行一個callsRun的政策,并行的操作又轉為一個串行處理,這樣就可以保證盡量少的延遲影響。

是以建議: RejectExecutionHandler = CallsRun , blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個考慮就是瞬間高峰處理,允許一個thread等待一個runnable任務)

Btrace容量規劃

再提供一個btrace腳本,分析線上的thread pool容量規劃是否合理,可以運作時輸出poolSize等一些資料。

import static com.sun.btrace.BTraceUtils.addToAggregation;import static com.sun.btrace.BTraceUtils.field;import static com.sun.btrace.BTraceUtils.get;import static com.sun.btrace.BTraceUtils.newAggregation;import static com.sun.btrace.BTraceUtils.newAggregationKey;import static com.sun.btrace.BTraceUtils.printAggregation;import static com.sun.btrace.BTraceUtils.println;import static com.sun.btrace.BTraceUtils.str;import static com.sun.btrace.BTraceUtils.strcat;import java.lang.reflect.Field;import java.util.concurrent.atomic.AtomicInteger;import com.sun.btrace.BTraceUtils;import com.sun.btrace.aggregation.Aggregation;import com.sun.btrace.aggregation.AggregationFunction;import com.sun.btrace.aggregation.AggregationKey;import com.sun.btrace.annotations.BTrace;import com.sun.btrace.annotations.Kind;import com.sun.btrace.annotations.Location;import com.sun.btrace.annotations.OnEvent;import com.sun.btrace.annotations.OnMethod;import com.sun.btrace.annotations.OnTimer;import com.sun.btrace.annotations.Self;@BTracepublic class AsyncLoadTracer { private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0); private static Aggregation histogram = newAggregation(AggregationFunction.QUANTIZE); private static Aggregation average = newAggregation(AggregationFunction.AVERAGE); private static Aggregation max = newAggregation(AggregationFunction.MAXIMUM); private static Aggregation min = newAggregation(AggregationFunction.MINIMUM); private static Aggregation sum = newAggregation(AggregationFunction.SUM); private static Aggregation count = newAggregation(AggregationFunction.COUNT); @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY)) public static void executeMonitor(@Self Object self) { Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize"); Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize"); Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue"); Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count"); int poolSize = (Integer) get(poolSizeField, self); int largestPoolSize = (Integer) get(largestPoolSizeField, self); int queueSize = (Integer) get(countField, get(workQueueField, self)); println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "), str(largestPoolSize)), " queueSize : "), str(queueSize))); } @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY)) public static void rejectMonitor(@Self Object self) { String name = str(self); if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) { BTraceUtils.incrementAndGet(rejecctCount); } } @OnTimer(1000) public static void rejectPrintln() { int reject = BTraceUtils.getAndSet(rejecctCount, 0); println(strcat("reject count in 1000 msec: ", str(reject))); AggregationKey key = newAggregationKey("rejectCount"); addToAggregation(histogram, key, reject); addToAggregation(average, key, reject); addToAggregation(max, key, reject); addToAggregation(min, key, reject); addToAggregation(sum, key, reject); addToAggregation(count, key, reject); } @OnEvent public static void onEvent() { BTraceUtils.truncateAggregation(histogram, 10); println("---------------------------------------------"); printAggregation("Count", count); printAggregation("Min", min); printAggregation("Max", max); printAggregation("Average", average); printAggregation("Sum", sum); printAggregation("Histogram", histogram); println("---------------------------------------------"); }}

運作結果:

1 poolSize : 1, largestPoolSize = 10, queueSize = 10

2 reject count in 1000msec: 0

說明:

1. poolSize 代表為目前的線程數

2. largestPoolSize 代表為曆史最大的線程數

3. queueSize 代表blockqueue的目前堆積的size

4. reject count 代表在1000ms内的被reject的數量。

轉自:http://www.iteye.com/topic/1118660

繼續閱讀