天天看點

Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理

    相關文章目錄:

    Java線程池ThreadPoolExecutor使用和分析(一)

    Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理

    Java線程池ThreadPoolExecutor使用和分析(三) - 終止線程池原理

    execute()是 java.util.concurrent.Executor接口中唯一的方法,JDK注釋中的描述是“在未來的某一時刻執行指令command”,即向線程池中送出任務,在未來某個時刻執行,送出的任務必須實作Runnable接口,該送出方式不能擷取傳回值。下面是對execute()方法内部原理的分析,分析前先簡單介紹線程池有哪些狀态,在一系列執行過程中涉及線程池狀态相關的判斷。以下分析基于JDK 1.7

    以下是本文的目錄大綱:

    一、線程池執行流程

    二、線程池狀态

    三、任務送出内部原理

        1、execute()  --  送出任務

        2、addWorker()  --  添加worker線程

        3、内部類Worker

        4、runWorker()  --  執行任務

        5、getTask()  --  擷取任務

        6、processWorkerExit()  --  worker線程退出

    若有不正之處請多多諒解,歡迎批評指正、互相讨論。

    請尊重作者勞動成果,轉載請标明原文連結:

    http://www.cnblogs.com/trust-freedom/p/6681948.html

Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理

1、如果線程池中的線程數量少于corePoolSize,就建立新的線程來執行新添加的任務

2、如果線程池中的線程數量大于等于corePoolSize,但隊列workQueue未滿,則将新添加的任務放到workQueue中

3、如果線程池中的線程數量大于等于corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小于maximumPoolSize,則會建立新的線程來處理被添加的任務

4、如果線程池中的線程數量等于了maximumPoolSize,就用RejectedExecutionHandler來執行拒絕政策

其中ctl這個AtomicInteger的功能很強大,其高3位用于維護線程池運作狀态,低29位維護線程池中線程數量

1、RUNNING:-1<<COUNT_BITS,即高3位為1,低29位為0,該狀态的線程池會接收新任務,也會處理在阻塞隊列中等待處理的任務

2、SHUTDOWN:0<<COUNT_BITS,即高3位為0,低29位為0,該狀态的線程池不會再接收新任務,但還會處理已經送出到阻塞隊列中等待處理的任務

3、STOP:1<<COUNT_BITS,即高3位為001,低29位為0,該狀态的線程池不會再接收新任務,不會處理在阻塞隊列中等待的任務,而且還會中斷正在運作的任務

4、TIDYING:2<<COUNT_BITS,即高3位為010,低29位為0,所有任務都被終止了,workerCount為0,為此狀态時還将調用terminated()方法

5、TERMINATED:3<<COUNT_BITS,即高3位為100,低29位為0,terminated()方法調用完成後變成此狀态

這些狀态均由int型表示,大小關系為 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,這個順序基本上也是遵循線程池從 運作 到 終止這個過程。

runStateOf(int c)  方法:c & 高3位為1,低29位為0的~CAPACITY,用于擷取高3位儲存的線程池狀态

workerCountOf(int c)方法:c & 高3位為0,低29位為1的CAPACITY,用于擷取低29位的線程數量

ctlOf(int rs, int wc)方法:參數rs表示runState,參數wc表示workerCount,即根據runState和workerCount打包合并成ctl

1、execute()  --  送出任務

Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理

execute(Runnable command)

參數:

    command    送出執行的任務,不能為空

執行流程:

1、如果線程池目前線程數量少于corePoolSize,則addWorker(command, true)建立新worker線程,如建立成功傳回,如沒建立成功,則執行後續步驟;

    addWorker(command, true)失敗的原因可能是:

    A、線程池已經shutdown,shutdown的線程池不再接收新任務

    B、workerCountOf(c) < corePoolSize 判斷後,由于并發,别的線程先建立了worker線程,導緻workerCount>=corePoolSize

2、如果線程池還在running狀态,将task加入workQueue阻塞隊列中,如果加入成功,進行double-check,如果加入失敗(可能是隊列已滿),則執行後續步驟;

    double-check主要目的是判斷剛加入workQueue阻塞隊列的task是否能被執行

    A、如果線程池已經不是running狀态了,應該拒絕添加新任務,從workQueue中删除任務

    B、如果線程池是運作狀态,或者從workQueue中删除任務失敗(剛好有一個線程執行完畢,并消耗了這個任務),確定還有線程執行任務(隻要有一個就夠了)

