天天看點

ThreadPoolExecutor源碼解析(一)

1.ThreadPoolExcuter原理說明

  首先我們要知道為什麼要使用ThreadPoolExcuter,具體可以看看文檔中的說明:

  線程池可以解決兩個不同問題:由于減少了每個任務的調用開銷,在執行大量的異步任務時,它通常能夠提供更好的性能,并且還可以提供綁定和管理資源(包括執行集合任務時使用的線程)的方法。每個 ThreadPoolExecutor還維護着一些基本的統計資料,如完成的任務數。

  線程池做的其實可以看得很簡單,其實就是把你送出的任務(task)進行排程管理運作,但這個排程的過程以及其中的狀态控制是比較複雜的。

2.初始化參數介紹

可以直接看最完整的ThreadPoolExcuter的初始化函數:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}      

逐個介紹如下:

corePoolSize:核心線程數,在ThreadPoolExcutor中有一個與它相關的配置:allowCoreThreadTimeOut(預設為false),當allowCoreThreadTimeOut為false時,核心線程會一直存活,哪怕是一直空閑着。而當allowCoreThreadTimeOut為true時核心線程空閑時間超過keepAliveTime時會被回收。

maximumPoolSize:最大線程數,線程池能容納的最大線程數,當線程池中的線程達到最大時,此時添加任務将會采用拒絕政策,預設的拒絕政策是抛出一個運作時錯誤(RejectedExecutionException)。值得一提的是,當初始化時用的工作隊列為LinkedBlockingDeque時,這個值将無效。

keepAliveTime:存活時間,當非核心空閑超過這個時間将被回收,同時空閑核心線程是否回收受allowCoreThreadTimeOut影響。

unit:keepAliveTime的機關。

workQueue:任務隊列,常用有三種隊列,即SynchronousQueue,LinkedBlockingDeque(無界隊列),ArrayBlockingQueue(有界隊列)。

threadFactory:線程工廠,ThreadFactory是一個接口,用來建立worker。通過線程工廠可以對線程的一些屬性進行定制。預設直接建立線程。

RejectedExecutionHandler:也是一個接口,隻有一個方法,當線程池中的資源已經全部使用,添加新線程被拒絕時,會調用RejectedExecutionHandler的rejectedExecution法。

預設是抛出一個運作時異常。

  這麼多參數看起來好像很複雜,是以Java貼心得為我們準備了便捷的API,即可以直接用Executors建立各種線程池。分别是:

//建立一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則建立線程。

//通過設定corePoolSize為0,而maximumPoolSize為Integer.Max_VALUE(Int型資料最大值)實作。

ExecutorService cache = Executors.newCachedThreadPool();

//建立一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。

//通過将corePoolSize和maximumPoolSize的值設定為一樣的值來實作。
        ExecutorService fixed = Executors.newFixedThreadPool(num);

//建立一個定長線程池,支援定時及周期性任務執行。

//通過将隊列參數workQueue設定為DelayWorkQueue來實作。
        ExecutorService schedule = Executors.newScheduledThreadPool(5);

//建立一個單線程化的線程池,它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

//通過将corePoolSize和maximumPoolSize都設定為1來實作。
        ExecutorService single = Executors.newSingleThreadExecutor();      

   這幾個API會根據具體的情況而使用預設定好預設的初始化參數去建立一個ThreadPoolExecutor。

  這裡需要做一個額外說明,在ThreadPoolExcuter中,worker和task是有差別的,task是使用者送出的任務,而worker則是用來執行task的線程。在初始化參數中,corePoolSize和maximumPoolSize都是針對worker的,而workQueue是用來存放task的。

3.worker介紹

  前面有介紹了一下worker和task的差別,其中task是使用者送出的線程任務,而worker則是ThreadPoolExecutor自己内部實作的一個類了。

  具體源碼如下:

  

