前言
談到java的線程池最熟悉的莫過于ExecutorService接口了,jdk1.5新增的java.util.concurrent包下的這個api,大大的簡化了多線程代碼的開發。而不論你用FixedThreadPool還是CachedThreadPool其背後實作都是ThreadPoolExecutor。
ThreadPoolExecutor是一個典型的緩存池化設計的産物,因為池子有大小,當池子體積不夠承載時,就涉及到拒絕政策。JDK中已經預設了4種線程池拒絕政策,下面結合場景詳細聊聊這些政策的使用場景,以及我們還能擴充哪些拒絕政策。
池化設計思想
池話設計應該不是一個新名詞。我們常見的如java線程池、jdbc連接配接池、redis連接配接池等就是這類設計的代表實作。
這種設計會初始預設資源,解決的問題就是抵消每次擷取資源的消耗,如建立線程的開銷,擷取遠端連接配接的開銷等。就好比你去食堂打飯,打飯的大媽會先把飯盛好幾份放那裡,你來了就直接拿着飯盒加菜即可,不用再臨時又盛飯又打菜,效率就高了。
除了初始化資源,池化設計還包括如下這些特征:池子的初始值、池子的活躍值、池子的最大值等,這些特征可以直接映射到java線程池和資料庫連接配接池的成員屬性中。推薦閱讀:教你如何監控 Java 線程池運作狀态。
線程池觸發拒絕政策的時機
和資料源連接配接池不一樣,線程池除了初始大小和池子最大值,還多了一個阻塞隊列來緩沖。
資料源連接配接池一般請求的連接配接數超過連接配接池的最大值的時候就會觸發拒絕政策,政策一般是阻塞等待設定的時間或者直接抛異常。
而線程池的觸發時機如下圖:
如圖,想要了解線程池什麼時候觸發拒絕粗略,需要明确上面三個參數的具體含義,是這三個參數總體協調的結果,而不是簡單的超過最大線程數就會觸發線程拒絕粗略,當送出的任務數大于corePoolSize時,會優先放到隊列緩沖區,隻有填滿了緩沖區後,才會判斷目前運作的任務是否大于maxPoolSize,小于時會建立線程處理。大于時就觸發了拒絕政策,總結就是:目前送出任務數大于(maxPoolSize + queueCapacity)時就會觸發線程池的拒絕政策了。推薦閱讀:java進階應用:線程池全面解析。
JDK内置4種線程池拒絕政策
拒絕政策接口定義
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
接口定義很明确,當觸發拒絕政策時,線程池會調用你設定的具體的政策,将目前送出的任務以及線程池執行個體本身傳遞給你處理,具體作何處理,不同場景會有不同的考慮,下面看JDK為我們内置了哪些實作:
CallerRunsPolicy(調用者運作政策)
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
功能:當觸發拒絕政策時,隻要線程池沒有關閉,就由送出任務的目前線程處理。
使用場景:一般在不允許失敗的、對性能要求不高、并發量較小的場景下使用,因為線程池一般情況下不會關閉,也就是送出的任務一定會被運作,但是由于是調用者線程自己執行的,當多次送出任務時,就會阻塞後續任務執行,性能和效率自然就慢了。
AbortPolicy(中止政策)
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
功能:當觸發拒絕政策時,直接抛出拒絕執行的異常,中止政策的意思也就是打斷目前執行流程
使用場景:這個就沒有特殊的場景了,但是一點要正确處理抛出的異常。
ThreadPoolExecutor中預設的政策就是AbortPolicy,ExecutorService接口的系列ThreadPoolExecutor因為都沒有顯示的設定拒絕政策,是以預設的都是這個。但是請注意,ExecutorService中的線程池執行個體隊列都是無界的,也就是說把記憶體撐爆了都不會觸發拒絕政策。當自己自定義線程池執行個體時,使用這個政策一定要處理好觸發政策時抛的異常,因為他會打斷目前的執行流程。
DiscardPolicy(丢棄政策)
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
功能:直接靜悄悄的丢棄這個任務,不觸發任何動作
使用場景:如果你送出的任務無關緊要,你就可以使用它 。因為它就是個空實作,會悄無聲息的吞噬你的的任務。是以這個政策基本上不用了
DiscardOldestPolicy(棄老政策)
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
功能:如果線程池未關閉,就彈出隊列頭部的元素,然後嘗試執行
使用場景:這個政策還是會丢棄任務,丢棄時也是毫無聲息,但是特點是丢棄的是老的未執行的任務,而且是待執行優先級較高的任務。基于這個特性,我能想到的場景就是,釋出消息,和修改消息,當消息釋出出去後,還未執行,此時更新的消息又來了,這個時候未執行的消息的版本比現在送出的消息版本要低就可以被丢棄了。因為隊列中還有可能存在消息版本更低的消息會排隊執行,是以在真正處理消息的時候一定要做好消息的版本比較。
第三方實作的拒絕政策
dubbo中的線程拒絕政策
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
//省略實作
}
}
可以看到,當dubbo的工作線程觸發了線程拒絕後,主要做了三個事情,原則就是盡量讓使用者清楚觸發線程拒絕政策的真實原因。
1)輸出了一條警告級别的日志,日志内容為線程池的詳細設定參數,以及線程池目前的狀态,還有目前拒絕任務的一些詳細資訊。可以說,這條日志,使用dubbo的有過生産運維經驗的或多或少是見過的,這個日志簡直就是日志列印的典範,其他的日志列印的典範還有spring。得益于這麼詳細的日志,可以很容易定位到問題所在
2)輸出目前線程堆棧詳情,這個太有用了,當你通過上面的日志資訊還不能定位問題時,案發現場的dump線程上下文資訊就是你發現問題的救命稻草。
3)繼續抛出拒絕執行異常,使本次任務失敗,這個繼承了JDK預設拒絕政策的特性
Netty中的線程池拒絕政策
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
Netty中的實作很像JDK中的CallerRunsPolicy,舍不得丢棄任務。不同的是,CallerRunsPolicy是直接在調用者線程執行的任務。而 Netty是建立了一個線程來處理的。
是以,Netty的實作相較于調用者執行政策的使用面就可以擴充到支援高效率高性能的場景了。但是也要注意一點,Netty的實作裡,在建立線程時未做任何的判斷限制,也就是說隻要系統還有資源就會建立新的線程來處理,直到new不出新的線程了,才會抛建立線程失敗的異常
activeMq中的線程池拒絕政策
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
});
activeMq中的政策屬于最大努力執行任務型,當觸發拒絕政策時,在嘗試一分鐘的時間重新将任務塞進任務隊列,當一分鐘逾時還沒成功時,就抛出異常
pinpoint中的線程池拒絕政策
public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {
private final RejectedExecutionHandler[] handlerChain;
public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {
Objects.requireNonNull(chain, "handlerChain must not be null");
RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);
return new RejectedExecutionHandlerChain(handlerChain);
}
private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {
this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {
rejectedExecutionHandler.rejectedExecution(r, executor);
}
}
}
pinpoint的拒絕政策實作很有特點,和其他的實作都不同。他定義了一個拒絕政策鍊,包裝了一個拒絕政策清單,當觸發拒絕政策時,會将政策鍊中的rejectedExecution依次執行一遍。
結語
前文從線程池設計思想,以及線程池觸發拒絕政策的時機引出java線程池拒絕政策接口的定義。并輔以JDK内置4種以及四個第三方開源軟體的拒絕政策定義描述了線程池拒絕政策實作的各種思路和使用場景。
希望閱讀此文後能讓你對java線程池拒絕政策有更加深刻的認識,能夠根據不同的使用場景更加靈活的應用。