1. 關于ThreadPoolExecutor
為了更好地控制多線程,JDK提供了一套Executor架構,幫助開發人員有效的進行線程控制,其本質就是一個線程池。其中ThreadPoolExecutor是線程池中最核心的一個類,後面提到的四種線程池都是基于ThreadPoolExecutor實作的。
ThreadPoolExecutor提供了四個構造方法,我們看下最重要的一個構造函數:
public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
}
函數的參數含義如下:
- corePoolSize: 線程池維護線程的最少數量
- maximumPoolSize:線程池維護線程的最大數量
- keepAliveTime: 線程池維護線程所允許的空閑時間
- unit: 線程池維護線程所允許的空閑時間的機關
- workQueue: 線程池所使用的緩沖隊列
- handler: 線程池對拒絕任務的處理政策
線程池執行的過程:
- 線程池剛建立時,裡面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列裡面有任務,線程池也不會馬上執行它們。
-
當調用 execute() 方法添加一個任務時,線程池會做如下判斷:
a. 如果正在運作的線程數量小于 corePoolSize,那麼馬上建立線程運作這個任務;
b. 如果正在運作的線程數量大于或等于 corePoolSize,那麼将這個任務放入隊列。
c. 如果這時候隊列滿了,而且正在運作的線程數量小于 maximumPoolSize,那麼還是要建立線程運作這個任務;
d. 如果隊列滿了,而且正在運作的線程數量大于或等于 maximumPoolSize,那麼線程池會抛出異常,告訴調用者“我不能再接受任務了”。
- 當一個線程完成任務時,它會從隊列中取下一個任務來執行。
- 當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果目前運作的線程數大于corePoolSize,那麼這個線程就被停掉。是以線程池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
ThreadPoolExecutor的繼承關系:
ThreadPoolExecutor中的隊列:
ThreadPoolExecutor内部應用了任務緩存隊列,即workQueue,它用來存放等待執行的任務。
workQueue的類型為BlockingQueue,通常可以取下面三種類型:
- ArrayBlockingQueue:基于數組的先進先出隊列,此隊列建立時必須指定大小;
- LinkedBlockingQueue:基于連結清單的先進先出隊列,如果建立時沒有指定此隊列大小,則預設為Integer.MAX_VALUE;
- synchronousQueue:這個隊列比較特殊,它不會儲存送出的任務,而是将直接建立一個線程來執行新來的任務。
任務拒絕政策:
當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕政策,通常有以下四種政策:
ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
擴充線程池(記錄任務執行日志):
在預設的ThreadPoolExecutor實作中,提供了空的beforeExecutor和afterExecutor的實作,在實際應用中可以對其進行擴充來實作對線程池運作狀态的追蹤,輸出一些有用的調試資訊,以幫助系統故障診斷,這對于多線程程式錯誤排查是很有幫助的。
ThreadPoolExecutor例子:
import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicLong;public class ThreadPool { private int corePoolSize = 1; // 線程池維護線程的最少數量
private int maximumPoolSize = 10;// 線程池維護線程的最大數量
private long keepAliveTime = 3; // 線程池維護線程所允許的空閑時間
private TimeUnit unit = TimeUnit.SECONDS;// 線程池維護線程所允許的空閑時間的機關
private BlockingQueue<Runnable> workQueue; // 線程池所使用的緩沖隊列
private RejectedExecutionHandler handler; // 線程池對拒絕任務的處理政策
private static AtomicLong along = new AtomicLong(0); public void run() throws InterruptedException {
ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.DiscardOldestPolicy()) { // 線程執行之前運作
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("...............beforeExecute");
} // 線程執行之後運作
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("...............afterExecute");
} // 整個線程池停止之後
protected void terminated() {
System.out.println("...............thread stop");
}
}; for (int i = 1; i <= 10; i++) {
pool.execute(new ThreadPoolTask(i, along));
} for (int i = 1; i <= 10; i++) {
pool.execute(new ThreadPoolTask(-i, along));
}
pool.shutdown();
Thread.sleep(25000);
System.out.println(along.get());
} public static void main(String[] args) { try { new ThreadPool().run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}class ThreadPoolTask implements Runnable { private int i = 0; private AtomicLong along; ThreadPoolTask(int i, AtomicLong along) { this.i = i; this.along = along;
}
@Override
public void run() { try { // 模拟業務邏輯
Thread.sleep(1000);
along.addAndGet(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
我們可以利用這個特性實作線上程池中列印出異常堆棧資訊(正常是不會列印出來的),這裡就不示範了。
2. 關于Executors提供的四種線程池
Executors 提供了一系列工廠方法用于創先線程池,傳回的線程池都實作了 ExecutorService 接口。
public static ExecutorService newFixedThreadPool(int nThreads)建立固定數目線程的線程池。public static ExecutorService newCachedThreadPool()建立一個可緩存的線程池,調用execute将重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則建立一個新線 程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。public static ExecutorService newSingleThreadExecutor()建立一個單線程化的Executor。public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)建立一個支援定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。
這四種方法都是用的 Executors 中的 ThreadFactory 建立的線程。
newCachedThreadPool()
- 緩存型池子,先檢視池中有沒有以前建立的線程,如果有,就 reuse 如果沒有,就建一個新的線程加入池中
- 緩存型池子通常用于執行一些生存期很短的異步型任務 是以在一些面向連接配接的 daemon 型 SERVER 中用得不多。但對于生存期短的異步任務,它是 Executor 的首選。
- 能 reuse 的線程,必須是 timeout IDLE 内的池中線程,預設 timeout 是 60s,超過這個 IDLE 時長,線程執行個體将被終止及移出池。
newFixedThreadPool(int)
- newFixedThreadPool 與 cacheThreadPool 差不多,也是能 reuse 就用,但不能随時建新的線程。
- 其獨特之處:任意時間點,最多隻能有固定數目的活動線程存在,此時如果有新的線程要建立,隻能放在另外的隊列中等待,直到目前的線程中某個線程終止直接被移出池子。
- 和 cacheThreadPool 不同,FixedThreadPool 沒有 IDLE 機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的 TCP 或 UDP IDLE 機制之類的),是以 FixedThreadPool 多數針對一些很穩定很固定的正規并發線程,多用于伺服器。
- 從方法的源代碼看,cache池和fixed 池調用的是同一個底層 池,隻不過參數不同:
- fixed 池線程數固定,并且是0秒IDLE(無IDLE)。
- cache 池線程數支援 0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60 秒 IDLE 。
newScheduledThreadPool(int)
- 排程型線程池
- 這個池子裡的線程可以按 schedule 依次 delay 執行,或周期執行
SingleThreadExecutor()
- 單例線程,任意時間池中隻能有一個線程
- 用的是和 cache 池和 fixed 池相同的底層池,但線程數目是 1-1,0 秒 IDLE(無 IDLE)
一般來說,CachedTheadPool 在程式執行過程中通常會建立與所需數量相同的線程,然後在它回收舊線程時停止建立新線程,是以它是合理的 Executor 的首選,隻有當這種方式會引發問題時(比如需要大量長時間面向連接配接的線程時),才需要考慮用 FixedThreadPool。
----《Thinking in Java》第四版
以上引用自極客學院,總結的太精彩了。
3. Spring中的線程池管理
Spring的TaskExecutor接口等同于java.util.concurrent.Executor接口。 實際上,它存在的主要原因是為了在使用線程池的時候,将對Java 5的依賴抽象出來。 這個接口隻有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初建立TaskExecutor是為了在需要時給其他Spring元件提供一個線程池的抽象。例如ApplicationEventMulticaster元件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 當然,如果你的bean需要線程池行為,你也可以使用這個抽象層。
介紹下使用比較多的ThreadPoolTaskExecutor 類,這個實作隻能在Java 5以上環境使用(現在應該沒有低于1.5的老環境了吧~),它暴露的bean properties可以用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個TaskExecutor中。
spring中ThreadPoolTaskExecutor最常用方式就是做為BEAN注入到容器中,其暴露的各個屬性其實是ThreadPoolExecutor的屬性,而且這展現了DI容器的優勢:
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="2"/>
<property name="keepAliveSeconds" value="200"/>
<property name="maxPoolSize" value="10"/>
<property name="queueCapacity" value="60"/> </bean>
4. 優化線程池線程數量
線程池的理想大小取決于被送出任務的類型以及所部署系統的特性。在代碼中不會固定線程池的大小,而應該通過某種配置機制來來提供,或者根據Runtime.getRuntime().availableProcessors()來動态計算。
如果一台伺服器上隻部署這一個應用并且隻有一個線程池(N為CPU總核數):
- 如果是CPU密集型應用,則線程池大小設定為N+1
- 如果是IO密集型應用,則線程池大小設定為2N+1
線程等待時間所占比例越高,需要越多線程。線程CPU時間所占比例越高,需要越少線程。
【黃金公式】最佳線程數目 = (線程等待時間與線程CPU時間之比 + 1)* CPU數目
一個實際的計算過程(慕課網):
假設值
- tasks :每秒的任務數,假設為500~1000
- taskcost:每個任務花費時間,假設為0.1s
- responsetime:系統允許容忍的最大響應時間,假設為1s
計算
- corePoolSize = 每秒需要多少個線程處理?
threadcount = tasks/(1/taskcost) =taskstaskcout = (500~1000)0.1 = 50~100 個線程。corePoolSize設定應該大于50
根據8020原則,如果80%的每秒任務數小于800,那麼corePoolSize設定為80即可
- queueCapacity = (coreSizePool/taskcost)*responsetime
計算可得 queueCapacity = 80/0.1*1 = 80。意思是隊列裡的線程可以等待1s,超過了的需要新開線程來執行
切記不能設定為Integer.MAX_VALUE,這樣隊列會很大,線程數隻會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會随之陡增。
- maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
計算可得 maxPoolSize = (1000-80)/10 = 92
(最大任務數-隊列容量)/每個線程每秒處理能力 = 最大線程數