天天看點

java線程和線程池的關閉Thread的關閉interrupt相關的處理總結

Thread的關閉

關于線程的關閉,衆所周知,有幾個已經廢棄的API

Thread#stop()

Thread#destroy()

廢棄的原因是會無條件的終止任務,而且不會讓出資源的鎖,不夠安全

正确的外部關閉方法是使用

Thread#interrupt()

這個方法隻是改變線程的一個狀态标志,并且這個标志位不是線程的NEW,RUNNABLE,BLOCKED這些狀态(如不熟悉請自行查閱線程狀态圖),而是一個單獨的狀态标志

public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
           

interrupt0()

是本地方法

blocker

是用來進行中斷的對象。如果線上程執行可中斷IO操作時調用

Thread#interupt()

,會調用該對象的處理方法

/* The object in which this thread is blocked in an interruptible I/O
     * operation, if any.  The blocker's interrupt method should be invoked
     * after setting this thread's interrupt status.
     */
    private volatile Interruptible blocker;
           

Interruptible是内部接口,不公開,但是nio中的InterruptibleChannel和Selector的實作類中

都聚合了這個對象,是以也可以說是為nio提供了實作的基礎的接口(不熟悉nio也就是非阻塞IO請自行查閱)

interrupt相關的處理

首先,當線程處于WAITING、TIMED_WAITING狀态時(注意不包含BLOCKED狀态),會檢測中斷的狀态标志并重置,并抛出

InterruptException

這個異常是受檢異常,是以必須要捕獲。

隻有一些特殊的方法才會讓線程進入這些狀态比如

Object#wait()

Thread.sleep()

或者

LockSupport#park()

等,也就是隻對鎖的等待隊列中元素進行檢測,并抛出異常。

在調用這些方法時注意對中斷異常的處理

但是線程處于BLOCKED狀态也就是鎖的同步隊列中的元素時并不會檢查這個狀态(這也很合理,畢竟阻塞狀态不應該能檢測變化),是以需要線上程運作方法中

檢測中斷狀态并處理

一個經典的模式是這樣

class Task implements Runnable {
    public void run() {
        while (!Thread.interrupted()) {//clean the interrupt flag
            doWork();
        }
        //some finalize work
        doClose();//當檢測到目前線程已中斷後,做一些關閉處理,之後run()運作結束,即線程關閉

    } 
    
    private void doWork() {
        //...
    }
    
    private void doClose() {
        //...
    }
    
   
}
           

當檢測到目前線程已中斷後,做一些關閉處理,之後run()運作結束,即線程關閉

檢測中斷的方法有幾個,常用的兩個中一個是

Thread.interrupted()

,這是靜态方法,檢測目前線程,檢測後會将中斷标志清除,

是以即使這個任務已經中斷,但線程可以重用

另一個是

Thread#isInterrupted()

隻會檢測不會清除中斷标志并且是執行個體方法,如果目前線程中隻用這個方法檢測中斷,就意味着

這個線程不會再重用,通常在向線程池送出可中斷的任務時不使用這個方法

#ThreadPoolExecutor 的 shutdown()方法和shutdownNow() 方法

線程池的關閉有

awaitTermination()

shutdown()

shutdownNow()

幾個方法

awaitTermination()

會等待所有已送出的任務執行完

shutdown()

會修改線程池狀态為SHUTDOWN,此狀态下工作線程執行配置設定的任務後會停止,但調用的同一時刻可以繼續配置設定任務,調用後不會再配置設定任務

(已送出但未配置設定的任務不會執行,)

shutdownNow()

會修改線程池狀态為STOP,此狀态下工作線程執行現有任務後會停止,但關閉的同時就不會配置設定任務,并且會傳回未完成的任務清單

shutdown()

為例分析

/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);//1
            interruptIdleWorkers();//2
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
           

advanceRunState(SHUTDOWN);

隻是使用CAS操作設定線程池的狀态

關鍵在于

interruptIdleWorkers();

這一步。

它最終調用的是interruptIdleWorkers(false);

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
           

基本上就是對工作線程依次設定中斷

shutdown()

最後的

tryTerminate();

方法主要是将線程池的狀态設為TERMINATED,

然後調用

protected void terminate()

方法,這是個空方法,可以被繼承用來做一些關閉後額外的工作。

調用順序在

onShutdown()

之後(這裡的onShutdown()是空方法,但是ScheduledThreadPoolExecutor不空)

shutdownNow()

方法的主要差別在于

advanceRunState(STOP);//1
interruptWorkers();//2
           

即會直接将線程池狀态設定為STOP,然後不是終止所有空閑線程(

interruptIdleWorkers()

),而是終止所有線程(

interruptWorkers()

)

,這個方法最終調用

Worker#interruptIfStarted()

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    //...
    void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
     //...       
        }
           

看上去

shutdown()

shutdownNow()

的實作沒什麼不同,都隻是對工作線程調用中斷方法,

為什麼前者是會是停止空閑線程,後者是停止所有線程呢?

原因在于線程池的狀态,和工作線程對線程池狀态的處理的處理

工作線程的

run()

方法是調用的線程池的

runWorker()

方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//如果線程池已經至少STOP狀态了,getTask()
                                                                  //會傳回null,任務不會被執行;
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
           

看着挺長但基本都是異常處理,比較難了解的就是有注釋那段對線程池狀态和中斷标志檢測的處理部分

這裡的關鍵在于負責配置設定任務的

getTask()

方法

首先每個工作線程建立的時候都聚合了一條線程和一個任務(firstTask),每次執行前都會取一個任務執行,如果線程池已經至少STOP狀态了,

getTask()

會傳回null,任務不會被執行(但如果工作線程是第一個任務(w.firstTask != null)或者已有任務正在運作,那麼仍然會繼續執行);

否則會再檢測一次線程池的狀态,并確定線程沒有中斷再開始執行任務(如果中間發生線程切換并且shutDownNow了,任務也不會執行)

如果是shutDown的同一時刻,task = getTask()不為空時會繼續執行,但最多執行一個任務就會停止運作(因為第二次檢測中斷不會通過)。

而shutDownNow則不會,同一時刻的所有線程都會停止工作。

可見shutDownNow也不是真正的立即關閉所有的線程,如果shutdownNow之前工作線程已經配置設定了任務,任務依然會執行,之後再關閉

總結

  • java線程中,使用設定和檢測中斷标志來關閉線程,
  • 中斷的處理有兩種,一種是等待狀态下檢測到中斷标志抛出異常,一種是運作時直接主動檢測
  • 使用中斷的目的是為了無副作用的關閉線程,為此正在執行的任務不能直接關閉,使用中斷就相當于發出指令,等所有工作線程忙完手頭任務後再立即結束工作
  • 線程池中為了把關閉操作區分出不同的等級,使用了複雜的狀态變化和檢測邏輯,區分出了調用時停止配置設定(shutdownNow),調用後停止配置設定(shutdown),調用後等待所有任務執行完(awaitTermination)3個等級的關閉操作