天天看點

如何正确的關閉線程池?

首先看源碼中的一句注釋:

A pool that is no longer referenced in a program and has no remaining threads will be shutdown automatically.

如果程式中不再持有線程池的引用,并且線程池中沒有線程時,線程池将會自動關閉。

線程池自動關閉的兩個條件:1、線程池的引用不可達;2、線程池中沒有線程;

這裡對于條件2解釋一下,線程池中沒有線程是指線程池中的所有線程都已運作完自動消亡。然而我們常用的FixedThreadPool的核心線程沒有逾時政策,是以并不會自動關閉。

展示兩種不同線程池 不關閉 的情況:

1、FixedThreadPool 示例

public static void main(String[] args) {
    while(true) {
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        executorService.execute(() -> System.out.println("running"));
        executorService = null;
    }
}
           

輸出結果:

running
......
running
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
at test.PoolTest.main(PoolTest.java:29)
           

因為FixedThreadPool的核心線程不會自動逾時關閉,使用時必須在适當的時候調用shutdown()方法。

2、 CachedThreadPool 示例

public static void main(String[] args) {
    while(true) {
        // 預設keepAliveTime為 60s
       ExecutorService executorService = Executors.newCachedThreadPool();
       ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)executorService;
        // 為了更好的模拟,動态修改為1納秒
        threadPoolExecutor.setKeepAliveTime(1, TimeUnit.NANOSECONDS);
        threadPoolExecutor.execute(() -> System.out.println("running"));
    }
}
           

輸出結果:

running
running
running
running
running
......
           

CachedThreadPool 的線程 keepAliveTime 預設為 60s ,核心線程數量為 0 ,是以不會有核心線程存活阻止線程池自動關閉。 為了更快的模拟,構造後将 keepAliveTime 修改為1納秒,相當于線程執行完馬上會消亡,是以線程池可以被回收。實際開發中,如果CachedThreadPool 确實忘記關閉,在一定時間後是可以被回收的。但仍然建議顯示關閉。

然而,線程池關閉的意義不僅僅在于結束線程執行,避免記憶體溢出,因為大多使用的場景并非上述示例那樣 朝生夕死。線程池一般是持續工作的全局場景,如資料庫連接配接池。

本文更多要讨論的是當線程池調用shutdown方法後,會經曆些什麼?思考一下幾個問題:

  1. 是否可以繼續接受新任務?繼續送出新任務會怎樣?
  2. 等待隊列裡的任務是否還會執行?
  3. 正在執行的任務是否會立即中斷?

問題1:是否可以繼續接受新任務?繼續送出新任務會怎樣?

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    executor.execute(() -> System.out.println("before shutdown"));
    executor.shutdown();
    executor.execute(() -> System.out.println("after shutdown"));
}
           

輸出結果如下:

before shutdown
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task PoolTest$$Lambda$2/[email protected] rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at PoolTest.main(PoolTest.java:12)
           

當線程池關閉後,繼續送出新任務會抛出異常。這句話也不夠準确,不一定是抛出異常,而是執行拒絕政策,預設的拒絕政策是抛出異常。

問題2:等待隊列裡的任務是否還會執行?

public class WaitqueueTest {
    public static void main(String[] args) {
    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    for(int i = 1; i <= 100 ; i++){
        workQueue.add(new Task(String.valueOf(i)));
    }
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10,TimeUnit.SECONDS, workQueue);
    executor.execute(new Task("0"));
    executor.shutdown();
    System.out.println("workQueue size = " + workQueue.size() + " after shutdown");
}
    static class Task implements Runnable{
        String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            for(int i = 1; i <= 10; i++){
                System.out.println("task " + name + " is running");
            }
            System.out.println("task " + name + " is over");
        }
    }
}
           

這個demo解釋一下,我們用LinkedBlockingQueue構造了一個線程池,線上程池啟動前,我們先将工作隊列填充100個任務,然後執行task 0 後立即shutdown()線程池,來驗證線程池關閉隊列的任務運作狀态。

輸出結果如下:

......
task 0 is running
task 0 is over
workQueue size = 100 after shutdown //表示線程池關閉後,隊列任然有100個任務
task 1 is running
......
task 100 is running
task 100 is over
           

從結果中我們可以看到,線程池雖然關閉,但是隊列中的任務任然繼續執行,是以用 shutdown()方式關閉線程池時需要考慮是否是你想要的效果。

