Netty Source Code Analysis (4): How NioEventLoop Handles I/O

Some of the methods NioEventLoop uses to handle I/O

io.netty.channel.nio.NioEventLoop#run

/**
     * Starts the event loop and polls the selector.
     */
    @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        // Reset the wakenUp flag to false.
                        // wakenUp indicates whether the selector is currently in the woken-up state.
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // Process the keys that were selected.
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                ...
        }
    }
           

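This run method is invoked when the NioEventLoop thread starts. It does three main things:

  1. Polls the selector for ready keys
  2. Processes the keys that were selected
  3. Runs the queued tasks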
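
The split between I/O time and task time in run() comes down to one line of arithmetic. The sketch below isolates it, assuming ioRatio is a percentage between 1 and 99 and that the budget is ioTime * (100 - ioRatio) / ioRatio. The class and method names are hypothetical and are not part of Netty.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Netty: mirrors the budget arithmetic used in run().
final class IoRatioBudget {
    /** Returns the time budget for tasks, in nanoseconds, given the time just spent on I/O. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = TimeUnit.MILLISECONDS.toNanos(8);
        // ioRatio 50: tasks get as much time as I/O took.
        System.out.println(taskBudgetNanos(ioTime, 50));
        // ioRatio 80: tasks get a quarter of the time I/O took.
        System.out.println(taskBudgetNanos(ioTime, 80));
    }
}
```

A higher ioRatio therefore shrinks the task budget relative to the measured I/O time, and the special value 100 skips the budget entirely and runs all queued tasks.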

io.netty.channel.nio.NioEventLoop#select

/**
     * Polls the selector.
     * @param oldWakenUp
     * @throws IOException
     */
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            // delayNanos(currentTimeNanos) computes the delay until the first task
            // in the scheduled-task queue is due.
            // select must not block past selectDeadLineNanos.
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    // The deadline has passed. If this is the first iteration,
                    // do one non-blocking select, then break out of the loop.
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // The NioEventLoop has pending tasks: do one non-blocking select, then break.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                // No task is pending and the deadline has not been reached: do a blocking select.
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                /**
                 * time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
                 * is the normal case: the blocking select really waited for the full timeout.
                 * Otherwise this was an empty poll: the blocking select returned
                 * immediately without blocking.
                 */
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row
                    // (the threshold is 512).
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    // Create a new selector and move the keys of the old selector onto it.
                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

           ...
    }
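
The premature-return check inside the loop can be isolated into a small sketch. EmptyPollDetector is a hypothetical class written for illustration; Netty keeps the same state in local variables of select() rather than in a separate object, and the threshold of 3 in main is chosen only to keep the demo short.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; not part of Netty.
final class EmptyPollDetector {
    private final int rebuildThreshold;
    private int selectCnt;

    EmptyPollDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /**
     * Records one blocking select that selected nothing.
     * Returns true when the selector should be rebuilt.
     */
    boolean recordSelect(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= startNanos) {
            // The call blocked for the whole timeout: normal, restart the count.
            selectCnt = 1;
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            // Too many premature returns in a row.
            selectCnt = 1;
            return true;
        }
        return false;
    }

    int count() {
        return selectCnt;
    }

    public static void main(String[] args) {
        EmptyPollDetector detector = new EmptyPollDetector(3);
        // Three selects with a 1000 ms timeout that each return after only 10 ns.
        System.out.println(detector.recordSelect(0, 10, 1000));
        System.out.println(detector.recordSelect(10, 20, 1000));
        System.out.println(detector.recordSelect(20, 30, 1000));
    }
}
```

A select that genuinely waits out its timeout resets the counter, so only an unbroken run of premature returns can reach the threshold.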
           

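select also has to cope with the JDK's empty-polling bug, a long-standing defect in the native NIO implementation: a select call that should block returns immediately, so the for loop keeps spinning and CPU usage climbs very high. Once the number of consecutive premature returns reaches a threshold (512), Netty works around the bug. Let's look at how the workaround is implemented.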

/**
     * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
     * around the infamous epoll 100% CPU bug.
     * Works around the JDK empty-polling bug that drives CPU usage to 100%.
     * Approach:
     *      create a new selector and migrate all keys from oldSelector onto it
     */
    public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector0();
                }
            });
            return;
        }
        rebuildSelector0();
    }
           

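This checks whether the calling thread is the NioEventLoop thread. If it is not, the rebuild is added to the task queue and waits to be executed on the NioEventLoop thread.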

@Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
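        // Check whether the calling thread is the thread this event loop has saved.
        boolean inEventLoop = inEventLoop();
        addTask(task);
        // If the caller is not the event loop thread, the run() method that does the
        // polling and task scheduling may not have been started yet, so try to start the thread.
        if (!inEventLoop) {
            // Start the thread (only the first call has any effect).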
            startThread();
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
           

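When the caller is not the NioEventLoop thread, execute calls startThread. Does that mean every task submitted from outside the NioEventLoop thread starts a new thread? No. Look at the startThread method: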

private void startThread() {
        // Only proceed if the state says the thread has never been started.
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    // The logic that actually starts the thread.
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }
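
The compare-and-set guard is what makes startThread safe to call from many threads. Below is a minimal sketch of the same idea, assuming an AtomicInteger in place of the AtomicIntegerFieldUpdater that Netty uses. StartOnce, its constant values, and the counter that stands in for doStartThread() are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; not part of Netty.
final class StartOnce {
    private static final int ST_NOT_STARTED = 1; // illustrative values
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicInteger starts = new AtomicInteger();

    /** Returns true only for the single caller that wins the state transition. */
    boolean startThread() {
        if (state.get() == ST_NOT_STARTED
                && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            starts.incrementAndGet(); // stands in for doStartThread()
            return true;
        }
        return false;
    }

    int startCount() {
        return starts.get();
    }

    /** Calls startThread() from several threads and reports how many starts happened. */
    static int race(int callers) {
        StartOnce once = new StartOnce();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(once::startThread);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return once.startCount();
    }

    public static void main(String[] args) {
        System.out.println(race(8)); // prints 1
    }
}
```

However many callers race, exactly one of them sees the compareAndSet succeed, so the start logic runs once.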
           

Next, let's look at the logic that actually starts the thread:

private void doStartThread() {
        assert thread == null;
        // Hand the work to the thread executor (ThreadPerTaskExecutor).
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // Save the thread that is running this task.
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Call NioEventLoop's run() method to start polling the selector.
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {

                .....
    }
           

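The newly started thread is saved in the member field thread. Later checks for whether the caller is the NioEventLoop thread simply compare the current thread against this field. The thread then calls NioEventLoop's run method, so everything that run executes happens on the NioEventLoop thread.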
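
The saved-thread comparison can be shown with a miniature loop. This is a sketch under simplifying assumptions: MiniLoop is hypothetical, it uses a plain BlockingQueue and a daemon thread, and it leaves out the state machine, shutdown handling, and selector that the real event loop has.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical miniature event loop for illustration; not part of Netty.
final class MiniLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private volatile Thread thread;

    void start() {
        Thread t = new Thread(() -> {
            thread = Thread.currentThread(); // remember the loop thread
            try {
                for (;;) {
                    tasks.take().run();
                }
            } catch (InterruptedException e) {
                // interrupted: leave the loop
            }
        });
        t.setDaemon(true);
        t.start();
    }

    /** True only when called from the loop's own thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    /** Runs the task directly on the loop thread, or queues it when called from elsewhere. */
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            tasks.add(task);
        }
    }

    /** Returns {result seen by the caller's thread, result seen inside a task}. */
    static boolean[] demo() {
        MiniLoop loop = new MiniLoop();
        loop.start();
        boolean outside = loop.inEventLoop();
        AtomicBoolean inside = new AtomicBoolean();
        CountDownLatch done = new CountDownLatch(1);
        loop.runInLoop(() -> {
            inside.set(loop.inEventLoop());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {outside, inside.get()};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]);
    }
}
```

runInLoop follows the same shape as rebuildSelector: work that must happen on the loop thread runs inline when the caller is already there and is queued otherwise.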

Now back to io.netty.channel.nio.NioEventLoop#rebuildSelector0

/**
     * Rebuilds the selector.
     */
    private void rebuildSelector0() {
        // Keep a reference to the old selector.
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;

        if (oldSelector == null) {
            return;
        }

        try {
            // Create the new selector; creation also goes through the optimization path.
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
                // Skip keys that are invalid or already registered with the new selector.
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }

                // The interest ops the key was registered with.
                int interestOps = key.interestOps();
                key.cancel();
                // Take the channel from the key, register it with the new selector,
                // and get back the new SelectionKey.
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey: if the attachment is an AbstractNioChannel,
                    // update the selectionKey stored on it as well.
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                // Count the migrated channel.
                nChannels ++;
            } catch (Exception e) {
               ....
        }

        // Point the member fields at the new selector.
        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;

        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            ....
        }
    }
           

This method is the workaround for empty polling: it creates a new selector and re-registers every key from oldSelector onto it, then closes the old selector.
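
The migration can be reproduced with plain java.nio, without Netty. The sketch below is an illustration under simplifying assumptions: SelectorRebuildSketch is a hypothetical class, it uses Selector.open() directly instead of Netty's openSelector() and SelectorTuple, and it omits the per-key error handling.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical class for illustration; not part of Netty.
final class SelectorRebuildSketch {
    /** Moves every valid registration to a fresh selector and closes the old one. */
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            SelectableChannel ch = key.channel();
            if (!key.isValid() || ch.keyFor(newSelector) != null) {
                continue; // invalid, or already migrated
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            ch.register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }

    /** Registers one channel, rebuilds, and checks that the registration survived. */
    static boolean demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            Selector oldSelector = Selector.open();
            Object attachment = "attachment";
            server.register(oldSelector, SelectionKey.OP_ACCEPT, attachment);

            Selector newSelector = rebuild(oldSelector);
            try {
                SelectionKey newKey = server.keyFor(newSelector);
                return !oldSelector.isOpen()
                        && newKey != null
                        && newKey.interestOps() == SelectionKey.OP_ACCEPT
                        && newKey.attachment() == attachment;
            } finally {
                newSelector.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The interest ops and the attachment are read before the old key is cancelled, which is why the channel ends up with an equivalent registration on the new selector.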

Continuing from above: with empty polling dealt with and keys selected, the next step is to process those keys.

io.netty.channel.nio.NioEventLoop#processSelectedKeys

/**
     * A non-null selectedKeys means we are using the optimized selector,
     * so process the keys with the optimized logic.
     */
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
           

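This dispatches to either the optimized or the plain selector path. The logic is broadly the same in both; only the iteration differs, because the underlying selectedKeys data structure differs. We only analyze processSelectedKeysOptimized.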

/**
     * Processes keys from the optimized selector, where selectedKeys is backed by an array.
     */
    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            // Get the key's attachment, which is normally the AbstractNioChannel.
            final Object a = k.attachment();

            // The attachment is an AbstractNioChannel.
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }
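
The index-based iteration above relies on the selected keys living in a plain array. Here is a minimal sketch of such a structure, assuming String entries instead of SelectionKey so that it runs without a selector. ArrayKeySet is a hypothetical stand-in for the array-backed set, and emptying the set at the end of drain is a simplification of this sketch.

```java
import java.util.Arrays;

// Hypothetical stand-in for an array-backed selected-key set; not part of Netty.
final class ArrayKeySet {
    String[] keys = new String[4];
    int size;

    /** Appends a key, doubling the array when it is full. */
    boolean add(String key) {
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1);
        }
        keys[size++] = key;
        return true;
    }

    /** Nulls out the entries from start up to size and empties the set. */
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    /** Visits every key by index and clears each slot as it goes. */
    static String drain(ArrayKeySet set) {
        StringBuilder seen = new StringBuilder();
        for (int i = 0; i < set.size; ++i) {
            String k = set.keys[i];
            set.keys[i] = null; // drop the reference so the entry can be garbage collected
            seen.append(k);
        }
        set.reset(0);
        return seen.toString();
    }

    public static void main(String[] args) {
        ArrayKeySet set = new ArrayKeySet();
        for (String k : new String[] {"a", "b", "c", "d", "e"}) {
            set.add(k);
        }
        System.out.println(drain(set)); // prints abcde
    }
}
```

Compared with iterating a HashSet, walking an array by index needs no iterator and adding a key is a single array store, which is the motivation for the optimized path.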
           

NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey,io.netty.channel.nio.AbstractNioChannel)

/**
     * Processes a selected key.
     * @param k the SelectionKey
     * @param ch the AbstractNioChannel that was registered as the attachment
     */
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // Get the Unsafe object. NioMessageUnsafe is the operation object for server channels;
        // NioByteUnsafe is the operation object for client channels.
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // The key is no longer valid.
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            // Get the key's ready-operation flags.
            int readyOps = k.readyOps();
            // Connect event.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
            // Write event.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            /**
             * Read and accept events both pass through here, but the unsafe object differs,
             * so the read operation that follows differs too:
             * NioServerSocketChannel performs an accept,
             * NioSocketChannel performs a read.
             */
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
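
The bitmask tests in processSelectedKey can be tried in isolation. The sketch below uses only the SelectionKey constants from java.nio and returns the names of the steps a given readyOps value would trigger, in order. ReadyOpsDispatch and the step names are hypothetical labels for illustration.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class for illustration; not part of Netty.
final class ReadyOpsDispatch {
    /** Returns, in order, the steps that a readyOps bitmask would trigger. */
    static List<String> steps(int readyOps) {
        List<String> steps = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            steps.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            steps.add("forceFlush");
        }
        // Read and accept share one branch; a readyOps of 0 also takes it.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            steps.add("read");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(steps(SelectionKey.OP_ACCEPT));
        System.out.println(steps(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(steps(0));
    }
}
```

Because the checks are independent if statements rather than an if/else chain, a single key can trigger several steps in one pass, for example a flush followed by a read.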
           

When the server channel accepts a new connection, unsafe.read() goes through NioMessageUnsafe. When a read event arrives on a client connection, it goes through NioByteUnsafe.read().

private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            // Get the pipeline that handles this channel's events.
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        // readBuf is a list that stores the accepted channels.
                        // This is the actual read operation.
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // Fire the pipeline's channelRead event for each message.
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                // Clear the readBuf list.
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

               .....
            } finally {
              .....
            }
        }
    }
           
  1. Add each newly accepted connection's channel to the list
  2. Fire the pipeline event with pipeline.fireChannelRead(readBuf.get(i));
  3. Signal that the channel read is complete with pipeline.fireChannelReadComplete();

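Next, let's look at the doReadMessages method

  1. On the server side, io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages handles the accepted channel, which is later registered with a NioEventLoop in the worker group for further processing
  2. On the client side, NioSocketChannel has no doReadMessages; read events polled by the worker group's selector are handled by NioByteUnsafe.read(), which calls doReadBytes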

    io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages

@Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // Accept the new connection's channel.
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                // Add it to the list.
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
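
The return values of doReadMessages follow from how a non-blocking accept behaves in plain java.nio: it returns null when no connection is pending. The sketch below shows that case only. AcceptSketch is hypothetical, it stores the raw SocketChannel instead of wrapping it in a NioSocketChannel, and it assumes the loopback interface is available for binding.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class for illustration; not part of Netty.
final class AcceptSketch {
    /** Accepts at most one pending connection into buf; returns how many were added (0 or 1). */
    static int doReadMessages(ServerSocketChannel server, List<Object> buf) throws IOException {
        // In non-blocking mode accept() returns null when nothing is pending.
        SocketChannel ch = server.accept();
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    /** Binds a non-blocking server to an ephemeral loopback port and tries to accept once. */
    static int acceptWithoutClient() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            List<Object> buf = new ArrayList<>();
            return doReadMessages(server, buf);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptWithoutClient()); // prints 0
    }
}
```

A return value of 0 is what ends the do/while loop in NioMessageUnsafe.read(), so the loop keeps accepting only while connections are actually pending.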