天天看點

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

目錄

Executor架構

Executor簡介

Executor架構的兩級排程模型

Executor架構的結構

Executor架構的使用示意圖

Executor架構的成員簡介

FixedThreadPool

FixedThreadPool簡介

FixedThreadPool執行任務過程介紹

為什麼不推薦使用FixedThreadPool?

SingleThreadExecutor

SingleThreadExecutor簡介

SingleThreadExecutor執行任務過程介紹

為什麼不推薦使用SingleThreadExecutor?

CachedThreadPool

CachedThreadPool簡介

CachedThreadPool執行任務過程介紹

為什麼不推薦使用CachedThreadPool?

ScheduledThreadPoolExecutor詳解

ScheduledThreadPoolExecutor簡介

ScheduledThreadPoolExecutor的運作機制

ScheduledThreadPoolExecutor的實作

ScheduledThreadPoolExecutor 執行周期任務的步驟

FutureTask詳解

FutureTask簡介

FutureTask的使用

FutureTask的實作

在Java中,使用線程來異步執行任務。Java線程的建立與銷毀需要一定的開銷,如果我們為每一個任務建立一個新線程來執行,這些線程的建立與銷毀将消耗大量的計算資源。同時,為每一個任務建立一個新線程來執行,這種政策可能會使處于高負荷狀态的應用最終崩潰。

Java的線程既是工作單元,也是執行機制。從JDK5開始,把工作單元與執行機制分離開來。工作單元包括Runable和Callable, 而執行機制由Executor架構提供。

Executor架構

Executor簡介

Executor

架構是 Java5 之後引進的,在 Java 5 之後,通過

Executor

來啟動線程比使用

Thread

start

方法更好,除了更易管理,效率更好(用線程池實作,節約開銷)外,還有關鍵的一點:有助于避免 this 逃逸問題。

補充:this 逃逸是指在構造函數傳回之前其他線程就持有該對象的引用. 調用尚未構造完全的對象的方法可能引發令人疑惑的錯誤。

Executor

架構不僅包括了線程池的管理,還提供了線程工廠、隊列以及拒絕政策等,

Executor

架構讓并發程式設計變得更加簡單。

Executor架構的兩級排程模型

在HotSpotVM的線程模型中,Java線程(java.lang. Thread)被一對一映射為本地作業系統線 程。Java線程啟動時會建立一個本地作業系統線程;當該Java線程終止時,這個作業系統線程 也會被回收。作業系統會排程所有線程并将它們配置設定給可用的CPU。

在上層,Java多線程程式通常把應用分解為若幹個任務,然後使用使用者級的排程器 (Executor架構)将這些任務映射為固定數量的線程;在底層,橾作系統核心将這些線程映射到 硬體處理器上。這種兩級排程模型的示意圖如圖。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

從圖中可以看出,應用程式通過Executor架構控制上層的排程;而下層的排程由作業系統 核心控制,下層的排程不受應用程式的控制。

Executor架構的結構

Executor架構主要由3大部分組成如下。

任務。包括被執行任務需要實作的接口: Runnable接口或Callable接口。

Runnable

接口或

Callable

接口 實作類都可以被

ThreadPoolExecutor

ScheduledThreadPoolExecutor

執行。

