注:本文基于dubbo版本v2.6.1
1.介紹
當我們在使用dubbo的時候,是可以通過調整線程池來達到調優的效果,我們可以在
<dubbo:protocol>
标簽中使用用threadpool屬性選擇自己想要使用的線程池,通過threads屬性配置服務線程數,queues屬性配置使用的隊列。例如:
具體其他配置屬性請參考:官方文檔
dubbo線程在dubbo-common子產品的threadpool包下面,我們可以看一下它的包結構
接下來我們就一起看下具體源碼
2.ThreadPool接口
@SPI("fixed")
public interface ThreadPool {
/**
* Thread pool
* @param url URL contains thread parameter
* @return thread pool
*/
@Adaptive({Constants.THREADPOOL_KEY})
Executor getExecutor(URL url);
}
我們可以看到ThreadPool接口是個擴充點,然後預設實作是fixed,然後裡面有個getExecutor方法,被@Adaptive注解修飾。在dubbo中ThreadPool有4個實作類,分别是:
- CachedThreadPool 緩存線程池,超過keepAliveTime時間删除,使用的時候再建立
- FixedThreadPool 固定線程數量線程池,一旦建立,一直持有。
- LimitedThreadPool 可伸縮線程池,線程隻增長不收縮。
- EagerThreadPool 當core線程數忙的時候,建立新線程,而不是将任務放入阻塞隊列。這個使用自己隊列TaskQueue。
3.CachedThreadPool
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
//帶有keepalive的
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
- core:核心線程數,預設是0;
- maxThread:最大線程數,預設是Integer.MAX_VALUE,可以看作是無限大。
- queues:如果queues=0,使用SynchronousQueue,如果是小于0,就是個new LinkedBlockingQueue 隊列,這個隊列大小是Integer.MAX_VALUE,這個檢視LinkedBlockingQueue源碼可以看到。如果是queues大于0 ,就建立queues大小的LinkedBlockingQueue,預設是0 。
-
keepalive:最大空閑時間,預設是 60 * 1000ms ,也就是1分鐘。
我們可以看到CachedThreadPool使用預設參數的話,就會無限建立線程,然後超過空閑時間,線程就會被銷毀,然後再使用的時候,就會再建立。
4.FixedThreadPool
/**
* Creates a thread pool that reuses a fixed number of threads
*
* @see java.util.concurrent.Executors#newFixedThreadPool(int)
*/
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 預設是200個線程
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
- core:核心線程數,預設是200
- maxThreads: 最大線程數。預設是200
- queues:當queues<0,使用一個無限大的LinkedBlockingQueue隊列,當queues>=0 的時候建立queues大小的LinkedBlockingQueue。預設是0,也是建立0大小的LinkedBlockingQueue隊列。
-
keepalive:直接就是0,表示不銷毀。
我們可以看出FixedThreadPool 建立固定大小的線程池,預設是200,期間不會銷毀,使用了FixedThreadPool線程池,keepalive配置也沒有作用。
5.LimitedThreadPool
/**
* Creates a thread pool that creates new threads as needed until limits reaches. This thread pool will not shrink
* automatically.
*
*
* 可以控制基本線程數 與 最大線程數的
*/
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
- core:核心線程數,預設是0
- maxThreads: 最大線程數,預設是200
- queues:當queues=0的時候使用SynchronousQueue,當queues < 0的時候建立一個無限大小LinkedBlockingQueue隊列,當queues>0的時候,建立queues大小的LinkedBlockingQueue隊列。預設是0
-
keepalive:直接就是Long.MAX_VALUE。
我們可以看到keepalive直接是最大的,也就是線程可以增大,但是不會收縮,原因是防止大流量請求過來,還得現建立線程。
6.EagerThreadPool
/**
* EagerThreadPool
* When the core threads are all in busy,
* create new thread instead of putting task into blocking queue.
*/
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
我們可以上面的注釋,當core線程都很忙的時候,建立新線程,而不是将任務放入阻塞隊列。
EagerThreadPool與上面幾個線程池不一樣的地方就是使用了自定義的EagerThreadPoolExecutor與自定義的taskQueue。我們先來看下線程池參數。
- core:核心線程數,預設0
- maxThreads:最大線程數,預設是Integer.MAX_VALUE
- queue: 當queue>0的時候建立大小queue的TaskQueue,queue<=0的時候,就是0大小TaskQueue。預設0
-
keepalive:空閑時間,預設60 * 1000ms,也就是一分鐘。
我們來看下TaskQueue隊列源碼
/**
* TaskQueue in the EagerThreadPoolExecutor
* It offer a task if the executor's submittedTaskCount less than currentPoolThreadSize
* or the currentPoolThreadSize more than executor's maximumPoolSize.
* That can make the executor create new worker
* when the task num is bigger than corePoolSize but less than maximumPoolSize.
*/
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// return false to let executor create new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
我們可以看到TaskQueue 繼承LinkedBlockingQueue隊列,然後重寫了offer方法。我們看下這個方法,首先判斷EagerThreadPoolExecutor對象是否存在,然後判斷線程池目前的任務小于線程池大小,就說明空閑等待的線程,這時候,将任務放入隊列,然後讓線程去處理。如果目前的任務有很多,這時候判斷目前線程數小于最大線程數的時候讓線程池去建立線程,這些條件都不滿足的時候才往隊列裡扔。
我們可以縷縷這個流程,當core有閑着的線程的時候,扔隊列中讓空閑線程處理,沒有空閑線程的時候先建立線程,直到線程數到達最大線程數,這時候才會往隊列裡面扔。我們在源碼中看到getSubmittedTaskCount這個方法,這個方法其實dubbo自定義實作ThreadPoolExecutor來維護的一個計數器。我們可以看下EagerThreadPoolExecutor的源碼。
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* task count
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* @return current tasks which are executed
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
}
}
}
我們可以看到這個類裡面維護了一個AtomicInteger類型的計數器。并重寫了execute方法跟afterExecute方法。當來一個任務的時候,計數器先自增長1,然後任務交給父類處理,處理完會調用afterExecute方法,計數器自減1。父類拒絕的時候,重新往隊列裡offer,沒成功的話計數器自減1,并抛出拒絕政策。中斷異常的時候也是減1,抛出拒絕政策。父類抛出其他異常的時候也都是減1。