死磕Tomcat系列(2)——EndPoint源碼解析
在上一節中我們描述了Tomcat的整體架構,我們知道了Tomcat分為兩個大元件,一個連接配接器和一個容器。而我們這次要講的
EndPoint
的元件就是屬于連接配接器裡面的。它是一個通信的端點,就是負責對外實作TCP/IP協定。
EndPoint
是個接口,它的具體實作類就是
AbstractEndpoint
,而
AbstractEndpoint
具體的實作類就有
AprEndpoint
、
Nio2Endpoint
NioEndpoint
。
-
:對應的是APR模式,簡單了解就是從作業系統級别解決異步IO的問題,大幅度提高伺服器的處理和響應性能。但是啟用這種模式需要安裝一些其他的依賴庫。AprEndpoint
-
:利用代碼來實作異步IONio2Endpoint
-
:利用了JAVA的NIO實作了非阻塞IO,Tomcat預設啟動是以這個來啟動的,而這個也是我們的講述重點。NioEndpoint
NioEndpoint中重要的元件
我們知道
NioEndpoint
的原理還是對于Linux的多路複用器的使用,而在多路複用器中簡單來說就兩個步驟。
- 建立一個Selector,在它身上注冊各種Channel,然後調用select方法,等待通道中有感興趣的事件發生。
- 如果有感興趣的事情發生了,例如是讀事件,那麼就将資訊從通道中讀取出來。
而
NioEndpoint
為了實作上面這兩步,用了五個元件來。這五個元件是
LimitLatch
Acceptor
Poller
SocketProcessor
Executor
/**
* Threads used to accept new connections and pass them to worker threads.
*/
protected List<Acceptor<U>> acceptors;
/**
* counter for nr of connections handled by an endpoint
*/
private volatile LimitLatch connectionLimitLatch = null;
/**
* The socket pollers.
*/
private Poller[] pollers = null;
内部類
SocketProcessor
/**
* External Executor based thread pool.
*/
private Executor executor = null;
我們可以看到在代碼中定義的這五個元件。具體這五個元件是幹嘛的呢?
-
:連接配接控制器,負責控制最大的連接配接數LimitLatch
-
:負責接收新的連接配接,然後傳回一個Acceptor
對象給Channel
Poller
-
:可以将其看成是NIO中Poller
,負責監控Selector
的狀态Channel
-
:可以看成是一個被封裝的任務類SocketProcessor
-
:Tomcat自己擴充的線程池,用來執行任務類Executor
用圖簡單表示就是以下的關系
接下來我們就來分别的看一下每個元件裡面關鍵的代碼
LimitLatch
我們上面說了
LimitLatch
主要是用來控制Tomcat所能接收的最大數量連接配接,如果超過了此連接配接,那麼Tomcat就會将此連接配接線程阻塞等待,等裡面有其他連接配接釋放了再消費此連接配接。那麼
LimitLatch
是如何做到呢?我們可以看
LimitLatch
這個類
public class LimitLatch {
private static final Log log = LogFactory.getLog(LimitLatch.class);
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
protected int tryAcquireShared(int ignored) {
long newCount = count.incrementAndGet();
if (!released && newCount > limit) {
// Limit exceeded
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
private final Sync sync;
//目前連接配接數
private final AtomicLong count;
//最大連接配接數
private volatile long limit;
private volatile boolean released = false;
}
我們可以看到它内部實作了
AbstractQueuedSynchronizer
,AQS其實就是一個架構,實作它的類可以自定義控制線程什麼時候挂起什麼時候釋放。
limit
參數就是控制的最大連接配接數。我們可以看到
AbstractEndpoint
調用
LimitLatch
的
countUpOrAwait
方法來判斷是否能擷取連接配接。
public void countUpOrAwait() throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
}
sync.acquireSharedInterruptibly(1);
}
AQS是如何知道什麼時候阻塞線程呢?即不能擷取連接配接呢?這些就靠使用者自己實作
AbstractQueuedSynchronizer
自己來定義什麼時候擷取連接配接,什麼時候釋放連接配接了。可以看到Sync類重寫了
tryAcquireShared
和
tryReleaseShared
方法。在
tryAcquireShared
方法中定義了一旦目前連接配接數大于了設定的最大連接配接數,那麼就會傳回
-1
表示将此線程放入AQS隊列中等待。
Acceptor
Acceptor
是接收連接配接的,我們可以看到
Acceptor
實作了
Runnable
接口,那麼在哪會新開啟線程來執行
Acceptor
的run方法呢?在
AbstractEndpoint
startAcceptorThreads
方法中。
protected void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
可以看到這裡可以設定開啟幾個
Acceptor
,預設是一個。而一個端口隻能對應一個
ServerSocketChannel
,那麼這個
ServerSocketChannel
在哪初始化呢?我們可以看到在
Acceptor<U> acceptor = new Acceptor<>(this);
這句話中傳入了this進去,那麼應該是由
Endpoint
元件初始化的連接配接。在
NioEndpoint
initServerSocket
方法中初始化了連接配接。
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
serverSock.configureBlocking(true); //mimic APR behavior
}
這裡面我們能夠看到兩點
- 在bind方法中的第二個參數表示作業系統的等待隊列長度,即Tomcat不再接受連接配接時(達到了設定的最大連接配接數),但是在作業系統層面還是能夠接受連接配接的,此時就将此連接配接資訊放入等待隊列,那麼這個隊列的大小就是此參數設定的。
-
被設定成了阻塞的模式,也就是說是以阻塞方式接受連接配接的。或許會有疑問。在平時的NIO程式設計中Channel不是都要設定成非阻塞模式嗎?這裡解釋一下,如果是設定成非阻塞模式那麼就必須設定一個ServerSocketChannel
不斷的輪詢,但是接受連接配接隻需要阻塞一個通道即可。Selector
這裡需要注意一點,每個
Acceptor
在生成
PollerEvent
對象放入
Poller
隊列中時都是随機取出
Poller
對象的,具體代碼可以看如下,是以
Poller
中的
Queue
對象設定成了
SynchronizedQueue<PollerEvent>
,因為可能有多個
Acceptor
同時向此
Poller
的隊列中放入
PollerEvent
對象。
public Poller getPoller0() {
if (pollerThreadCount == 1) {
return pollers[0];
} else {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
}
什麼是作業系統級别的連接配接呢?在TCP的三次握手中,系統通常會每一個LISTEN狀态的Socket維護兩個隊列,一個是半連接配接隊列(SYN):這些連接配接已經收到用戶端SYN;另一個是全連接配接隊列(ACCEPT):這些連結已經收到用戶端的ACK,完成了三次握手,等待被應用調用accept方法取走使用。
所有的
Acceptor
共用這一個連接配接,在
Acceptor
run
方法中,放一些重要的代碼。
@Override
public void run() {
// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
try {
//如果到了最大連接配接數,線程等待
endpoint.countUpOrAwaitConnection();
U socket = null;
try {
//調用accept方法獲得一個連接配接
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// 出異常以後目前連接配接數減掉1
endpoint.countDownConnection();
}
// 配置Socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
}
裡面我們可以得到兩點
- 運作時會先判斷是否到達了最大連接配接數,如果到達了那麼就阻塞線程等待,裡面調用的就是
元件判斷的。LimitLatch
- 最重要的就是配置socket這一步了,是
這段代碼endpoint.setSocketOptions(socket)
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
// 設定Socket為非阻塞模式,供Poller調用
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//注冊ChannelEvent,其實是将ChannelEvent放入到隊列中,然後Poller從隊列中取
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
其實裡面重要的就是将
Acceptor
與一個
Poller
綁定起來,然後兩個元件通過隊列通信,每個Poller都維護着一個
SynchronizedQueue
隊列,
ChannelEvent
放入到隊列中,然後
Poller
從隊列中取出事件進行消費。
Poller
我們可以看到
Poller
是
NioEndpoint
的内部類,而它也是實作了
Runnable
接口,可以看到在其類中維護了一個Quene和Selector,定義如下。是以本質上
Poller
就是
Selector
private Selector selector;
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
重點在其run方法中,這裡删減了一些代碼,隻展示重要的。
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
//檢視是否有連接配接進來,如果有就将Channel注冊進Selector中
hasEvents = events();
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
其中主要的就是調用了
events()
方法,就是不斷的檢視隊列中是否有
Pollerevent
事件,如果有的話就将其取出然後把裡面的
Channel
取出來注冊到該
Selector
中,然後不斷輪詢所有注冊過的
Channel
檢視是否有事件發生。
SocketProcessor
Poller
在輪詢
Channel
有事件發生時,就會調用将此事件封裝起來,然後交給線程池去執行。那麼這個包裝類就是
SocketProcessor
。而我們打開此類,能夠看到它也實作了
Runnable
接口,用來定義線程池
Executor
中線程所執行的任務。那麼這裡是如何将
Channel
中的位元組流轉換為Tomcat需要的
ServletRequest
對象呢?其實就是調用了
Http11Processor
來進行位元組流與對象的轉換的。
Executor
Executor
其實是Tomcat定制版的線程池。我們可以看它的類的定義,可以發現它其實是擴充了Java的線程池。
public interface Executor extends java.util.concurrent.Executor, Lifecycle
線上程池中最重要的兩個參數就是核心線程數和最大線程數,正常的Java線程池的執行流程是這樣的。
- 如果目前線程小于核心線程數,那麼來一個任務就建立一個線程。
- 如果目前線程大于核心線程數,那麼就再來任務就将任務放入到任務隊列中。所有線程搶任務。
- 如果隊列滿了,那麼就開始建立臨時線程。
- 如果總線程數到了最大的線程數并且隊列也滿了,那麼就抛出異常。
但是在Tomcat自定義的線程池中是不一樣的,通過重寫了
execute
方法實作了自己的任務處理邏輯。
- 如果總線程數到了最大的線程數,再次獲得任務隊列,再嘗試一次将任務加入隊列中。
- 如果此時還是滿的,就抛異常。
差别就在于第四步的差别,原生線程池的處理政策是隻要目前線程數大于最大線程數,那麼就抛異常,而Tomcat的則是如果目前線程數大于最大線程數,就再嘗試一次,如果還是滿的才會抛異常。下面是定制化線程池
execute
的執行邏輯。
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
//獲得任務隊列
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
在代碼中,我們可以看到有這麼一句
submittedCount.incrementAndGet();
,為什麼會有這句呢?我們可以看看這個參數的定義。簡單來說這個參數就是定義了任務已經送出到了線程池中,但是還沒有執行的任務個數。
/**
* The number of tasks submitted but not yet finished. This includes tasks
* in the queue and tasks that have been handed to a worker thread but the
* latter did not start executing the task yet.
* This number is always greater or equal to {@link #getActiveCount()}.
*/
private final AtomicInteger submittedCount = new AtomicInteger(0);
為什麼會有這麼一個參數呢?我們知道定制的隊列是繼承了
LinkedBlockingQueue
LinkedBlockingQueue
隊列預設是沒有邊界的。于是我們就傳入了一個參數,
maxQueueSize
給構造的隊列。但是在Tomcat的任務隊列預設情況下是無限制的,那麼這樣就會出一個問題,如果目前線程達到了核心線程數,則開始向隊列中添加任務,那麼就會一直是添加成功的。那麼就不會再建立新的線程。那麼在什麼情況下要建立線程呢?
線程池中建立新線程會有兩個地方,一個是小于核心線程時,來一個任務建立一個線程。另一個是超過核心線程并且任務隊列已滿,則會建立臨時線程。
那麼如何規定任務隊列是否已滿呢?如果設定了隊列的最大長度當然好了,但是Tomcat預設情況下是沒有設定,是以預設是無限的。是以Tomcat的
TaskQueue
繼承了
LinkedBlockingQueue
,重寫了
offer
方法,在裡面定義了什麼時候傳回false。
@Override
public boolean offer(Runnable o) {
if (parent==null) return super.offer(o);
//如果目前線程數等于最大線程數,此時不能建立新線程,隻能添加進任務隊列中
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//如果已送出但是未完成的任務數小于等于目前線程數,說明能處理過來,就放入隊列中
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//到這一步說明,已送出但是未完成的任務數大于目前線程數,如果目前線程數小于最大線程數,就傳回false建立線程
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
return super.offer(o);
}
這就是
submittedCount
的意義,目的就是為了在任務隊列長度無限的情況下,讓線程池有機會建立新的線程。
總結
上面的知識有部分是看着李号雙老師的深入拆解Tomcat總結的,又結合着源碼深入了解了一下,當時剛看文章的時候覺得自己都懂了,但是再深入源碼的時候又會發現自己不懂。是以知識如果隻是看了而不運用,那麼知識永遠都不會是自己的。通過Tomcat連接配接器這一小塊的源碼學習,除了一些常用知識的實際運用,例如AQS、鎖的應用、自定義線程池需要考慮的點、NIO的應用等等。還有總體上的設計思維的學習,子產品化設計,和如今的微服務感覺很相似,将一個功能點内部分為多種子產品,這樣無論是在以後替換或者是更新時都能遊刃有餘。