任務的執行。包括任務執行機制的核心接口Executor, 以及繼承自Executor的 ExecutorService接口。Executor架構有兩個關鍵類實作了ExecutorService接口 (ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

這裡提了很多底層的類關系,但是,實際上我們需要更多關注的是

ThreadPoolExecutor

這個類,這個類在我們實際使用線程池的過程中,使用頻率還是非常高的。

注意: 通過檢視

ScheduledThreadPoolExecutor

源代碼我們發現

ScheduledThreadPoolExecutor

實際上是繼承了

ThreadPoolExecutor

并實作了 ScheduledExecutorService ,而

ScheduledExecutorService

又實作了

ExecutorService

,正如我們下面給出的類關系圖顯示的一樣。

異步計算的結果。包括接口Future和實作Future接口的FutureTask類。當我們把

Runnable

接口 或

Callable

接口 的實作類送出給

ThreadPoolExecutor

ScheduledThreadPoolExecutor

執行。(調用

submit()

方法時會傳回一個

FutureTask

對象)

Executor架構包含的主要的類與接口如圖所示。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

下面是這些類和接口的簡介。

Executor是一個接口,它是Executor架構的基礎,它将任務的送出與任務的執行分離開 來。

ThreadPoolExecutor是線程池的核心實作類,用來執行被送出的任務。

ScheduledThreadPoolExecutor是一個實作類,可以在給定的延遲後運作指令,或者定期執 行指令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強大。

Future接口和實作Future接口的FutureTask類,代表異步計算的結果。

Runnable接口和Callable接口的實作類,都可以被ThreadPoolExecutor或Scheduled­ThreadPoolExecutor執行。

Executor架構的使用示意圖

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

主線程首先要建立實作Runnable或者Callable接口的任務對象。工具類Executors可以把一 個Runnable對象封裝為一個Callable對象(Executors.callable (Runnable task)或Executors.callable (Runnable task, Object result))。

然後可以把Runnable對象直接交給ExecutorService執行(ExecutorService.execute (Runnable command)); 或者也可以把Runnable對象或Callable對象送出給ExecutorService執行(Executor­Service.submit(Runnable task)或ExecutorService.submit (Callable<T>task))。

如果執行ExecutorService.submit(...), ExecutorService将傳回一個實作Future接口的對象(到目前為止的JDK中,傳回的是FutureTask對象)。由于FutureTask實作了Runnable, 程式員也可以建立FutureTask, 然後直接交給ExecutorService執行。

最後,主線程可以執行FutureTask.get()方法來等待任務執行完成。主線程也可以執行FutureTask.cancel (boolean maylnterruptlfRunning)來取消此任務的執行。

Executor架構的成員簡介

Executor架構的主要成員: ThreadPoolExecutor、ScheduledThreadPoolExecutor、 Future接口、Runnable接口、Callable接口和Executors。

(1) ThreadPoolExecutor

ThreadPoolExecutor通常使用工廠類Executors來建立。Executors可以建立3種類型的 ThreadPoolExecutor: SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

下面分别介紹這3種ThreadPoolExecutor。

1) FixedThreadPool。下面是Executors提供的,建立使用固定線程數的FixedThreadPool的API。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

FixedThreadPool适用于為了滿足資源管理的需求,而需要限制目前線程數量的應用場 景,它适用于負載比較重的伺服器。

2) SingleThreadExecutor。下面是Executors提供的,建立使用單個線程的Single Thread­Executor的API。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

Single ThreadExecutor适用于需要保證順序地執行各個任務;并且在任意時間點,不會有多 個線程是活動的應用場景。

3) CachedThreadPool。下面是Executors提供的建立一個會根據需要建立新線程的 CachedThreadPool的API。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

CachedThreadPool是大小無界的線程池,适用于執行很多的短期異步任務的小程式,或者 是負載較輕的伺服器。

(2) ScheduledThreadPoolExecutor

Schedul edThreadPoo !Executor通常使用工廠類Executors來建立。Executors可以建立2種類 型的ScheduledThreadPoolExecutor, 如下。

ScheduledThreadPoolExecutor。包含若幹個線程的ScheduledThreadPoolExecutor。

SingleThreadScheduledExecutor。隻包含一個線程的ScheduledThreadPoolExecutor。

下面分别介紹這兩種ScheduledThreadPoolExecutor。

下面是工廠類Executors提供的建立固定個數線程的ScheduledThreadPoolExecutor的API。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

ScheduledThreadPoolExecutor适用于需要多個背景線程執行周期任務,同時為了滿足資源 管理的需求而需要限制背景線程的數量的應用場景。下面是Executors提供的,建立單個線程 的SingleThreadScheduledExecutor的API。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

Single ThreadSchedul edExecutor适用于需要單個背景線程執行周期任務,同時需要保證順 序地執行各個任務的應用場景。

(3) Future接口

Future接口和實作Future接口的FutureTask類用來表示異步計算的結果。當我們把Runnable 接口或Callable接口的實作類送出(submit)給ThreadPoolExecutor或 Schedul edThreadPoolExecutor時,ThreadPoolExecutor或Schedul edThreadPoolExecutor會向我們 傳回一個FutureTask對象。下面是對應的API。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

