天天看点

Netty源码解析(四)接收客户端

上一篇我们学完了NioServerSocketChannel创建,初始化,注册到selector,添加感兴趣事件,相当于完成了Nio的如下几步

//创建一个ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //创建一个Selector
        Selector selector = Selector.open();

        //绑定6666端口
        serverSocketChannel.bind(new InetSocketAddress(6666));

        // 设置连接非阻塞
        serverSocketChannel.configureBlocking(false);

        //注册到selector
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
           

接下来我们学习轮询接收客户端请求事件,即NioEventLoop的run方法

@Override
    protected void run() {
        for (;;) {
            try {
                try {
                    //hasTasks()  若taskQueue or  tailTasks任务队列中有任务  返回false  没有则返回true
//                  //有任务返回selectnow的返回值   没任务返回-1
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        //首先轮询注册到reactor线程对应的selector上的所有的channel的IO事件
                        //wakenUp 表示是否应该唤醒正在阻塞的select操作,netty在每次进行新的loop之前,都会将wakeUp 被设置成false,标志新的一轮loop的开始
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                //处理jdk的空轮训bug
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        //2.处理产生网络IO事件的channel
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        //3.处理任务队列
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    //关闭所有通道
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

           

当执行完任务队列的任务后,selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())返回-1,进入SELECT,

/*
    这里主要是处理阻塞情况。netty在阻塞中会额外处理一些事件,不会让reactor线程一直等待,这样也提高了效率
     */
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            //当scheduledTaskQueue为空时 selectDeadLineNanos=当前时间加一秒
            //这里就是定时任务开始执行的时间
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                //1.定时任务截止事时间快到了,中断本次轮询,也就是判断如果定时任务队列有任务开始<=0.5ms,则终止本次轮询
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                //当前的定时任务队列中有任务的截止事件快到了(<=0.5ms),就跳出循环。
                if (timeoutMillis <= 0) {
                    //如果到目前还没有进行过select操作  调用selectNow()
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.

                // 2.轮询过程中发现有任务加入,中断本次轮询 netty为了保证任务队列能够及时执行,在进行阻塞select操作之前会判断任务队列是否为空,如果不为空,就执行一次非阻塞select操作,跳出循环
                //hasTasks() && wakenUp.compareAndSet(false, true)  如果队列中有任务  则设置wakenUp为true  并返回true
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    //进行一次不阻塞轮询
                    selector.selectNow();
                    //设置次数为1
                    selectCnt = 1;
                    break;
                }

                //阻塞式select操作
                //执行到这一步,说明netty任务队列里面队列为空,并且所有定时任务延迟时间还未到(大于0.5ms),
                //于是,在这里进行一次阻塞select操作,截止到第一个定时任务的截止时间
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
                //轮询到IO事件进入
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - 轮询到io事件
                    // - oldWakenUp 参数为true
                    // - 用户主动唤醒
                    // - 任务队列里面有任务
                    // - 第一个定时任务即将要被执行
                    break;
                }
                if (Thread.interrupted()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                //处理空轮训的bug
                //现在的时间-select阻塞的时间=>运行之前的时间
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis在没有选择任何内容的情况下运行。
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    //如果selectCnt>=512就重新创建新的selector并替换
                    //创建新的selector
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }
                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }
           

当有客户端接入时,selectedKeys !=0,会跳出循环,从这里就显示出了netty对于效率的追求,不会一直阻塞线程,而是在阻塞的空闲时间完成taskQueue的任务

客户端接入后,我们进入处理IO事件的processSelectedKeys方法

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // k.isValid()告知此键是否有效。
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            //获取此键的 ready 操作集合。
            int readyOps = k.readyOps();
            //处理不同事件,连接事件,读写事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            //第一次肯定是接收事件,进入此方法
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
           

当有客户端接入时,一定会走到unsafe.read();方法,进入方法,这里的unsafe是服务端的unsafe,所以走到AbstaractNioMessageChannel

@Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            //pipeline
            final ChannelPipeline pipeline = pipeline();
            //分配一块内存
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        //NioServerSocketChannel接收客户端连接的方法
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                //处理多个客户端
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //在serverSocketChanne的pipeline中将得到的socketChannel向下传播
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                //清空列表
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
           

我们重点关注两个方法,doReadMessages是NioServerSocketChannel接收客户端连接的方法,

@Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        //javaChannel()  jdk的ServerSocketChannel   接收到的ch为Nio的SocketChannel
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            if (ch != null) {
                //对SocketChannel进行封装,将接收到的客户端封装成NioSocketChannel添加到列表中
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }
           

SocketUtils.accept(javaChannel())方法

public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    //接收客户端连接,NIO的accept方法
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }
           

new NioSocketChannel方法,和new NioServerSocketChannel方法类似,只是其父类是AbstractNioByteChannel,并且传入的感兴趣事件是READ事件

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }
           

会将接收到的SocketChannel封装成NioSocketChannel,加入到缓存中,然后再关注pipeline.fireChannelRead(readBuf.get(i));方法循环处理接收到的SocketChannel,我们之前说到过目前NioServerSocketChannel的pipeline如下结构

Netty源码解析(四)接收客户端

我们关注ServerBootstartAcceptor的channelRead()方法,我们要重点分析ServerBootstrapAcceptor类

其构造方法如下

ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            //workerGroup  childGroup
            this.childGroup = childGroup;
            //我们自定义的ChannelInitializer
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
            enableAutoReadTask = new Runnable() {
                @Override
                public void run() {
                    channel.config().setAutoRead(true);
                }
            };
        }
           

里面保存着我们的配置参数,现在NioSocketChannel中的pipeline如下:

Netty源码解析(四)接收客户端

再来看ServerBootstrapAcceptor的channelRead()方法

/*
        处理socketChannel的方法
         */
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            //msg  NioSocketChannel
            final Channel child = (Channel) msg;
            //将channelInitializer添加到pipeline中
            child.pipeline().addLast(childHandler);
            //设置参数
            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                //将NioSocketChannel注册到Selector中
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
           

其中register方法和server端一样,这里不再详细说明,注册成功后,也会执行handlerAdded,channelRegister,ChannelRegister,channelActive方法,此时的pipeline如下

Netty源码解析(四)接收客户端

然后会执行pipeline.fireChannelReadComplete();这个方法也和NioServerSocketChannel添加感兴趣事件方法相同,会将我们在生成NioSocketChannel时添加的READ的事件注册到selector,到这里,整体流程就完成了。

下一篇将会讲对交互消息的处理

继续阅读