3、如果線程池不是running狀态 或者 無法入隊列,嘗試開啟新線程,擴容至maxPoolSize,如果addWork(command, false)失敗了,拒絕目前command

2、addWorker()  --  添加worker線程

Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理

addWorker(Runnable firstTask, boolean core)

    firstTask:    worker線程的初始任務,可以為空

    core:           true:将corePoolSize作為上限,false:将maximumPoolSize作為上限

addWorker方法有4種傳參的方式:

    1、addWorker(command, true)

    2、addWorker(command, false)

    3、addWorker(null, false)

    4、addWorker(null, true)

在execute方法中就使用了前3種,結合這個核心方法進行以下分析

    第一個:線程數小于corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就傳回false

    第二個:當隊列被放滿時,就嘗試将這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果線程池也滿了的話就傳回false

    第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker線上程執行的時候會去任務隊列裡拿任務,這樣就相當于建立了一個新的線程,隻是沒有馬上配置設定任務

    第四個:這個方法就是放一個null的task進Workers Set,而且是在小于corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就傳回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為線程池預先啟動corePoolSize個worker等待從workQueue中擷取任務執行

1、判斷線程池目前是否為可以添加worker線程的狀态,可以則繼續下一步,不可以return false:

    A、線程池狀态>shutdown,可能為stop、tidying、terminated,不能添加worker線程

    B、線程池狀态==shutdown,firstTask不為空,不能添加worker線程,因為shutdown狀态的線程池不接收新任務

    C、線程池狀态==shutdown,firstTask==null,workQueue為空,不能添加worker線程,因為firstTask為空是為了添加一個沒有任務的線程再從workQueue擷取task,而workQueue為空,說明添加無任務線程已經沒有意義

2、線程池目前線程數量是否超過上限(corePoolSize 或 maximumPoolSize),超過了return false,沒超過則對workerCount+1,繼續下一步

3、線上程池的ReentrantLock保證下,向Workers Set中添加新建立的worker執行個體,添加完成後解鎖,并啟動worker線程,如果這一切都成功了,return true,如果添加worker入Set失敗或啟動失敗,調用addWorkerFailed()邏輯

3、内部類Worker

Worker類

Worker類本身既實作了Runnable,又繼承了AbstractQueuedSynchronizer(以下簡稱AQS),是以其既是一個可執行的任務,又可以達到鎖的效果

new Worker()

1、将AQS的state置為-1,在runWoker()前不允許中斷

2、待執行的任務會以參數傳入,并賦予firstTask

3、用Worker這個Runnable建立Thread

之是以Worker自己實作Runnable,并建立Thread,在firstTask外包一層,是因為要通過Worker控制中斷,而firstTask這個工作任務隻是負責執行業務

Worker控制中斷主要有以下幾方面:

1、初始AQS狀态為-1,此時不允許中斷interrupt(),隻有在worker線程啟動了,執行了runWoker(),将state置為0,才能中斷

    不允許中斷展現在:

    A、shutdown()線程池時,會對每個worker tryLock()上鎖,而Worker類這個AQS的tryAcquire()方法是固定将state從0->1,故初始狀态state==-1時tryLock()失敗,沒發interrupt()

    B、shutdownNow()線程池時,不用tryLock()上鎖,但調用worker.interruptIfStarted()終止worker,interruptIfStarted()也有state>0才能interrupt的邏輯

2、為了防止某種情況下,在運作中的worker被中斷,runWorker()每次運作任務時都會lock()上鎖,而shutdown()這類可能會終止worker的操作需要先擷取worker的鎖,這樣就防止了中斷正在運作的線程

Worker實作的AQS為不可重入鎖,為了是在獲得worker鎖的情況下再進入其它一些需要加鎖的方法

Worker和Task的差別:

Worker是線程池中的線程,而Task雖然是runnable,但是并沒有真正執行,隻是被Worker調用了run方法,後面會看到這部分的實作。

4、runWorker()  --  執行任務

Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理

runWorker(Worker w)

1、Worker線程啟動後,通過Worker類的run()方法調用runWorker(this)