到目前最新的JDK8為止,Java通過上述API傳回的是一個 FutureTask對象。但從API可以看到,Java僅僅保證傳回的是一個實作了Future接口的對象。在将 來的JDK實作中,傳回的可能不一定是FutureTask。

(4)Runnable接口和Callable接口

Runnable接口和Callable接口的實作類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執行。它們之間的差別是Runnable不會傳回結果,而Callable可以傳回結 果。

除了可以自己建立實作Callable接口的對象外,還可以使用工廠類Executors來把一個 Runnable包裝成一個Callable。

下面是Executors提供的,把一個Runnable包裝成一個Callable的API。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

前面講過,當我們把一個Callable對象(比如上面的Callable1或Callable2)送出給 ThreadPoolExecutor或ScheduledThreadPoolExecutor執行時,submit(...)會向我們傳回一個 FutureTask對象。我們可以執行FutureTask get()方法來等待任務執行完成。當任務成功完成後 FutureTask get()将傳回該任務的結果。例如,如果送出的是對象Callable1, FutureTask.get()方法 将傳回null; 如果送出的是對象Callable2, FutureTask.get()方法将傳回result對象。

FixedThreadPool

FixedThreadPool簡介

FixedThreadPool被稱為可重用固定線程數的線程池。下面是FixedThreadPool的源代碼實 現。

