天天看点

java线程和线程池的关闭Thread的关闭interrupt相关的处理总结

Thread的关闭

关于线程的关闭,众所周知,有几个已经废弃的API

Thread#stop()

Thread#destroy()

废弃的原因是会无条件的终止任务,而且不会让出资源的锁,不够安全

正确的外部关闭方法是使用

Thread#interrupt()

这个方法只是改变线程的一个状态标志,并且这个标志位不是线程的NEW,RUNNABLE,BLOCKED这些状态(如不熟悉请自行查阅线程状态图),而是一个单独的状态标志

public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
           

interrupt0()

是本地方法

blocker

是用来处理中断的对象。如果在线程执行可中断IO操作时调用

Thread#interupt()

,会调用该对象的处理方法

/* The object in which this thread is blocked in an interruptible I/O
     * operation, if any.  The blocker's interrupt method should be invoked
     * after setting this thread's interrupt status.
     */
    private volatile Interruptible blocker;
           

Interruptible是内部接口,不公开,但是nio中的InterruptibleChannel和Selector的实现类中

都聚合了这个对象,因此也可以说是为nio提供了实现的基础的接口(不熟悉nio也就是非阻塞IO请自行查阅)

interrupt相关的处理

首先,当线程处于WAITING、TIMED_WAITING状态时(注意不包含BLOCKED状态),会检测中断的状态标志并重置,并抛出

InterruptException

这个异常是受检异常,因此必须要捕获。

只有一些特殊的方法才会让线程进入这些状态比如

Object#wait()

Thread.sleep()

或者

LockSupport#park()

等,也就是只对锁的等待队列中元素进行检测,并抛出异常。

在调用这些方法时注意对中断异常的处理

但是线程处于BLOCKED状态也就是锁的同步队列中的元素时并不会检查这个状态(这也很合理,毕竟阻塞状态不应该能检测变化),所以需要在线程运行方法中

检测中断状态并处理

一个经典的模式是这样

class Task implements Runnable {
    public void run() {
        while (!Thread.interrupted()) {//clean the interrupt flag
            doWork();
        }
        //some finalize work
        doClose();//当检测到当前线程已中断后,做一些关闭处理,之后run()运行结束,即线程关闭

    } 
    
    private void doWork() {
        //...
    }
    
    private void doClose() {
        //...
    }
    
   
}
           

当检测到当前线程已中断后,做一些关闭处理,之后run()运行结束,即线程关闭

检测中断的方法有几个,常用的两个中一个是

Thread.interrupted()

,这是静态方法,检测当前线程,检测后会将中断标志清除,

因此即使这个任务已经中断,但线程可以重用

另一个是

Thread#isInterrupted()

只会检测不会清除中断标志并且是实例方法,如果当前线程中只用这个方法检测中断,就意味着

这个线程不会再重用,通常在向线程池提交可中断的任务时不使用这个方法

#ThreadPoolExecutor 的 shutdown()方法和shutdownNow() 方法

线程池的关闭有

awaitTermination()

shutdown()

shutdownNow()

几个方法

awaitTermination()

会等待所有已提交的任务执行完

shutdown()

会修改线程池状态为SHUTDOWN,此状态下工作线程执行分配的任务后会停止,但调用的同一时刻可以继续分配任务,调用后不会再分配任务

(已提交但未分配的任务不会执行,)

shutdownNow()

会修改线程池状态为STOP,此状态下工作线程执行现有任务后会停止,但关闭的同时就不会分配任务,并且会返回未完成的任务列表

shutdown()

为例分析

/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);//1
            interruptIdleWorkers();//2
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
           

advanceRunState(SHUTDOWN);

只是使用CAS操作设置线程池的状态

关键在于

interruptIdleWorkers();

这一步。

它最终调用的是interruptIdleWorkers(false);

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
           

基本上就是对工作线程依次设置中断

shutdown()

最后的

tryTerminate();

方法主要是将线程池的状态设为TERMINATED,

然后调用

protected void terminate()

方法,这是个空方法,可以被继承用来做一些关闭后额外的工作。

调用顺序在

onShutdown()

之后(这里的onShutdown()是空方法,但是ScheduledThreadPoolExecutor不空)

shutdownNow()

方法的主要区别在于

advanceRunState(STOP);//1
interruptWorkers();//2
           

即会直接将线程池状态设置为STOP,然后不是终止所有空闲线程(

interruptIdleWorkers()

),而是终止所有线程(

interruptWorkers()

)

,这个方法最终调用

Worker#interruptIfStarted()

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    //...
    void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
     //...       
        }
           

看上去

shutdown()

shutdownNow()

的实现没什么不同,都只是对工作线程调用中断方法,

为什么前者是会是停止空闲线程,后者是停止所有线程呢?

原因在于线程池的状态,和工作线程对线程池状态的处理的处理

工作线程的

run()

方法是调用的线程池的

runWorker()

方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//如果线程池已经至少STOP状态了,getTask()
                                                                  //会返回null,任务不会被执行;
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
           

看着挺长但基本都是异常处理,比较难理解的就是有注释那段对线程池状态和中断标志检测的处理部分

这里的关键在于负责分配任务的

getTask()

方法

首先每个工作线程创建的时候都聚合了一条线程和一个任务(firstTask),每次执行前都会取一个任务执行,如果线程池已经至少STOP状态了,

getTask()

会返回null,任务不会被执行(但如果工作线程是第一个任务(w.firstTask != null)或者已有任务正在运行,那么仍然会继续执行);

否则会再检测一次线程池的状态,并确保线程没有中断再开始执行任务(如果中间发生线程切换并且shutDownNow了,任务也不会执行)

如果是shutDown的同一时刻,task = getTask()不为空时会继续执行,但最多执行一个任务就会停止运行(因为第二次检测中断不会通过)。

而shutDownNow则不会,同一时刻的所有线程都会停止工作。

可见shutDownNow也不是真正的立即关闭所有的线程,如果shutdownNow之前工作线程已经分配了任务,任务依然会执行,之后再关闭

总结

  • java线程中,使用设置和检测中断标志来关闭线程,
  • 中断的处理有两种,一种是等待状态下检测到中断标志抛出异常,一种是运行时直接主动检测
  • 使用中断的目的是为了无副作用的关闭线程,为此正在执行的任务不能直接关闭,使用中断就相当于发出命令,等所有工作线程忙完手头任务后再立即结束工作
  • 线程池中为了把关闭操作区分出不同的等级,使用了复杂的状态变化和检测逻辑,区分出了调用时停止分配(shutdownNow),调用后停止分配(shutdown),调用后等待所有任务执行完(awaitTermination)3个等级的关闭操作