天天看點

ThreadPoolExecutor幾點使用建議背景正文Btrace容量規劃最後

背景

前段時間一個項目中因為涉及大量的線程開發,把jdk cocurrent的代碼重新再過了一遍。這篇文章中主要是記錄一下學習ThreadPoolExecutor過程中容易被人忽略的點,Doug Lea的整個類設計還是非常nice的

正文

先看一副圖,描述了ThreadPoolExecutor的工作機制: 

ThreadPoolExecutor幾點使用建議背景正文Btrace容量規劃最後

整個ThreadPoolExecutor的任務處理有4步操作:

  • 第一步,初始的poolSize < corePoolSize,送出的runnable任務,會直接做為new一個Thread的參數,立馬執行
  • 第二步,當送出的任務數超過了corePoolSize,就進入了第二步操作。會将目前的runable送出到一個block queue中
  • 第三步,如果block queue是個有界隊列,當隊列滿了之後就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務
  • 第四步,如果第三步救急方案也無法處理了,就會走到第四步執行reject操作。

幾點說明:(相信這些網上一搜一大把,我這裡簡單介紹下,為後面做一下鋪墊)

  • block queue有以下幾種實作:

    1. ArrayBlockingQueue :  有界的數組隊列

    2. LinkedBlockingQueue : 可支援有界/無界的隊列,使用連結清單實作

    3. PriorityBlockingQueue : 優先隊列,可以針對任務排序

    4. SynchronousQueue : 隊列長度為1的隊列,和Array有點差別就是:client thread送出到block queue會是一個阻塞過程,直到有一個worker thread連接配接上來poll task。

  • RejectExecutionHandler是針對任務無法處理時的一些自保護處理:

    1. Reject 直接抛出Reject exception

    2. Discard 直接忽略該runnable,不可取

    3. DiscardOldest 丢棄最早入隊列的的任務

    4. CallsRun 直接讓原先的client thread做為worker線程,進行執行

容易被人忽略的點: 1.  pool threads啟動後,以後的任務擷取都會通過block queue中,擷取堆積的runnable task.

是以建議:  block size >= corePoolSize ,不然線程池就沒任何意義 2.  corePoolSize 和 maximumPoolSize的差別, 和大家正常了解的資料庫連接配接池不太一樣。   *  據dbcp pool為例,會有minIdle , maxActive配置。minIdle代表是常駐記憶體中的threads數量,maxActive代表是工作的最大線程數。   *  這裡的corePoolSize就是連接配接池的maxActive的概念,它沒有minIdle的概念(每個線程可以設定keepAliveTime,超過多少時間多有任務後銷毀線程,預設隻會針對maximumPoolSize參數的線程生效,可以設定allowCoreThreadTimeOut=true,就可以對corePoolSize進行idle回收)。    * 這裡的maximumPoolSize,是一種救急措施的第一層。當threadPoolExecutor的工作threads存在滿負荷,并且block queue隊列也滿了,這時代表接近崩潰邊緣。這時允許臨時起一批threads,用來處理runnable,處理完後通過keepAliveTime進行排程回收。

是以建議:   maximumPoolSize >= corePoolSize =期望的最大線程數。 (我曾經配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊列,最後就成了單線程工作的pool。典型的配置錯誤)

3. 善用blockqueue和reject組合. 這裡要重點推薦下CallsRun的Rejected Handler,從字面意思就是讓調用者自己來運作。 我們經常會線上上使用一些線程池做異步處理,比如我前面做的 (業務層)異步并行加載技術分析和設計, 将原本串行的請求都變為了并行操作,但過多的并行會增加系統的負載(比如軟中斷,上下文切換)。是以肯定需要對線程池做一個size限制。但是為了引入異步操作後,避免因在block queue的等待時間過長,是以需要在隊列滿的時,執行一個callsRun的政策,并行的操作又轉為一個串行處理,這樣就可以保證盡量少的延遲影響。

是以建議:   RejectExecutionHandler = CallsRun ,  blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個考慮就是瞬間高峰處理,允許一個thread等待一個runnable任務)