/**
     * 建立一個可重用固定數量線程的線程池
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

另外還有一個 FixedThreadPool 的實作方法,和上面的類似,是以這裡不多做闡述:     

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
           

 FixedThreadPool的corePoolSize和maximumPoolSize都被設定為建立FixedThreadPool時指 定的參數nThreads。

當線程池中的線程數大于corePoolSize時,keep AliveTime為多餘的空閑線程等待新任務的最長時間,超過這個時間後多餘的線程将被終止。這裡把keepAliveTime設定為0L, 意味着多餘的空閑線程會被立即終止。

FixedThreadPool

執行任務過程介紹

FixedThreadPool的execute()方法的運作示意圖如圖

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

1)如果目前運作的線程數少于corePoolSize, 則建立新線程來執行任務。

2)線上程池完成預熱之後(目前運作的線程數等于corePoolSize), 将任務加入 LinkedBlockingQueue。

3)線程執行完1中的任務後,會在循環中反複從LinkedBlockingQueue擷取任務來執行。

FixedThreadPool使用無界隊列LinkedBlockingQueue作為線程池的工作隊列(隊列的容量為 Integer.MAX_VALUE)。使用無界隊列作為工作隊列會對線程池帶來如下影響。

1)當線程池中的線程數達到corePoolSize後,新任務将在無界隊列中等待,是以線程池中 的線程數不會超過corePoolSize。

2)由于1, 使用無界隊列時maxmumPoolSize将是一個無效參數。

3)由于1和2, 使用無界隊列時keepAliveTime将是一個無效參數。

4)由于使用無界隊列,運作中的FixedThreadPool (未執行方法shutdown()或 shutdownNow())不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecuti on方法)。

為什麼不推薦使用

FixedThreadPool

FixedThreadPool

使用無界隊列

LinkedBlockingQueue

(隊列的容量為 Integer.MAX_VALUE)作為線程池的工作隊列會對線程池帶來如下影響 :

1 當線程池中的線程數達到

corePoolSize

後,新任務将在無界隊列中等待,是以線程池中的線程數不會超過 corePoolSize;

2 由于使用無界隊列時

maximumPoolSize

将是一個無效參數,因為不可能存在任務隊列滿的情況。是以,通過建立

FixedThreadPool

的源碼可以看出建立的

FixedThreadPool

corePoolSize

maximumPoolSize

被設定為同一個值。

3 由于 1 和 2,使用無界隊列時

keepAliveTime

将是一個無效參數;

4 運作中的

FixedThreadPool

(未執行

shutdown()

shutdownNow()

)不會拒絕任務,在任務比較多的時候會導緻 OOM(記憶體溢出)。

SingleThreadExecutor

SingleThreadExecutor簡介

SingleThreadExecutor是使用單個worker線程的Executor。下面是SingleThreadExecutor的代碼實作。

/**
     *傳回隻有一個線程的線程池
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
           

Single ThreadExecutor的corePoolSize和maximumPoolSize被設定為1。其他參數與FixedThreadPool相同。

Single ThreadExecutor使用無界隊列LinkedBlockingQueue作為線程池的工作隊列(隊列的容量為Integer.MAX_VALUE)。Single ThreadExecutor使用無界隊列作為工作隊列 對線程池帶來的影響與FixedThreadPool相同,這裡就不贅述了。

SingleThreadExecutor執行任務過程介紹

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

1)如果目前運作的線程數少于corePoolSize (即線程池中無運作的線程),則建立一個新線程來執行任務。

2)線上程池完成預熱之後(目前線程池中有一個運作的線程),将任務加入LinkedBlockingQueue。

3)線程執行完1中的任務後,會在一個無限循環中反複從LinkedBlockingQueue擷取任務來 執行。

為什麼不推薦使用

SingleThreadExecutor

SingleThreadExecutor

使用無界隊列

LinkedBlockingQueue

作為線程池的工作隊列(隊列的容量為 Intger.MAX_VALUE)。

SingleThreadExecutor

使用無界隊列作為線程池的工作隊列會對線程池帶來的影響與

FixedThreadPool

相同。說簡單點就是可能會導緻 OOM

CachedThreadPool

CachedThreadPool簡介

CachedThreadPool是一個會根據需要建立新線程的線程池。下面是建立Cached Thread­Pool的源代碼。

/**
     * 建立一個線程池,根據需要建立新線程,但會在先前建構的線程可用時重用它。
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }


public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           

CachedThreadPool的corePoolSize被設定為0, 即corePool為空; maximumPoolSize被設定為Integer.MAX_VALUE, 即maximumPool是無界的。這裡把keep Ali veTime設定為60L, 意味着CachedThreadPool中的空閑線程等待新任務的最長時間為60秒,空閑線程超過60秒後将會被 終止。

FixedThreadPool和Single ThreadExecutor使用無界隊列LinkedBlockingQueue作為線程池的 工作隊列。CachedThreadPool使用沒有容量的Synchronous Queue作為線程池的工作隊列,但 CachedThreadPool的maximumPool是無界的。這意味着如果主線程送出任務的速度高于 maximurnPool中線程處理任務的速度時,CachedThreadPool會不斷建立新線程。極端情況下, CachedThreadPool會因為建立過多線程而耗盡CPU和記憶體資源。

CachedThreadPool執行任務過程介紹

CachedThreadPool的execute()方法的執行示意圖

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

1)首先執行SynchronousQueue.offer (Runnable task).如果目前maximumPool中有空閑線程 正在執行SynchronousQueue.poll (keepAliveTime, Time Unit.NANOSECONDS), 那麼主線程執行 offer操作與空閑線程執行的poll操作配對成功,主線程把任務交給空閑線程執行,execute()方 法執行完成;否則執行下面的步驟2。

2)當初始maximumPool為空,或者maximumPool中目前沒有空閑線程時,将沒有線程執行 SynchronousQueue.poll(keepAliveTime, Time Unit.NANOSECONDS)。這種情況下,步驟1将失 敗。此時CachedThreadPool會建立一個新線程執行任務,execute()方法執行完成。

3)在步驟2中新建立的線程将任務執行完後,會執行SynchronousQueue.poll (keepAliveTime, Time Unit.NANOSECONDS)。這個poll操作會讓空閑線程最多在Synchronous Queue中等待60秒鐘。如果60秒鐘内主線程送出了一個新任務(主線程執 行步驟1)), 那麼這個空閑線程将執行主線程送出的新任務;否則,這個空閑線程将終止。由于 空閑60秒的空閑線程會被終止,是以長時間保持空閑的CachedThreadPool不會使用任何資源。

前面提到過SynchronousQueue是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個線程的對應移除橾作,反之亦然。CachedThreadPool使用Synchronous Queue, 把主線程送出的任務傳遞給空閑線程執行。CachedThreadPool中任務傳遞的示意圖如圖

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

為什麼不推薦使用

CachedThreadPool

CachedThreadPool

允許建立的線程數量為

Integer.MAX_VALUE

,可能會建立大量線程,進而導緻 OOM。 

ScheduledThreadPoolExecutor詳解

ScheduledThreadPoolExecutor簡介

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲之後運作任務,或者定期執行任務。ScheduledThreadPoolExecutor的功能與Timer類似,但 ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個背景線程,而 ScheduledThreadPoolExecutor可以在構造函數中指定多個對應的背景線程數。

ScheduledThreadPoolExecutor

使用的任務隊列

DelayQueue

封裝了一個

PriorityQueue

PriorityQueue

會對隊列中的任務進行排序,執行所需時間短的放在前面先被執行(

ScheduledFutureTask

time

變量小的先執行),如果執行所需時間相同則先送出的任務将被先執行(

ScheduledFutureTask

squenceNumber

變量小的先執行)。

ScheduledThreadPoolExecutor

Timer

的比較:

Timer

對系統時鐘的變化敏感,

ScheduledThreadPoolExecutor

不是;

Timer

隻有一個執行線程,是以長時間運作的任務可以延遲其他任務。

ScheduledThreadPoolExecutor

可以配置任意數量的線程。 此外,如果你想(通過提供 ThreadFactory),你可以完全控制建立的線程;

TimerTask

中抛出的運作時異常會殺死一個線程,進而導緻

Timer

當機:-( ...即計劃任務将不再運作。

ScheduledThreadExecutor

不僅捕獲運作時異常,還允許您在需要時處理它們(通過重寫

afterExecute

方法

ThreadPoolExecutor

)。抛出異常的任務将被取消,但其他任務将繼續運作。

綜上,在 JDK1.5 之後,你沒有理由再使用 Timer 進行任務排程了。

ScheduledThreadPoolExecutor的運作機制

Schedul edThreadPoolExecutor的執行示意圖(本文基于JDK6)如圖所示。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

DelayQueue是一個無界隊列,是以ThreadPoolExecutor的maximumPoolSize在Scheduled­ThreadPoolExecutor中沒有什麼意義(設定maximumPoolSize的大小沒有什麼效果)。

ScheduledThreadPoolExecutor的執行主要分為兩大部分。

1)當調用Schedul edThreadPoolExecutor的schedul eAtFixedRate()方法或者schedule With­FixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實作了 RunnableScheduledFutur接口的ScheduledFuture Task。

2)線程池中的線程從DelayQueue中擷取ScheduledFutureTask, 然後執行任務。

ScheduledThreadPoolExecutor

為了實作周期性的執行任務,對

ThreadPoolExecutor

做了如下修改:

使用

DelayQueue

作為任務隊列;

擷取任務的方不同

執行周期任務後,增加了額外的處理

ScheduledThreadPoolExecutor的實作

前面我們提到過ScheduledThreadPoolExecutor會把待排程的任務(ScheduledFutureTask) 放到一個DelayQueue中。

ScheduledFutureTask主要包含3個成員變量,如下。

long型成員變量time, 表示這個任務将要被執行的具體時間。

long型成員變量sequenceNumber, 表示這個任務被添加到ScheduledThreadPoolExecutor中的序号。

long型成員變量period, 表示任務執行的間隔周期。

DelayQueue封裝了一個PriorityQueue, 這個PriorityQueue會對隊列中的ScheduledFutureTask排序。排序時,time小的排在前面(時間早的任務将被先執行)。如果兩個 ScheduledFuture Task的time相同,就比較sequenceNumber, sequenceNumber小的排在前面(也就是說如果兩個任務的執行時間相同,那麼先送出的任務将被先執行)。

ScheduledThreadPoolExecutor 執行周期任務的步驟

首先,讓我們看看ScheduledThreadPoolExecutor中的線程執行周期任務的過程。圖是ScheduledThreadPoolExecutor中的線程1執行某個周期任務的4個步驟。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

下面是對這4個步驟的說明。

1)線程1從DelayQueue中擷取已到期的ScheduledFutureTask (DelayQueue. take())。到期任務 是指ScheduledFutureTask的time大于等于目前時間。

2)線程1執行這個ScheduledFutureTask。

3)線程1修改ScheduledFutureTask的time變量為下次将要被執行的時間。

4)線程1把這個修改time之後的ScheduledFutureTask放回DelayQueue中(Delay­Queue.add())。

接下來,讓我們看看上面的步驟1)擷取任務的過程。下面是DelayQueue. take()方法的源代碼實作。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

DelayQueue. take()的執行示意圖。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

如圖所示,擷取任務分為3大步驟。

1)擷取Lock。

2)擷取周期任務。

如果PriorityQueue為空,目前線程到Condition中等待否則執行下面的2.2。

如果PriorityQueue的頭元素的time時間比目前時間大,到Condition中等待到time時間;否則執行下面的2.3。

擷取Priori tyQueue的頭元素(2.3.1) ; 如果Priori tyQueue不為空,則喚醒在Condition中等待的所有線程(2.3.2)。

3)釋放Lock。

ScheduledThreadPoolExecutor在一個循環中執行步驟2, 直到線程從Priori tyQueue擷取到一個元素之後(執行2.3.1之後),才會退出無限循環(結束步驟2)。

最後,讓我們看看Schedul edThreadPoolExecutor中的線程執行任務的步驟4, 把 ScheduledFutureTask放入DelayQueue中的過程。下面是DelayQueue.add()的源代碼實作。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

DelayQueue.add()的執行示意圖。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

如圖所示,添加任務分為3大步驟。

1)擷取Lock。

2)添加任務。 向PriorityQueue添加任務。 如果在上面2.1中添加的任務是PriorityQueue的頭元素,喚醒在Condition中等待的所有線 程。

3)釋放Lock。

FutureTask詳解

Future接口和實作Future接口的FutureTask類,代表異步計算的結果。

FutureTask簡介

FutureTask除了實作Future接口外,還實作了Runnable接口。是以,FutureTask可以交給 Executor執行,也可以由調用線程直接執行(Future Task.run())。根據Future Task.run()方法被執行 的時機,FutureTask可以處于下面3種狀态。

1)未啟動。Future Task.run()方法還沒有被執行之前,FutureTask處于未啟動狀态。當建立一 個FutureTask, 且沒有執行Future Task.run()方法之前,這個FutureTask處于未啟動狀态。

2)Task.run()方法被執行的過程中,FutureTask處于已啟動狀态。

3)Task.run()方法執行完後正常結束,或被取消(Future Task.cancel (...)) , 或 執行Future Task.run()方法時抛出異常而異常結束,FutureTask處于已完成狀态。

圖是FutureTask的狀态遷移的示意圖。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

當FutureTask處于已完成狀态時,執行Future Task. get()方法将導緻調用線程立即傳回結果或抛 出異常。

當FutureTask處于未啟動狀态時,執行Future Task.cancel()方法将導緻此任務永遠不會被執 行

當FutureTask處于已啟動狀态時,執行FutureTask.cancel (true)方法将以中斷執行此任務線程 的方式來試圖停止任務

當FutureTask處于已啟動狀态時,執行FutureTask.cancel (false)方法将 不會對正在執行此任務的線程産生影響(讓正在執行的任務運作完成)

當FutureTask處于已完 成狀态時,執行Future Task.cancel (...)方法将傳回false。

get方法和cancel方法的執行示意圖。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

FutureTask的使用

可以把FutureTask交給Executor執行;也可以通過ExecutorService.submit (...)方法傳回一個 FutureTask, 然後執行Future Task.get()方法或Future Task.cancel (...)方法。除此以外,還可以單獨 使用FutureTask。

當一個線程需要等待另一個線程把某個任務執行完後它才能繼續執行,此時可以使用 FutureTask。假設有多個線程執行若幹任務,每個任務最多隻能被執行一次。當多個線程試圖 同時執行同一個任務時,隻允許一個線程執行任務,其他線程需要等待這個任務執行完後才 能繼續執行。下面是對應的示例代碼。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解
java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

當兩個線程試圖同時執行同一個任務時,如果Thread 1執行1.3後Thread 2執行2.1, 那麼接 下來Thread 2将在2.2等待,直到Thread 1執行完1.4後Thread 2才能從2.2 (Future Task.get())傳回。

FutureTask的實作

FutureTask的實作基于AbstractQueuedSynchronizer (以下簡稱為AQS)。java.util.concurrent中 的很多可阻塞類(比如ReentrantLock)都是基于AQS來實作的。AQS是一個同步架構,它提供通 用機制來原子性管理同步狀态、阻塞和喚醒線程,以及維護被阻塞線程的隊列。JDK6中AQS 被廣泛使用,基于AQS實作的同步器包括: ReentrantLock、Semaphore、ReentrantReadWri teLock、 CountDownLatch和FutureTask。

每一個基于AQS實作的同步器都會包含兩種類型的操作,如下。

至少一個acquire操作。這個操作阻塞調用線程,除非/直到AQS的狀态允許這個線程繼續 執行。FutureTask的acquire操作為get()/get(long timeout, TimeUnit unit)方法調用。

至少一個release橾作。這個操作改變AQS的狀态,改變後的狀态可允許一個或多個阻塞 線程被解除阻塞。FutureTask的release操作包括run()方法和cancel(. ..)方法。

基于“複合優先于繼承"的原則,FutureTask聲明了一個内部私有的繼承于AQS的子類 Sync, 對FutureTask所有公有方法的調用都會委托給這個内部子類。

AQS被作為“模闆方法模式”的基礎類提供給FutureTask的内部子類Sync, 這個内部子類隻 需要實作狀态檢查和狀态更新的方法即可,這些方法将控制FutureTask的擷取和釋放橾作。具 體來說,Sync實作了AQS的tryAcquireShared (int)方法和tryReleaseShared (int)方法,Sync通過這 兩個方法來檢查和更新同步狀态。

FutureTask的設計示意圖如圖

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

如圖所示,Sync是FutureTask的内部私有類,它繼承自AQS。建立FutureTask時會建立内部 私有的成員對象Sync, FutureTask所有的的公有方法都直接委托給了内部私有的Sync。

Future Task get()方法會調用AQS.acquireSharedinterruptibly(int arg)方法,這個方法的執行 過程如下。

1)調用AQS.acquireSharedlnterruptibly(int arg)方法,這個方法首先會回調在子類Sync中實 現的try Acquire Shared()方法來判斷acquire操作是否可以成功。acquire操作可以成功的條件為: state為執行完成狀态RAN或已取消狀态CANCELLED, 且runner不為null。

2)如果成功則get()方法立即傳回。如果失敗則到線程等待隊列中去等待其他線程執行 release橾作。

3)當其他線程執行release操作(比如Future Task.run()或Future Task.cancel (...))喚醒目前線 程後,目前線程再次執行try Acquire Shared()将傳回正值1, 目前線程将離開線程等待隊列并喚 醒它的後繼線程(這裡會産生級聯喚醒的效果,後面會介紹)。

4)最後傳回計算的結果或抛出異常。

Future Task.run()的執行過程如下。

1)執行在構造函數中指定的任務(Callable.call())。

2)以原子方式來更新同步狀态(調用AQS.compareAndSetState (int expect, int upclate), 設定 state為執行完成狀态RAN,如果這個原子操作成功,就設定代表計算結果的變量result的值為 Callable.call()的傳回值,然後調用AQS.releaseShared(int arg)。

3) AQS.releaseShared (int arg)首先會回調在子類Sync中實作的tryR.eleaseShared (arg)來執 行release操作(設定運作任務的線程runner為null, 然會傳回true) ; AQS.releaseShared (int arg), 然後喚醒線程等待隊列中的第一個線程。

4)調用Future Task. done()。

當執行Future Task. get()方法時,如果FutureTask不是處于執行完成狀态RAN或已取消狀态 CANCELLED, 目前執行線程将到AQS的線程等待隊列中等待(見下圖的線程A、B、C和D)。當 某個線程執行Future Task.run()方法或FutureTask.cancel (…)方法時,會喚醒線程等待隊列的第一 個線程(見圖10-16所示的線程E喚醒線程A)。

java多線程 其他Executor架構與FutureTaskExecutor架構FixedThreadPoolSingleThreadExecutorCachedThreadPoolScheduledThreadPoolExecutor詳解FutureTask詳解

假設開始時FutureTask處于未啟動狀态或已啟動狀态,等待隊列中已經有3個線程(A、B和 C)在等待。此時,線程D執行get()方法将導緻線程D也到等待隊列中去等待。

當線程E執行run()方法時,會喚醒隊列中的第一個線程A。線程A被喚醒後,首先把自己從 隊列中删除,然後喚醒它的後繼線程B, 最後線程A從get()方法傳回。線程B、C和D重複A線程 的處理流程。最終,在隊列中等待的所有線程都被級聯喚醒并從get()方法傳回