上一篇我们学完了NioServerSocketChannel创建,初始化,注册到selector,添加感兴趣事件,相当于完成了Nio的如下几步
//创建一个ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//创建一个Selector
Selector selector = Selector.open();
//绑定6666端口
serverSocketChannel.bind(new InetSocketAddress(6666));
// 设置连接非阻塞
serverSocketChannel.configureBlocking(false);
//注册到selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
接下来我们学习轮询接收客户端请求事件,即NioEventLoop的run方法
@Override
protected void run() {
for (;;) {
try {
try {
//hasTasks() 若taskQueue or tailTasks任务队列中有任务 返回false 没有则返回true
// //有任务返回selectnow的返回值 没任务返回-1
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
//首先轮询注册到reactor线程对应的selector上的所有的channel的IO事件
//wakenUp 表示是否应该唤醒正在阻塞的select操作,netty在每次进行新的loop之前,都会将wakeUp 被设置成false,标志新的一轮loop的开始
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
//处理jdk的空轮训bug
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
//2.处理产生网络IO事件的channel
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
//3.处理任务队列
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
//关闭所有通道
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
当执行完任务队列的任务后,selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())返回-1,进入SELECT,
/*
这里主要是处理阻塞情况。netty在阻塞中会额外处理一些事件,不会让reactor线程一直等待,这样也提高了效率
*/
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
//当scheduledTaskQueue为空时 selectDeadLineNanos=当前时间加一秒
//这里就是定时任务开始执行的时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
//1.定时任务截止事时间快到了,中断本次轮询,也就是判断如果定时任务队列有任务开始<=0.5ms,则终止本次轮询
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
//当前的定时任务队列中有任务的截止事件快到了(<=0.5ms),就跳出循环。
if (timeoutMillis <= 0) {
//如果到目前还没有进行过select操作 调用selectNow()
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
// 2.轮询过程中发现有任务加入,中断本次轮询 netty为了保证任务队列能够及时执行,在进行阻塞select操作之前会判断任务队列是否为空,如果不为空,就执行一次非阻塞select操作,跳出循环
//hasTasks() && wakenUp.compareAndSet(false, true) 如果队列中有任务 则设置wakenUp为true 并返回true
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
//进行一次不阻塞轮询
selector.selectNow();
//设置次数为1
selectCnt = 1;
break;
}
//阻塞式select操作
//执行到这一步,说明netty任务队列里面队列为空,并且所有定时任务延迟时间还未到(大于0.5ms),
//于是,在这里进行一次阻塞select操作,截止到第一个定时任务的截止时间
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
//轮询到IO事件进入
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - 轮询到io事件
// - oldWakenUp 参数为true
// - 用户主动唤醒
// - 任务队列里面有任务
// - 第一个定时任务即将要被执行
break;
}
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
//处理空轮训的bug
//现在的时间-select阻塞的时间=>运行之前的时间
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis在没有选择任何内容的情况下运行。
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//如果selectCnt>=512就重新创建新的selector并替换
//创建新的selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
当有客户端接入时,selectedKeys !=0,会跳出循环,从这里就显示出了netty对于效率的追求,不会一直阻塞线程,而是在阻塞的空闲时间完成taskQueue的任务
客户端接入后,我们进入处理IO事件的processSelectedKeys方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// k.isValid()告知此键是否有效。
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
//获取此键的 ready 操作集合。
int readyOps = k.readyOps();
//处理不同事件,连接事件,读写事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//第一次肯定是接收事件,进入此方法
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
当有客户端接入时,一定会走到unsafe.read();方法,进入方法,这里的unsafe是服务端的unsafe,所以走到AbstaractNioMessageChannel
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
//pipeline
final ChannelPipeline pipeline = pipeline();
//分配一块内存
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//NioServerSocketChannel接收客户端连接的方法
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
//处理多个客户端
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//在serverSocketChanne的pipeline中将得到的socketChannel向下传播
pipeline.fireChannelRead(readBuf.get(i));
}
//清空列表
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
我们重点关注两个方法,doReadMessages是NioServerSocketChannel接收客户端连接的方法,
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//javaChannel() jdk的ServerSocketChannel 接收到的ch为Nio的SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
//对SocketChannel进行封装,将接收到的客户端封装成NioSocketChannel添加到列表中
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
SocketUtils.accept(javaChannel())方法
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
//接收客户端连接,NIO的accept方法
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
new NioSocketChannel方法,和new NioServerSocketChannel方法类似,只是其父类是AbstractNioByteChannel,并且传入的感兴趣事件是READ事件
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
会将接收到的SocketChannel封装成NioSocketChannel,加入到缓存中,然后再关注pipeline.fireChannelRead(readBuf.get(i));方法循环处理接收到的SocketChannel,我们之前说到过目前NioServerSocketChannel的pipeline如下结构
我们关注ServerBootstartAcceptor的channelRead()方法,我们要重点分析ServerBootstrapAcceptor类
其构造方法如下
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
//workerGroup childGroup
this.childGroup = childGroup;
//我们自定义的ChannelInitializer
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
里面保存着我们的配置参数,现在NioSocketChannel中的pipeline如下:
再来看ServerBootstrapAcceptor的channelRead()方法
/*
处理socketChannel的方法
*/
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//msg NioSocketChannel
final Channel child = (Channel) msg;
//将channelInitializer添加到pipeline中
child.pipeline().addLast(childHandler);
//设置参数
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//将NioSocketChannel注册到Selector中
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
其中register方法和server端一样,这里不再详细说明,注册成功后,也会执行handlerAdded,channelRegister,ChannelRegister,channelActive方法,此时的pipeline如下
然后会执行pipeline.fireChannelReadComplete();这个方法也和NioServerSocketChannel添加感兴趣事件方法相同,会将我们在生成NioSocketChannel时添加的READ的事件注册到selector,到这里,整体流程就完成了。
下一篇将会讲对交互消息的处理