Btrace容量規劃

再提供一個btrace腳本,分析線上的thread pool容量規劃是否合理,可以運作時輸出poolSize等一些資料。

import static com.sun.btrace.BTraceUtils.addToAggregation;
import static com.sun.btrace.BTraceUtils.field;
import static com.sun.btrace.BTraceUtils.get;
import static com.sun.btrace.BTraceUtils.newAggregation;
import static com.sun.btrace.BTraceUtils.newAggregationKey;
import static com.sun.btrace.BTraceUtils.printAggregation;
import static com.sun.btrace.BTraceUtils.println;
import static com.sun.btrace.BTraceUtils.str;
import static com.sun.btrace.BTraceUtils.strcat;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;

import com.sun.btrace.BTraceUtils;
import com.sun.btrace.aggregation.Aggregation;
import com.sun.btrace.aggregation.AggregationFunction;
import com.sun.btrace.aggregation.AggregationKey;
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.Kind;
import com.sun.btrace.annotations.Location;
import com.sun.btrace.annotations.OnEvent;
import com.sun.btrace.annotations.OnMethod;
import com.sun.btrace.annotations.OnTimer;
import com.sun.btrace.annotations.Self;

/**
 * 并行加載監控
 * 
 * @author jianghang 2011-4-7 下午10:59:53
 */
@BTrace
public class AsyncLoadTracer {

    private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0);
    private static Aggregation   histogram    = newAggregation(AggregationFunction.QUANTIZE);
    private static Aggregation   average      = newAggregation(AggregationFunction.AVERAGE);
    private static Aggregation   max          = newAggregation(AggregationFunction.MAXIMUM);
    private static Aggregation   min          = newAggregation(AggregationFunction.MINIMUM);
    private static Aggregation   sum          = newAggregation(AggregationFunction.SUM);
    private static Aggregation   count        = newAggregation(AggregationFunction.COUNT);

    @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY))
    public static void executeMonitor(@Self Object self) {
        Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize");
        Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize");
        Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue");

        Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count");
        int poolSize = (Integer) get(poolSizeField, self);
        int largestPoolSize = (Integer) get(largestPoolSizeField, self);
        int queueSize = (Integer) get(countField, get(workQueueField, self));

        println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),
                                     str(largestPoolSize)), " queueSize : "), str(queueSize)));
    }

    @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY))
    public static void rejectMonitor(@Self Object self) {
        String name = str(self);
        if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {
            BTraceUtils.incrementAndGet(rejecctCount);
        }
    }

    @OnTimer(1000)
    public static void rejectPrintln() {
        int reject = BTraceUtils.getAndSet(rejecctCount, 0);
        println(strcat("reject count in 1000 msec: ", str(reject)));
        AggregationKey key = newAggregationKey("rejectCount");
        addToAggregation(histogram, key, reject);
        addToAggregation(average, key, reject);
        addToAggregation(max, key, reject);
        addToAggregation(min, key, reject);
        addToAggregation(sum, key, reject);
        addToAggregation(count, key, reject);
    }

    @OnEvent
    public static void onEvent() {
        BTraceUtils.truncateAggregation(histogram, 10);
        println("---------------------------------------------");
        printAggregation("Count", count);
        printAggregation("Min", min);
        printAggregation("Max", max);
        printAggregation("Average", average);
        printAggregation("Sum", sum);
        printAggregation("Histogram", histogram);
        println("---------------------------------------------");
    }
}
           

運作結果:

poolSize : 1 , largestPoolSize = 10 , queueSize = 10
reject count in 1000 msec: 0
           

說明:

1. poolSize 代表為目前的線程數

2. largestPoolSize 代表為曆史最大的線程數

3. queueSize 代表blockqueue的目前堆積的size

4. reject count 代表在1000ms内的被reject的數量

最後

  這是我對ThreadPoolExecutor使用過程中的一些經驗總結,希望能對大家有所幫助,如有描述不對的地方歡迎拍磚。