NioEventLoop處理io的一些操作方法
io.netty.channel.nio.NioEventLoop#run
/**
* 啟動服務 輪詢selector
*/
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// 重置 wakenUp 标記為 false
//wakenUp辨別目前selector是否是喚醒狀态
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = ;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == ) {
try {
//處理輪詢到的key
processSelectedKeys();
} finally {
// Ensure we always run tasks.
//啟動task
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * ( - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
...
}
}
這個run方法是在NioEventLoop線程啟動的時候調用的,這裡面主要:
- 輪詢準備好的key
- 處理輪訓出來的key
- 運作task任務
io.netty.channel.nio.NioEventLoop#select
/**
* 輪詢selector方法
* @param oldWakenUp
* @throws IOException
*/
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = ;
long currentTimeNanos = System.nanoTime();
//delayNanos(currentTimeNanos)計算目前定時任務隊列第一個任務的延遲時間
//select的時間不能超過selectDeadLineNanos這個時間
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + L) / L;
if (timeoutMillis <= ) {
//如果逾時 并且第一次輪詢
//那麼就再進行一次非阻塞的select 然後break結束輪詢
if (selectCnt == ) {
selector.selectNow();
selectCnt = ;
}
break;
}
//nioEventLoop有任務在進行,那麼就進行一次非阻塞的select 然後break結束輪詢
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = ;
break;
}
//定時任務為空 截止時間沒到,進行阻塞select
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = ;
break;
}
long time = System.nanoTime();
/**
* time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
* 屬于正常的 證明我進行了一次阻塞select
* else :屬于空輪訓 就是進行了阻塞select但是立即傳回了,并沒有阻塞
*/
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = ;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//當輪詢次數超過一個門檻值 512 要解決jdk的空輪訓bug了
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
//重新建立一個selector 把oldSelector上的key指派給他
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = ;
break;
}
currentTimeNanos = time;
}
...
}
這個方法又一個jdk空輪訓的方法,這個是nio原生就存在的bug,就是進行select操作立即傳回,并沒有阻塞式的select,是以for循環方法會一直走下去,這樣就會導緻cpu占用率很高,當輪詢次數超過一個門檻值 512 要解決jdk的空輪訓bug了,下面我們來分析解決空輪訓怎麼實作
/**
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
* 解決jdk空輪訓導緻cpu占用100%的bug
* 方案:
* 建立新的selector 把oldSelector上面的key都遷移到新的上面
*/
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
這裡會判斷目前執行的線程是否在NioEventLoop的線程中,如果不在NioEventLoop線程,則添加到task隊列等待執行
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//判斷正在執行任務的線程是否是eventloop已經儲存的線程
boolean inEventLoop = inEventLoop();
addTask(task);
//如果不是在eventloop線程證明還沒啟動輪詢和任務排程run方法
//則啟動線程
if (!inEventLoop) {
//啟動線程
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
這裡判斷不是在NioEventLoop線程的話就去啟動線程,那麼會每個不是在NioEventLoop線程執行的任務都去啟動線程麼?答案是否定的,且看startThread方法
private void startThread() {
//狀态值是未啟動過
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
//真正啟動的邏輯
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
下面我們來看真正啟動線程的邏輯
private void doStartThread() {
assert thread == null;
//交給線程執行器去執行代碼 ThreadPerTaskExecutor
executor.execute(new Runnable() {
@Override
public void run() {
//儲存目前線程異步任務的線程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//啟動NioEventLoop的run()方法 進行selector輪詢
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
.....
}
這裡會把目前啟動的線程儲存到成員變量thread,後續判斷是否是NioEventLoop線程也是比較線程是否是一個,然後調用NioEventLoop的run方法,是以run方法執行的代碼都是在NioEventLoop線程裡面運作的
回頭再看io.netty.channel.nio.NioEventLoop#rebuildSelector0
/**
* 重新建構selector
*/
private void rebuildSelector0() {
//儲存舊的selector
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
//建立新的selector 建立過程也會走優化的流程
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// 把所有的 channels 注冊到新的 Selector.
int nChannels = ;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
//key失效 跳過
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
//注冊的關心事件
int interestOps = key.interestOps();
key.cancel();
//取出來key裡面的channel 注冊到新的selector裡面 傳回新的selectionKey
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey 如果attachment是AbstractNioChannel
//那麼将AbstractNioChannel上的selectionKey也更新一下
((AbstractNioChannel) a).selectionKey = newKey;
}
//channel的數量增加
nChannels ++;
} catch (Exception e) {
....
}
//把新的selector引用傳遞給成員變量
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
//關閉舊的selector
oldSelector.close();
} catch (Throwable t) {
....
}
}
這裡主要就是為了解決空輪訓的方法,建立一個新的selector,把oldSelector上面的key都注冊到新的上面,這樣就解決了
承接上文,解決完空輪訓,也輪詢除了key,那麼接着就是處理key了
io.netty.channel.nio.NioEventLoop#processSelectedKeys
/**
* selectedKeys不為null證明我們用的是優化後的selector
* 那麼就按照優化後的邏輯去處理
*/
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
這裡主要是對優化和未優化的selector分别做的處理,大緻邏輯都一樣,隻不過因為底層selectKeys的資料結構不一樣,周遊方法不一樣,我們隻分析processSelectedKeysOptimized
/**
* 處理優化後的selector,selectedKeys是一個數組
*/
private void processSelectedKeysOptimized() {
for (int i = ; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
//拿到selectedKey,把數組的引用置為null
selectedKeys.keys[i] = null;
//拿到key的attachment 也就是AbstractNioChannel
final Object a = k.attachment();
//如果是AbstractNioChannel
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + );
selectAgain();
i = -;
}
}
}
NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey,io.netty.channel.nio.AbstractNioChannel)
/**
* 處理select出來的key
* @param k SelectionKey
* @param ch AbstractNioChannel注冊進去attachment
*/
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//拿到Unsafe對象 NioMessageUnsafe是服務端的channel操作對象
//NioByteUnsafe 用戶端的channel操作對象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//key失效
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
//拿到key的事件辨別
int readyOps = k.readyOps();
//連接配接事件
if ((readyOps & SelectionKey.OP_CONNECT) != ) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//寫事件
if ((readyOps & SelectionKey.OP_WRITE) != ) {
ch.unsafe().forceFlush();
}
/**
* 讀事件和 accept事件都會經過這裡,但是拿到的unsafe對象不同 是以後續執行的read操作也不一樣
* NioServerChannel進行accept操作
* NioChannel進行read操作
*/
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != || readyOps == ) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
我們分析當服務端channel accept到新的連接配接時候unsafe.read();走的是NioMessageUnsafe,當用戶端連接配接read事件到來時候,走的是NioByteUnsafe.read()方法
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
//拿到channel處理的pipline
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//readBuf是一個集合 來儲存accept出的channel
//具體read操作
int localRead = doReadMessages(readBuf);
if (localRead == ) {
break;
}
if (localRead < ) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = ; i < size; i ++) {
readPending = false;
//調用pipline的read方法通知事件
pipeline.fireChannelRead(readBuf.get(i));
}
//清除readBuf集合
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
.....
} finally {
.....
}
}
}
- 把accept到的新連接配接channel添加到集合
- 調用pipline的的通知事件pipeline.fireChannelRead(readBuf.get(i));
- 通知channel read完畢pipeline.fireChannelReadComplete();
接着我們看doReadMessages方法
- 服務端io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages處理的是accept到的channel,并且進行後續的注冊到worker group中的NioEventLoop中去處理
-
用戶端的NioSocketChannel#doReadMessages處理的是worker group selector 輪詢到的read事件
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//拿到新連接配接的channel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
//添加到集合中去
buf.add(new NioSocketChannel(this, ch));
return ;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return ;
}