背景
前段時間一個項目中因為涉及大量的線程開發,把jdk cocurrent的代碼重新再過了一遍。這篇文章中主要是記錄一下學習ThreadPoolExecutor過程中容易被人忽略的點,Doug Lea的整個類設計還是非常nice的
正文
先看一副圖,描述了ThreadPoolExecutor的工作機制:
整個ThreadPoolExecutor的任務處理有4步操作:
- 第一步,初始的poolSize < corePoolSize,送出的runnable任務,會直接做為new一個Thread的參數,立馬執行
- 第二步,當送出的任務數超過了corePoolSize,就進入了第二步操作。會将目前的runable送出到一個block queue中
- 第三步,如果block queue是個有界隊列,當隊列滿了之後就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務
- 第四步,如果第三步救急方案也無法處理了,就會走到第四步執行reject操作。
幾點說明:(相信這些網上一搜一大把,我這裡簡單介紹下,為後面做一下鋪墊)
-
block queue有以下幾種實作:
1. ArrayBlockingQueue : 有界的數組隊列
2. LinkedBlockingQueue : 可支援有界/無界的隊列,使用連結清單實作
3. PriorityBlockingQueue : 優先隊列,可以針對任務排序
4. SynchronousQueue : 隊列長度為1的隊列,和Array有點差別就是:client thread送出到block queue會是一個阻塞過程,直到有一個worker thread連接配接上來poll task。
-
RejectExecutionHandler是針對任務無法處理時的一些自保護處理:
1. Reject 直接抛出Reject exception
2. Discard 直接忽略該runnable,不可取
3. DiscardOldest 丢棄最早入隊列的的任務
4. CallsRun 直接讓原先的client thread做為worker線程,進行執行
容易被人忽略的點: 1. pool threads啟動後,以後的任務擷取都會通過block queue中,擷取堆積的runnable task.
是以建議: block size >= corePoolSize ,不然線程池就沒任何意義 2. corePoolSize 和 maximumPoolSize的差別, 和大家正常了解的資料庫連接配接池不太一樣。 * 據dbcp pool為例,會有minIdle , maxActive配置。minIdle代表是常駐記憶體中的threads數量,maxActive代表是工作的最大線程數。 * 這裡的corePoolSize就是連接配接池的maxActive的概念,它沒有minIdle的概念(每個線程可以設定keepAliveTime,超過多少時間多有任務後銷毀線程,預設隻會針對maximumPoolSize參數的線程生效,可以設定allowCoreThreadTimeOut=true,就可以對corePoolSize進行idle回收)。 * 這裡的maximumPoolSize,是一種救急措施的第一層。當threadPoolExecutor的工作threads存在滿負荷,并且block queue隊列也滿了,這時代表接近崩潰邊緣。這時允許臨時起一批threads,用來處理runnable,處理完後通過keepAliveTime進行排程回收。
是以建議: maximumPoolSize >= corePoolSize =期望的最大線程數。 (我曾經配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊列,最後就成了單線程工作的pool。典型的配置錯誤)
3. 善用blockqueue和reject組合. 這裡要重點推薦下CallsRun的Rejected Handler,從字面意思就是讓調用者自己來運作。 我們經常會線上上使用一些線程池做異步處理,比如我前面做的 (業務層)異步并行加載技術分析和設計, 将原本串行的請求都變為了并行操作,但過多的并行會增加系統的負載(比如軟中斷,上下文切換)。是以肯定需要對線程池做一個size限制。但是為了引入異步操作後,避免因在block queue的等待時間過長,是以需要在隊列滿的時,執行一個callsRun的政策,并行的操作又轉為一個串行處理,這樣就可以保證盡量少的延遲影響。
是以建議: RejectExecutionHandler = CallsRun , blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個考慮就是瞬間高峰處理,允許一個thread等待一個runnable任務)
Btrace容量規劃
再提供一個btrace腳本,分析線上的thread pool容量規劃是否合理,可以運作時輸出poolSize等一些資料。
import static com.sun.btrace.BTraceUtils.addToAggregation;
import static com.sun.btrace.BTraceUtils.field;
import static com.sun.btrace.BTraceUtils.get;
import static com.sun.btrace.BTraceUtils.newAggregation;
import static com.sun.btrace.BTraceUtils.newAggregationKey;
import static com.sun.btrace.BTraceUtils.printAggregation;
import static com.sun.btrace.BTraceUtils.println;
import static com.sun.btrace.BTraceUtils.str;
import static com.sun.btrace.BTraceUtils.strcat;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;
import com.sun.btrace.BTraceUtils;
import com.sun.btrace.aggregation.Aggregation;
import com.sun.btrace.aggregation.AggregationFunction;
import com.sun.btrace.aggregation.AggregationKey;
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.Kind;
import com.sun.btrace.annotations.Location;
import com.sun.btrace.annotations.OnEvent;
import com.sun.btrace.annotations.OnMethod;
import com.sun.btrace.annotations.OnTimer;
import com.sun.btrace.annotations.Self;
/**
* 并行加載監控
*
* @author jianghang 2011-4-7 下午10:59:53
*/
@BTrace
public class AsyncLoadTracer {
private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0);
private static Aggregation histogram = newAggregation(AggregationFunction.QUANTIZE);
private static Aggregation average = newAggregation(AggregationFunction.AVERAGE);
private static Aggregation max = newAggregation(AggregationFunction.MAXIMUM);
private static Aggregation min = newAggregation(AggregationFunction.MINIMUM);
private static Aggregation sum = newAggregation(AggregationFunction.SUM);
private static Aggregation count = newAggregation(AggregationFunction.COUNT);
@OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY))
public static void executeMonitor(@Self Object self) {
Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize");
Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize");
Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue");
Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count");
int poolSize = (Integer) get(poolSizeField, self);
int largestPoolSize = (Integer) get(largestPoolSizeField, self);
int queueSize = (Integer) get(countField, get(workQueueField, self));
println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),
str(largestPoolSize)), " queueSize : "), str(queueSize)));
}
@OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY))
public static void rejectMonitor(@Self Object self) {
String name = str(self);
if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {
BTraceUtils.incrementAndGet(rejecctCount);
}
}
@OnTimer(1000)
public static void rejectPrintln() {
int reject = BTraceUtils.getAndSet(rejecctCount, 0);
println(strcat("reject count in 1000 msec: ", str(reject)));
AggregationKey key = newAggregationKey("rejectCount");
addToAggregation(histogram, key, reject);
addToAggregation(average, key, reject);
addToAggregation(max, key, reject);
addToAggregation(min, key, reject);
addToAggregation(sum, key, reject);
addToAggregation(count, key, reject);
}
@OnEvent
public static void onEvent() {
BTraceUtils.truncateAggregation(histogram, 10);
println("---------------------------------------------");
printAggregation("Count", count);
printAggregation("Min", min);
printAggregation("Max", max);
printAggregation("Average", average);
printAggregation("Sum", sum);
printAggregation("Histogram", histogram);
println("---------------------------------------------");
}
}
運作結果:
poolSize : 1 , largestPoolSize = 10 , queueSize = 10
reject count in 1000 msec: 0
說明:
1. poolSize 代表為目前的線程數
2. largestPoolSize 代表為曆史最大的線程數
3. queueSize 代表blockqueue的目前堆積的size
4. reject count 代表在1000ms内的被reject的數量
最後
這是我對ThreadPoolExecutor使用過程中的一些經驗總結,希望能對大家有所幫助,如有描述不對的地方歡迎拍磚。