天天看點

線程池使用小結

既然線程池是日常工作非常常見的知識且使用過程中需要對此有着充分的認知,是以今天就總結一下線程池的常見知識點。

線程池使用小結
如上圖:阿裡巴巴 Java 開發手冊中對于線程池的建立有着明确的規範。

簡單的例子

如下使用

ThreadPoolExecutor

實作了自定義線程池完成

Callable

的任務:

class ImpCallable implements Callable<String> {
    // 核心線程數
    private static final int CORE_POOL_SIZE=2;
    // 最大線程數
    private static final int MAX_POOL_SIZE=4;
    // 線程數大于 corePoolSize 線程持續時間
    private static final int KEEP_ALIVE_TIME=1;
    // 阻塞隊列的大小
    private static final int QUEUE_CAPACITY=5;
    private static final TimeUnit UNIT = TimeUnit.SECONDS;
    // 自定義線程名
    private static final String THREAD_NAME = "my-self-thread-pool-%d";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                UNIT,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new BasicThreadFactory.Builder().namingPattern(THREAD_NAME).build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        ImpCallable task = new ImpCallable();
        List<Future<String>> futureList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // 送出任務到線程池
            Future<String> future = executor.submit(task);
            // 任務結果 future 加入結果隊列
            futureList.add(future);
        }

        for (Future<String> fut : futureList) {
            try {
                // 取出結果
                System.out.println(new Date() + "--" + fut.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        executor.shutdown();

        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("All threads Finished");
    }

    @Override
    public String call() throws Exception {
        Thread.sleep(2000L);
        return Thread.currentThread().getName();
    }
}
      

阻塞隊列

用來儲存等待被執行的任務的阻塞隊列,Java 中提供了如下阻塞隊列:

  1. ArrayBlockingQueue

    :基于數組結構的有界阻塞隊列,其構造必須指定大小。對象按 FIFO 排序。
  2. LinkedBlockingQuene

    :基于連結清單結構的阻塞隊列,大小不固定,若不指定大小,其大小有

    Integer.MAX_VALUE

    來決定。對象按 FIFO 排序。吞吐量通常要高于

    ArrayBlockingQuene

  3. SynchronousQuene

    :特殊的

    BlockingQueue

    ,對其的操作必須是放和取交替完成。
  4. priorityBlockingQuene

    :類似于

    LinkedBlockingQueue

    ,對象的排序由對象的自然順序或者構造函數的

    Comparator

    決定。

自定義線程名

建立線程的工廠,通過自定義的線程工廠可以給每個建立的線程設定一個具有識别度的線程名。

關于這一點非常有必要,友善快速定位問題以及監控線程池。

拒絕政策

線程池的飽和政策,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續送出任務,必須采取一種政策處理該任務,線程池提供了4種政策:

  1. AbortPolicy

    直接抛出異常,預設政策,可以及時發現線程池的瓶頸。
  2. CallerRunsPolicy

    使用調用者所在的線程來執行任務,新任務不會丢失,采用誰送出誰負責的政策,有效控制線程池壓力。
  3. DiscardOldestPolicy

    丢棄阻塞隊列中靠最前的任務,并執行目前任務,該政策存在丢失任務的風險,不建議使用。
  4. DiscardPolicy

    直接丢棄任務,該政策簡單粗暴以保證系統可以為主要目标,不建議使用。

當然也可以根據應用場景實作

RejectedExecutionHandler

接口,自定義飽和政策。如記錄日志或持久化存儲不能處理的任務。

線程池大小

關于線程池線程大小可以根據實際業務場景具體設定。

推薦個适用面比較廣的公式( N 為 CPU 核心數):

  • CPU 密集型任務 N+1 。
  • IO 密集型任務 2N 。

線程池狀态

線程池使用小結

如上圖線程池的狀态分為五種,分别對應 Java 中五個 int 字段:

private static final int RUNNING = -536870912;
private static final int SHUTDOWN = 0;
private static final int STOP = 536870912;
private static final int TIDYING = 1073741824;
private static final int TERMINATED = 1610612736;      
  • RUNNING

    線程建立成功初始化狀态,能夠接收新任務,以及對已添加的任務進行處理。
  • SHUTDOWN

    不接收新任務,但能處理已添加的任務。
  • STOP

    不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務。
  • TIDYING

    當所有的任務已終止隊列任務也為空線程池會變為

    TIDYING

    狀态。
  • TERMINATED

    線程池徹底終止,由

    TIDYING

    狀态變成

    TERMINATED

監控線程池

我們可以通過第三方元件監控線程池的運作狀态比如 SpringBoot 中的 Actuator 。

除此之外,我們還可以利用

ThreadPoolExecutor

的相關 API 監測線程池狀态。如下圖我們可以輕松獲得線程池的各項參數,結合郵件實時監測線程池健康狀況。

線程池使用小結