天天看點

【高并發】通過源碼深度解析ThreadPoolExecutor類是如何保證線程池正确運作的

大家好,我是冰河~~

對于線程池的核心類ThreadPoolExecutor來說,有哪些重要的屬性和内部類為線程池的正确運作提供重要的保障呢?今天我們就一起來深入探讨下這些問題!!

ThreadPoolExecutor類中的重要屬性

在ThreadPoolExecutor類中,存在幾個非常重要的屬性和方法,接下來,我們就介紹下這些重要的屬性和方法。

ctl相關的屬性

AtomicInteger類型的常量ctl是貫穿線程池整個生命周期的重要屬性,它是一個原子類對象,主要用來儲存線程的數量和線程池的狀态,我們看下與這個屬性相關的代碼如下所示。

//主要用來儲存線程數量和線程池的狀态,高3位儲存線程狀态,低29位儲存線程數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程池中線程的數量的位數(32-3)
private static final int COUNT_BITS = Integer.SIZE - 3;
//表示線程池中的最大線程數量
//将數字1的二進制值向右移29位,再減去1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//線程池的運作狀态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
//擷取線程狀态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//擷取線程數量
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}           

對于線程池的各狀态說明如下所示。

  • RUNNING:運作狀态,能接收新送出的任務,并且也能處理阻塞隊列中的任務
  • SHUTDOWN: 關閉狀态,不能再接收新送出的任務,但是可以處理阻塞隊列中已經儲存的任務,當線程池處于RUNNING狀态時,調用shutdown()方法會使線程池進入該狀态
  • STOP: 不能接收新任務,也不能處理阻塞隊列中已經儲存的任務,會中斷正在處理任務的線程,如果線程池處于RUNNING或SHUTDOWN狀态,調用shutdownNow()方法,會使線程池進入該狀态
  • TIDYING: 如果所有的任務都已經終止,有效線程數為0(阻塞隊列為空,線程池中的工作線程數量為0),線程池就會進入該狀态。
  • TERMINATED: 處于TIDYING狀态的線程池調用terminated ()方法,會使用線程池進入該狀态

也可以按照ThreadPoolExecutor類的注釋,将線程池的各狀态之間的轉化總結成如下圖所示。

【高并發】通過源碼深度解析ThreadPoolExecutor類是如何保證線程池正确運作的
  • RUNNING -> SHUTDOWN:顯式調用shutdown()方法, 或者隐式調用了finalize()方法
  • (RUNNING or SHUTDOWN) -> STOP:顯式調用shutdownNow()方法
  • SHUTDOWN -> TIDYING:當線程池和任務隊列都為空的時候
  • STOP -> TIDYING:當線程池為空的時候
  • TIDYING -> TERMINATED:當 terminated() hook 方法執行完成時候

其他重要屬性

除了ctl相關的屬性外,ThreadPoolExecutor類中其他一些重要的屬性如下所示。

//用于存放任務的阻塞隊列  
private final BlockingQueue<Runnable> workQueue;
//可重入鎖
private final ReentrantLock mainLock = new ReentrantLock();
//存放線程池中線程的集合,通路這個集合時,必須獲得mainLock鎖
private final HashSet<Worker> workers = new HashSet<Worker>();
//在鎖内部阻塞等待條件完成
private final Condition termination = mainLock.newCondition();
//線程工廠,以此來建立新線程
private volatile ThreadFactory threadFactory;
//拒絕政策
private volatile RejectedExecutionHandler handler;
//預設的拒絕政策
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();           

ThreadPoolExecutor類中的重要内部類

在ThreadPoolExecutor類中存在對于線程池的執行至關重要的内部類,Worker内部類和拒絕政策内部類。接下來,我們分别看這些内部類。

Worker内部類

Worker類從源代碼上來看,實作了Runnable接口,說明其本質上是一個用來執行任務的線程,接下來,我們看下Worker類的源代碼,如下所示。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    private static final long serialVersionUID = 6138294804551838833L;
    //真正執行任務的線程
    final Thread thread;
    //第一個Runnable任務,如果在建立線程時指定了需要執行的第一個任務
    //則第一個任務會存放在此變量中,此變量也可以為null
    //如果為null,則線程啟動後,通過getTask方法到BlockingQueue隊列中擷取任務
    Runnable firstTask;
    //用于存放此線程完全的任務數,注意:使用了volatile關鍵字
    volatile long completedTasks;
    
    //Worker類唯一的構造放大,傳遞的firstTask可以為null
    Worker(Runnable firstTask) {
        //防止在調用runWorker之前被中斷
        setState(-1);
        this.firstTask = firstTask;
        //使用ThreadFactory 來建立一個新的執行任務的線程
        this.thread = getThreadFactory().newThread(this);
    }
    //調用外部ThreadPoolExecutor類的runWorker方法執行任務
    public void run() {
        runWorker(this);
    }

    //是否擷取到鎖 
    //state=0表示鎖未被擷取
    //state=1表示鎖被擷取
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}           

在Worker類的構造方法中,可以看出,首先将同步狀态state設定為-1,設定為-1是為了防止runWorker方法運作之前被中斷。這是因為如果其他線程調用線程池的shutdownNow()方法時,如果Worker類中的state狀态的值大于0,則會中斷線程,如果state狀态的值為-1,則不會中斷線程。

Worker類實作了Runnable接口,需要重寫run方法,而Worker的run方法本質上調用的是ThreadPoolExecutor類的runWorker方法,在runWorker方法中,會首先調用unlock方法,該方法會将state置為0,是以這個時候調用shutDownNow方法就會中斷目前線程,而這個時候已經進入了runWork方法,就不會在還沒有執行runWorker方法的時候就中斷線程。

注意:大家需要重點了解Worker類的實作。

拒絕政策内部類

線上程池中,如果workQueue阻塞隊列滿了,并且沒有空閑的線程池,此時,繼續送出任務,需要采取一種政策來處理這個任務。而線程池總共提供了四種政策,如下所示。

  • 直接抛出異常,這也是預設的政策。實作類為AbortPolicy。
  • 用調用者所在的線程來執行任務。實作類為CallerRunsPolicy。
  • 丢棄隊列中最靠前的任務并執行目前任務。實作類為DiscardOldestPolicy。
  • 直接丢棄目前任務。實作類為DiscardPolicy。

在ThreadPoolExecutor類中提供了4個内部類來預設實作對應的政策,如下所示。

public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }


    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}           

我們也可以通過實作RejectedExecutionHandler接口,并重寫RejectedExecutionHandler接口的rejectedExecution方法來自定義拒絕政策,在建立線程池時,調用ThreadPoolExecutor的構造方法,傳入我們自己寫的拒絕政策。

例如,自定義的拒絕政策如下所示。

public class CustomPolicy implements RejectedExecutionHandler {

    public CustomPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            System.out.println("使用調用者所在的線程來執行任務")
            r.run();
        }
    }
}           

使用自定義拒絕政策建立線程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>(),
                       Executors.defaultThreadFactory(),
               new CustomPolicy());           

好了,今天就到這兒吧,我是冰河,我們下期見~~