/**
     * Woker主要維護着運作task的worker的中斷控制資訊,以及其他小記錄。這個類拓展AbstractQueuedSynchronizer
     * 而來簡化擷取和釋放每一個任務執行中的鎖。這可以防止中斷那些打算喚醒正在等待其他線程任務的任務,而不是
     * 中斷正在運作的任務。我們實作一個簡單的不可重入鎖而不是ReentrantLo,因為我們不想當其調用setCorePoolSize
     * 這樣的方法的時候能獲得鎖。
     */
    //worker主要是對進行中的任務進行中斷控制,順帶着對其他行為進行記錄
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        //正在跑的線程,如果是null辨別factory失敗
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        //初始化一個任務以運作
        Runnable firstTask;
        /** Per-thread task counter */
        //每個線程計數
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         * 用給定的first task和從threadFactory建立
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }


        /** Delegates main run loop to outer runWorker  */
        //主要調用了runWorker
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        //鎖方法

        //
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        //嘗試擷取鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //嘗試釋放鎖
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }      

   Worker其實可以看作進階一點的線程。其中繼承AbstractQueuedSynchronizer主要是為了實作鎖控制。ThreadPoolExecutor會持有并管理Worker,在Worker中firstTask其實就是存放task的,而thread則是存放目前Worker本身的線程。

其中比較重要的就是run方法了,但這個方法其實又是去調用ThreadPoolExecutor裡面的runWorker()方法,具體可以看下一節的介紹。

4.ctl介紹以及運作狀态說明

首先需要介紹線程池有五種運作狀态:

RUNNING(狀态值-1): 接收新任務并處理隊列中的任務

SHUTDOWN(狀态值0): 不接收新任務但會處理隊列中的任務。

STOP(狀态值1): 不接收新任務,不處理隊列中的任務,并中斷正在處理的任務

TIDYING(狀态值2): 所有任務已終止,workerCount為0,處于TIDYING狀态的線程将調用鈎子方法terminated()。

TERMINATED(狀态值3): terminated()方法完成。

  然後我們可以看看ThreadPoolExcuter中的ctl這個變量。

ctl是ThreadPoolExcuter中比較有意思的一個實作,它是一個AtomicInteger,這裡不對AtomicInteger多做讨論,隻要知道可以把它看成有原子性的Integer就夠了,其實它具有原子性的原理是使用了CAS的技術,這是一種樂觀鎖的具體實作。

ThreadPoolExcuter是将兩個内部值打包成一個值,即将workerCount和runState(運作狀态)這兩個值打包在一個ctl中,因為runState有5個值,需要3位,是以有3位表示

runState,而其他29位表示為workerCount。

而運作時要擷取其他資料時,隻需要對ctl進行拆包即可。具體這部分代碼如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl  

//拆包ctl,分别擷取runState和WorkerCount
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }

//打包操作
    private static int ctlOf(int rs, int wc) { return rs | wc; }      

5.拒絕政策

當執行器(Executor)處于終止狀态,或者執行器在max threads和工作隊列都是有界并且處于飽和的時候,新送出的任務會被拒絕。在任一情況下,執行的任務将調用RejectedExecutionHandler的方法rejectedExecution(Runnable, ThreadPoolExecutor)。有以下四種拒絕政策:

1.預設的是ThreadPoolExecutor.AbortPolicy,在這種政策下,處理器會在拒絕後抛出一個運作異常RejectedExecutionException。

2.在ThreadPoolExecutor.CallerRunsPolicy的政策下,線程會調用它直接的execute來運作這個任務。這種方式提供簡單的回報控制機制來減緩新任務送出的速度。

3.在ThreadPoolExecutor.DiscardPolicy政策下,無法執行的任務将被簡單得删除掉。

4.在ThreadPoolExecutor.DiscardOldestPolicy政策下,如果executor沒有處于終止狀态,在工作隊列頭的任務将被删除,然後會重新執行(可能會再次失敗,這會導緻重複這個過程)。

總結:本篇初步介紹了ThreadPoolExcuter的基本原理,解決了什麼問題。而後說明了ThreadPoolExcuter中的初始化參數,對其中的各個參數做初步介紹。再之後介紹ctl變量的作用,并初步介紹了任務送出失敗後的拒絕政策。