代碼的世界和現實的世界還真是像,解決辦法也真一樣。
最近遇到個交通阻塞的問題。
場景:
服務A 提供了兩個接口,get和delete
問題:
兩個接口共享一個出口(如同時最多有20個連接配接),當其中一個接口響應慢的時候,會把幾乎全部的出口帶寬占完,這樣會導緻原本正常的get接口得不到響應,用戶端在調用的時候,總是連接配接不上。
看起來其實有點像家用的路由器似的,3個人共享上網,當其中一個人占用了很大的帶寬的時候,會導緻另外連個人不能正常上網。
解決辦法
也有點像路由器似的,可以根據不同的ip進行限制帶寬。
服務A 也可以根據不同的接口限制多少的最大連接配接數
代碼還原
由于項目用到了netty,裡面有個executionhandler的概念,大意也就是用異步的方式(線程池)執行比較耗時的業務代碼。
executeThreadPool = new ThreadPoolExecutor(minThreadPoolSize,//8
maxThreadPoolSize, 0L, TimeUnit.MILLISECONDS, // 200
new LinkedBlockingQueue<Runnable>(maxQueueBlockSize)); //1000
初步看起來感覺沒什麼問題,但真正運作起來後,會發現當并發數達到 20 個時候,發現進入業務代碼裡面的線程也就隻有8個,而其餘的12個是放在了queue裡面了,等待核心線程去執行。
這就是問題所在,和我們的初衷并不一樣(最多啟動200個線程去執行客戶的請求,多的時候在放到隊列裡面等待執行)
還是繼續跟到ThreadPoolExecutor裡面看吧
之前簡單的寫過
http://blog.csdn.net/chaofanwei/article/details/17286619
先看以下代碼,感覺sun的工程師搞的挺好玩的,也算是腦洞大開啊
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //536870911 00011111111111111111111111111111
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //-536870912 11100000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS; //0 00000000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS; //536870912 00100000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS; //1073741824 01000000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS; //1610612736 01100000000000000000000000000000
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } //根據int c傳回表示的狀态
private static int workerCountOf(int c) { return c & CAPACITY; } //根據int c傳回表示的線程個數
private static int ctlOf(int rs, int wc) { return rs | wc; } // 用指定的狀态和線程個數初始化 int c
目的就是用一個int類型的數,前面3位用來标示目前線程池的狀态(RUNNING,SHUTDOWN ...),後面的29位表示目前線程池裡面的線程個數。
再來看核心的接口execute方法内部實作
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //如果目前線程數小于core線程數,則直接啟動新的線程并把command當做線程的第一個任務來執行
if (addWorker(command, true)) //啟動新線程,成功的話,直接傳回
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //①
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0) //沒有線程的話,則新啟動一個線程
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
上面代碼①意思
試圖把command塞到隊列裡面,看是否能賽成功,這個地方根據queue的不同又分為兩種情況,LinkedBlockingQueue和SynchronousQueue
LinkedBlockingQueue
應用場景就是
Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory){
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
這種情況下,如果隊列沒有滿的話,則直接塞到裡面就傳回true了,但并不會啟動新的線程去執行此task,而是等待别的線程空閑下來時候再從隊列裡面取出來執行。
隻有當此隊列滿了,offer傳回失敗,才會走到下面的else語句塊裡面,試圖啟動新的線程去執行。
一句話總結也就是優先啟動corePoolSize個線程,然後多的任務放到隊列裡面等待corePoolSize的線程去執行,隻有當隊列滿的時候,才會啟動最多maximumPoolSize的線程去執行任務。
SynchronousQueue
應用場景就是
Executors.newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
SynchronousQueue用一句話來說就是相當于隻能裝一個entry的阻塞隊列,也相當于一個transfer,即隻有把offer進來的entry給傳遞到另外一個線程的時候才傳回true(即如果剛好有另外一個線程在等待接收的時候才傳回true)
這種情況下就是,如果offer成功了,就表示已經有另外一個線程在執行了,如果傳回失敗,表示所有的線程都在忙碌,就走到了下面的else語句塊裡面,嘗試(總是成功)啟動新的線程去執行此任務。
代碼裡面的坑太多,需要一個一個去填平!