2、執行任務之前,首先worker.unlock(),将AQS的state置為0,允許中斷目前worker線程

3、開始執行firstTask,調用task.run(),在執行任務前會上鎖wroker.lock(),在執行完任務後會解鎖,為了防止在任務運作時被線程池一些中斷操作中斷

4、在任務執行前後,可以根據業務場景自定義beforeExecute() 和 afterExecute()方法

5、無論在beforeExecute()、task.run()、afterExecute()發生異常上抛,都會導緻worker線程終止,進入processWorkerExit()處理worker退出的流程

6、如正常執行完目前task後,會通過getTask()從阻塞隊列中擷取新任務,當隊列中沒有任務,且擷取任務逾時,那麼目前worker也會進入退出流程

5、getTask()  --  擷取任務

Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理

getTask()

1、首先判斷是否可以滿足從workQueue中擷取任務的條件,不滿足return null

    A、線程池狀态是否滿足:

        (a)shutdown狀态 + workQueue為空 或 stop狀态,都不滿足,因為被shutdown後還是要執行workQueue剩餘的任務,但workQueue也為空,就可以退出了

        (b)stop狀态,shutdownNow()操作會使線程池進入stop,此時不接受新任務,中斷正在執行的任務,workQueue中的任務也不執行了,故return null傳回

    B、線程數量是否超過maximumPoolSize 或 擷取任務是否逾時

        (a)線程數量超過maximumPoolSize可能是線程池在運作時被調用了setMaximumPoolSize()被改變了大小,否則已經addWorker()成功不會超過maximumPoolSize

        (b)如果 目前線程數量>corePoolSize,才會檢查是否擷取任務逾時,這也展現了當線程數量達到maximumPoolSize後,如果一直沒有新任務,會逐漸終止worker線程直到corePoolSize

2、如果滿足擷取任務條件,根據是否需要定時擷取調用不同方法:

    A、workQueue.poll():如果在keepAliveTime時間内,阻塞隊列還是沒有任務,傳回null

    B、workQueue.take():如果阻塞隊列為空,目前線程會被挂起等待;當隊列中有任務加入時,線程被喚醒,take方法傳回任務

3、在阻塞從workQueue中擷取任務時,可以被interrupt()中斷,代碼中捕獲了InterruptedException,重置timedOut為初始值false,再次執行第1步中的判斷,滿足就繼續擷取任務,不滿足return null,會進入worker退出的流程

6、processWorkerExit()  --  worker線程退出

processWorkerExit(Worker w, boolean completedAbruptly)

    worker:                      要結束的worker

    completedAbruptly: 是否突然完成(是否因為異常退出)

1、worker數量-1

    A、如果是突然終止,說明是task執行時異常情況導緻,即run()方法執行時發生了異常,那麼正在工作的worker線程數量需要-1

    B、如果不是突然終止,說明是worker線程沒有task可執行了,不用-1,因為已經在getTask()方法中-1了

2、從Workers Set中移除worker,删除時需要上鎖mainlock

3、tryTerminate():在對線程池有負效益的操作時,都需要“嘗試終止”線程池,大概邏輯:

    判斷線程池是否滿足終止的狀态

    A、如果狀态滿足,但還有線程池還有線程,嘗試對其發出中斷響應,使其能進入退出流程

    B、沒有線程了,更新狀态為tidying->terminated

4、是否需要增加worker線程,如果線程池還沒有完全終止,仍需要保持一定數量的線程

    線程池狀态是running 或 shutdown

    A、如果目前線程是突然終止的,addWorker()

    B、如果目前線程不是突然終止的,但目前線程數量 < 要維護的線程數量,addWorker()

    故如果調用線程池shutdown(),直到workQueue為空前,線程池都會維持corePoolSize個線程,然後再逐漸銷毀這corePoolSize個線程

參考資料:

深入分析java線程池的實作原理 - 占小狼

JUC源碼分析-線程池-ThreadPoolExecutor

原文連結:

作者:Trust_FreeDom - 部落格園

部落格首頁:http://www.cnblogs.com/trust-freedom/

歡迎轉載,但請保留作者和本文連結,謝謝!

歡迎在下面的評論區與我交流。

如果覺得寫的不錯,請點選下面的“推薦”按鈕,讓我更有動力寫出更好的文章。