天天看點

線程池之深度了解-ThreadPoolExecutor

代碼的世界和現實的世界還真是像,解決辦法也真一樣。

最近遇到個交通阻塞的問題。

場景:

服務A 提供了兩個接口,get和delete

問題:

兩個接口共享一個出口(如同時最多有20個連接配接),當其中一個接口響應慢的時候,會把幾乎全部的出口帶寬占完,這樣會導緻原本正常的get接口得不到響應,用戶端在調用的時候,總是連接配接不上。

看起來其實有點像家用的路由器似的,3個人共享上網,當其中一個人占用了很大的帶寬的時候,會導緻另外連個人不能正常上網。

解決辦法

也有點像路由器似的,可以根據不同的ip進行限制帶寬。

服務A 也可以根據不同的接口限制多少的最大連接配接數

代碼還原

由于項目用到了netty,裡面有個executionhandler的概念,大意也就是用異步的方式(線程池)執行比較耗時的業務代碼。

executeThreadPool = new ThreadPoolExecutor(minThreadPoolSize,//8

maxThreadPoolSize, 0L, TimeUnit.MILLISECONDS, // 200

new LinkedBlockingQueue<Runnable>(maxQueueBlockSize)); //1000

初步看起來感覺沒什麼問題,但真正運作起來後,會發現當并發數達到 20 個時候,發現進入業務代碼裡面的線程也就隻有8個,而其餘的12個是放在了queue裡面了,等待核心線程去執行。

這就是問題所在,和我們的初衷并不一樣(最多啟動200個線程去執行客戶的請求,多的時候在放到隊列裡面等待執行)

還是繼續跟到ThreadPoolExecutor裡面看吧

之前簡單的寫過

http://blog.csdn.net/chaofanwei/article/details/17286619

先看以下代碼,感覺sun的工程師搞的挺好玩的,也算是腦洞大開啊

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;   //29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //536870911   00011111111111111111111111111111


    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; //-536870912   11100000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS; //0   00000000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS; //536870912   00100000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS; //1073741824   01000000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS; //1610612736   01100000000000000000000000000000


    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; } //根據int c傳回表示的狀态
    private static int workerCountOf(int c)  { return c & CAPACITY; }  //根據int c傳回表示的線程個數
    private static int ctlOf(int rs, int wc) { return rs | wc; }  // 用指定的狀态和線程個數初始化 int  c
           

目的就是用一個int類型的數,前面3位用來标示目前線程池的狀态(RUNNING,SHUTDOWN ...),後面的29位表示目前線程池裡面的線程個數。

再來看核心的接口execute方法内部實作

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //如果目前線程數小于core線程數,則直接啟動新的線程并把command當做線程的第一個任務來執行
            if (addWorker(command, true)) //啟動新線程,成功的話,直接傳回
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //①
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0) //沒有線程的話,則新啟動一個線程
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
           

上面代碼①意思

試圖把command塞到隊列裡面,看是否能賽成功,這個地方根據queue的不同又分為兩種情況,LinkedBlockingQueue和SynchronousQueue

LinkedBlockingQueue

應用場景就是

Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory){

 return new ThreadPoolExecutor(nThreads, nThreads,

                                      0L, TimeUnit.MILLISECONDS,

                                      new LinkedBlockingQueue<Runnable>(),

                                      threadFactory);

}

這種情況下,如果隊列沒有滿的話,則直接塞到裡面就傳回true了,但并不會啟動新的線程去執行此task,而是等待别的線程空閑下來時候再從隊列裡面取出來執行。

隻有當此隊列滿了,offer傳回失敗,才會走到下面的else語句塊裡面,試圖啟動新的線程去執行。

一句話總結也就是優先啟動corePoolSize個線程,然後多的任務放到隊列裡面等待corePoolSize的線程去執行,隻有當隊列滿的時候,才會啟動最多maximumPoolSize的線程去執行任務。

SynchronousQueue

應用場景就是

Executors.newCachedThreadPool() {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                      60L, TimeUnit.SECONDS,

                                      new SynchronousQueue<Runnable>());

    }

SynchronousQueue用一句話來說就是相當于隻能裝一個entry的阻塞隊列,也相當于一個transfer,即隻有把offer進來的entry給傳遞到另外一個線程的時候才傳回true(即如果剛好有另外一個線程在等待接收的時候才傳回true)

這種情況下就是,如果offer成功了,就表示已經有另外一個線程在執行了,如果傳回失敗,表示所有的線程都在忙碌,就走到了下面的else語句塊裡面,嘗試(總是成功)啟動新的線程去執行此任務。

代碼裡面的坑太多,需要一個一個去填平!

繼續閱讀