天天看點

深度解析dubbo線程池技術

注:本文基于dubbo版本v2.6.1

1.介紹

當我們在使用dubbo的時候,是可以通過調整線程池來達到調優的效果,我們可以在

<dubbo:protocol>

标簽中使用用threadpool屬性選擇自己想要使用的線程池,通過threads屬性配置服務線程數,queues屬性配置使用的隊列。例如:

具體其他配置屬性請參考:官方文檔

dubbo線程在dubbo-common子產品的threadpool包下面,我們可以看一下它的包結構

深度解析dubbo線程池技術

接下來我們就一起看下具體源碼

2.ThreadPool接口

@SPI("fixed")
public interface ThreadPool {
    /**
     * Thread pool
     * @param url URL contains thread parameter
     * @return thread pool
     */
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);

}
           

我們可以看到ThreadPool接口是個擴充點,然後預設實作是fixed,然後裡面有個getExecutor方法,被@Adaptive注解修飾。在dubbo中ThreadPool有4個實作類,分别是:

深度解析dubbo線程池技術
  1. CachedThreadPool 緩存線程池,超過keepAliveTime時間删除,使用的時候再建立
  2. FixedThreadPool 固定線程數量線程池,一旦建立,一直持有。
  3. LimitedThreadPool 可伸縮線程池,線程隻增長不收縮。
  4. EagerThreadPool 當core線程數忙的時候,建立新線程,而不是将任務放入阻塞隊列。這個使用自己隊列TaskQueue。

3.CachedThreadPool

public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        //帶有keepalive的
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
           
  1. core:核心線程數,預設是0;
  2. maxThread:最大線程數,預設是Integer.MAX_VALUE,可以看作是無限大。
  3. queues:如果queues=0,使用SynchronousQueue,如果是小于0,就是個new LinkedBlockingQueue 隊列,這個隊列大小是Integer.MAX_VALUE,這個檢視LinkedBlockingQueue源碼可以看到。如果是queues大于0 ,就建立queues大小的LinkedBlockingQueue,預設是0 。
  4. keepalive:最大空閑時間,預設是 60 * 1000ms ,也就是1分鐘。

    我們可以看到CachedThreadPool使用預設參數的話,就會無限建立線程,然後超過空閑時間,線程就會被銷毀,然後再使用的時候,就會再建立。

4.FixedThreadPool

/**
 * Creates a thread pool that reuses a fixed number of threads
 *
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 */
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 預設是200個線程
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

           
  1. core:核心線程數,預設是200
  2. maxThreads: 最大線程數。預設是200
  3. queues:當queues<0,使用一個無限大的LinkedBlockingQueue隊列,當queues>=0 的時候建立queues大小的LinkedBlockingQueue。預設是0,也是建立0大小的LinkedBlockingQueue隊列。
  4. keepalive:直接就是0,表示不銷毀。

    我們可以看出FixedThreadPool 建立固定大小的線程池,預設是200,期間不會銷毀,使用了FixedThreadPool線程池,keepalive配置也沒有作用。

5.LimitedThreadPool

/**
 * Creates a thread pool that creates new threads as needed until limits reaches. This thread pool will not shrink
 * automatically.
 *
 *
 * 可以控制基本線程數  與 最大線程數的
 */
public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);


        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

           
  1. core:核心線程數,預設是0
  2. maxThreads: 最大線程數,預設是200
  3. queues:當queues=0的時候使用SynchronousQueue,當queues < 0的時候建立一個無限大小LinkedBlockingQueue隊列,當queues>0的時候,建立queues大小的LinkedBlockingQueue隊列。預設是0
  4. keepalive:直接就是Long.MAX_VALUE。

    我們可以看到keepalive直接是最大的,也就是線程可以增大,但是不會收縮,原因是防止大流量請求過來,還得現建立線程。

6.EagerThreadPool

/**
 * EagerThreadPool
 * When the core threads are all in busy,
 * create new thread instead of putting task into blocking queue.
 */
public class EagerThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}

           

我們可以上面的注釋,當core線程都很忙的時候,建立新線程,而不是将任務放入阻塞隊列。

EagerThreadPool與上面幾個線程池不一樣的地方就是使用了自定義的EagerThreadPoolExecutor與自定義的taskQueue。我們先來看下線程池參數。

  1. core:核心線程數,預設0
  2. maxThreads:最大線程數,預設是Integer.MAX_VALUE
  3. queue: 當queue>0的時候建立大小queue的TaskQueue,queue<=0的時候,就是0大小TaskQueue。預設0
  4. keepalive:空閑時間,預設60 * 1000ms,也就是一分鐘。

    我們來看下TaskQueue隊列源碼

/**
 * TaskQueue in the EagerThreadPoolExecutor
 * It offer a task if the executor's submittedTaskCount less than currentPoolThreadSize
 * or the currentPoolThreadSize more than executor's maximumPoolSize.
 * That can make the executor create new worker
 * when the task num is bigger than corePoolSize but less than maximumPoolSize.
 */
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

    /**
     * retry offer task
     *
     * @param o task
     * @return offer success or not
     * @throws RejectedExecutionException if executor is terminated.
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}

           

我們可以看到TaskQueue 繼承LinkedBlockingQueue隊列,然後重寫了offer方法。我們看下這個方法,首先判斷EagerThreadPoolExecutor對象是否存在,然後判斷線程池目前的任務小于線程池大小,就說明空閑等待的線程,這時候,将任務放入隊列,然後讓線程去處理。如果目前的任務有很多,這時候判斷目前線程數小于最大線程數的時候讓線程池去建立線程,這些條件都不滿足的時候才往隊列裡扔。

我們可以縷縷這個流程,當core有閑着的線程的時候,扔隊列中讓空閑線程處理,沒有空閑線程的時候先建立線程,直到線程數到達最大線程數,這時候才會往隊列裡面扔。我們在源碼中看到getSubmittedTaskCount這個方法,這個方法其實dubbo自定義實作ThreadPoolExecutor來維護的一個計數器。我們可以看下EagerThreadPoolExecutor的源碼。

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
        }
    }
}

           

我們可以看到這個類裡面維護了一個AtomicInteger類型的計數器。并重寫了execute方法跟afterExecute方法。當來一個任務的時候,計數器先自增長1,然後任務交給父類處理,處理完會調用afterExecute方法,計數器自減1。父類拒絕的時候,重新往隊列裡offer,沒成功的話計數器自減1,并抛出拒絕政策。中斷異常的時候也是減1,抛出拒絕政策。父類抛出其他異常的時候也都是減1。

繼續閱讀