如果你希望線程池中的等待隊列中的任務不繼續執行,可以使用shutdownNow()方法,将上述代碼進行調整,如下:

public class WaitqueueTest {
    public static void main(String[] args) {
    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    for(int i = 1; i <= 100 ; i++){
        workQueue.add(new Task(String.valueOf(i)));
    }
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10,TimeUnit.SECONDS, workQueue);
    executor.execute(new Task("0"));
    // shutdownNow有傳回值,傳回被抛棄的任務list
    List<Runnable> dropList = executor.shutdownNow();
    System.out.println("workQueue size = " + workQueue.size() + " after shutdown");
    System.out.println("dropList size = " + dropList.size());
}

    static class Task implements Runnable{
        String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            for(int i = 1; i <= 10; i++){
                System.out.println("task " + name + " is running");
            }
            System.out.println("task " + name + " is over");
        }
    }
}
           

輸出結果如下:

task 0 is running
workQueue size = 0 after shutdown
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
dropList size = 100
task 0 is over
           

從上述輸出可以看到,隻有任務0執行完畢,其他任務都被drop掉了,dropList的size為100。通過dropList我們可以對未處理的任務進行進一步的處理,如log記錄,轉發等;

問題3:正在執行的任務是否會立即中斷?

要驗證這個問題,需要對線程的 interrupt 方法有一定了解。

關于 interrupt 方法:

首先,一個線程不應該由其他線程來強制中斷或停止,而是應該由線程自己自行停止。

是以,Thread.stop, Thread.suspend, Thread.resume 都已經被廢棄了。

而 Thread.interrupt 的作用其實也不是中斷線程,而是「通知線程應該中斷了」,具體到底中斷還是繼續運作,應該由被通知的線程自己處理。

具體來說,當對一個線程,調用 interrupt() 時,

① 如果線程處于被阻塞狀态(例如處于sleep, wait, join 等狀态),那麼線程将立即退出被阻塞狀态,并抛出一個InterruptedException異常。僅此而已。

② 如果線程處于正常活動狀态,那麼會将該線程的中斷标志設定為 true,僅此而已。被設定中斷标志的線程将繼續正常運作,不受影響。

interrupt() 并不能真正的中斷線程,需要被調用的線程自己進行配合才行。也就是說,一個線程如果有被中斷的需求,那麼就可以這樣做。

① 在正常運作任務時,經常檢查本線程的中斷标志位,如果被設定了中斷标志就自行停止線程。

② 在調用阻塞方法時正确處理InterruptedException異常。(例如,catch異常後就結束線程。)

public class InteruptTest {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10,TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        executor.execute(new Task("0"));
        Thread.sleep(1);
        executor.shutdown();
        System.out.println("executor has been shutdown");
}
    static class Task implements Runnable {
        String name;
        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            for (int i = 1; i <= 100 && !Thread.interrupted(); i++) {
                Thread.yield();
                System.out.println("task " + name + " is running, round " + i);
            }
        }
    }
}
           

輸出結果如下:

task 0 is running, round 1
task 0 is running, round 2
task 0 is running, round 3
......
task 0 is running, round 28
executor has been shutdown
......
task 0 is running, round 99
task 0 is running, round 100
           

為了展現在任務執行中打斷,在主線程進行短暫 sleep , task 中 調用 Thread.yield() ,出讓時間片。從結果中可以看到,線程池被關閉後,正則運作的任務沒有被 interrupt。說明shutdown()方法不會 interrupt 運作中線程。再将其改修改為shutdownNow() 後輸出結果如下:

task 0 is running, round 1
task 0 is running, round 2
......
task 0 is running, round 56
task 0 is running, round 57
task 0 is running, round 58
task 0 is running, round 59
executor has been shutdown
           

修改為shutdownNow() 後,task任務沒有執行完,執行到中間的時候就被 interrupt 後沒有繼續執行了。

總結,想要正确的關閉線程池,并不是簡單的調用shutdown方法那麼簡單,要考慮到應用場景的需求,如何拒絕新來的請求任務?如何處理等待隊列中的任務?如何處理正在執行的任務?

PS:線程被 interrupt 後,需要再run方法中單獨處理 interrupted 狀态,interrupt 更類似一個标志位,不會直接打斷線程的執行。

原文作者:徐志毅

原文連結:https://www.jianshu.com/p/bdf06e2c1541