Netty's EventLoop

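When Netty starts up, it must be configured with an NioEventLoopGroup. This guarantees that each channel can be registered with an event loop, and that when a channel receives a request, an event loop is available to hand it to the corresponding ChannelPipeline for processing.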

In the inheritance chain of NioEventLoopGroup, the real construction work is done in the superclass of its superclass, MultithreadEventExecutorGroup.

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }

    children = new SingleThreadEventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // ... (the rest of the constructor is omitted in this excerpt)
}

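The constructor first checks that the nThreads argument is greater than 0; this argument is the number of threads in the event loop group. It then creates a SingleThreadEventExecutor array whose length is exactly nThreads. As the name suggests, each element of this array is an executor backed by a single thread.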
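The constructor also picks a chooser depending on whether the array length is a power of two. The excerpt does not show the chooser classes, so the following is only a sketch, assuming they hand out children in round-robin order. The names ChooserSketch, IndexChooser and newChooser are hypothetical and are not Netty API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the chooser logic; all names here are hypothetical.
class ChooserSketch {
    /** True when val has exactly one bit set (val is assumed to be > 0). */
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Returns the array index of the next child to hand out. */
    interface IndexChooser {
        int next();
    }

    static IndexChooser newChooser(final int length) {
        final AtomicInteger counter = new AtomicInteger();
        if (isPowerOfTwo(length)) {
            // Power of two: a bit mask replaces the modulo operation.
            return () -> counter.getAndIncrement() & (length - 1);
        }
        // General case: modulo, with abs() guarding against counter overflow.
        return () -> Math.abs(counter.getAndIncrement() % length);
    }

    public static void main(String[] args) {
        IndexChooser chooser = newChooser(3);
        for (int i = 0; i < 5; i++) {
            System.out.println(chooser.next()); // cycles 0, 1, 2, 0, 1
        }
    }
}
```

Both variants walk the array in the same order; the power-of-two branch only saves the cost of a division.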

Next, the constructor fills the array one slot at a time by calling newChild(), which returns a new NioEventLoop. NioEventLoop inherits from SingleThreadEventExecutor.

newChild() is implemented in NioEventLoopGroup.

protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
           

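This creates a new NioEventLoop, the event loop that channels will later be bound to. At this point it is only constructed. The SelectorProvider it needs is supplied by the NioEventLoopGroup when the group itself is created.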

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}
           

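The NioEventLoop constructor first calls the superclass constructor and then opens a selector, so that channels can be bound to it when they register later. Further up the inheritance chain, SingleThreadEventExecutor provides the more detailed constructor.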

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }

    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });

    taskQueue = newTaskQueue();
}
           

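The key point is that the constructor creates a thread and assigns it to the thread field. Every event loop is therefore bound to exactly one thread, and the two share the same lifecycle. It follows that the thread that handles a channel is exactly this thread field. Inside the thread, the event loop's own run() method is called directly; run() is implemented in NioEventLoop.

After the thread has been created, the constructor creates a new blocking linked queue that holds the tasks waiting to be executed.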

The thread is started when a channel registers with the event loop group, as the registration code shows.

Channel registration eventually reaches the register() method of the channel's unsafe.

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
           

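First, assigning the channel's eventLoop field binds the channel to an event loop. At this point the event loop's selector is not yet bound to the channel; that happens later in register0(). Before that, however, there is an inEventLoop() check. The method is simple: it tests whether the current thread is the thread that the event loop created in its constructor. In this startup scenario the current thread is still the main thread that is starting Netty, not the thread bound to the event loop, so the event loop's execute() method is called. This execute() is not the usual thread-pool implementation; it is implemented in SingleThreadEventExecutor.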

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
           

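In the channel registration scenario above, the task passed in is the registration task that calls register0(). inEventLoop() is checked again, with the same result as before, so startThread() is executed. startThread() finally starts the thread that was created in the constructor, and the registration task is then added to the blocking queue, where the newly started thread will pick it up.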
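The lazy start described here can be sketched with plain JDK classes. This is not Netty's code: LazyLoopSketch and its members are hypothetical names. The sketch only assumes the behaviour described in the text: the thread is created in the constructor, started by the first execute() call from another thread, and then drains a blocking queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, heavily simplified event loop: one thread, one task queue.
class LazyLoopSketch {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private final Thread thread;

    LazyLoopSketch() {
        // The thread is created here but not started yet.
        thread = new Thread(this::runLoop, "lazy-loop");
        thread.setDaemon(true);
    }

    private void runLoop() {
        try {
            for (;;) {
                Runnable task = taskQueue.take();
                try {
                    task.run();
                } catch (Throwable t) {
                    System.err.println("A task raised an exception: " + t);
                }
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting for a task: leave the loop.
        }
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        // Only the first call from outside the loop thread starts the thread.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            thread.start();
        }
        taskQueue.add(task);
    }

    /** Submits one task from the calling thread and reports whether it ran on the loop thread. */
    static boolean taskRunsOnLoopThread() {
        LazyLoopSketch loop = new LazyLoopSketch();
        AtomicBoolean ranOnLoop = new AtomicBoolean();
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            ranOnLoop.set(loop.inEventLoop());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOnLoop.get();
    }
}
```

The compare-and-set keeps the thread from being started twice when several threads submit their first task at the same moment.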

We can now turn to the core of the matter: NioEventLoop's implementation of the run() method that the thread executes.

protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp); 
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
           

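run() is an infinite loop. At the start of each iteration, hasTasks() checks whether the blocking queue still contains unfinished tasks. If it does, selectNow() is called; otherwise select() is used to wait for I/O events. The difference is that selectNow() has no timeout: it returns immediately even if no channel is ready, which suits the case where tasks are still waiting in the queue. If the blocking queue is empty, NioEventLoop's own select() method is called instead.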
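The difference between the two calls can be tried directly against the JDK's java.nio.channels.Selector. The sketch below is not taken from Netty; SelectSketch and its methods are hypothetical helpers that mirror only the hasTasks() decision.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical helper mirroring the "tasks pending -> do not block" decision.
class SelectSketch {
    /** Polls without blocking when tasks are pending, otherwise waits up to timeoutMillis (> 0). */
    static int poll(Selector selector, boolean hasTasks, long timeoutMillis) {
        try {
            return hasTasks ? selector.selectNow() : selector.select(timeoutMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs poll() against a fresh selector that has no channels registered. */
    static int pollEmptySelector(boolean hasTasks, long timeoutMillis) {
        try (Selector selector = Selector.open()) {
            return poll(selector, hasTasks, timeoutMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // No channels are registered, so both calls report 0 ready keys;
        // the second one only returns after waiting for the timeout.
        System.out.println(pollEmptySelector(true, 1000));
        System.out.println(pollEmptySelector(false, 50));
    }
}
```

Note that Selector.select(0) blocks indefinitely, which is why the sketch expects a positive timeout.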

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                        selectCnt);

                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
        }
        // Harmless exception - log anyway
    }
}
           

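This method polls for I/O events with a timeout. While it is polling, the select loop ends as soon as an event is selected, a new task arrives in the blocking queue, or a scheduled task becomes due.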
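The timeout itself comes from the expression (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L in the code above, which rounds the remaining nanoseconds to the nearest millisecond. A small worked example, with SelectTimeoutSketch as a hypothetical wrapper around that same expression:

```java
// Hypothetical wrapper around the rounding expression used in select().
class SelectTimeoutSketch {
    /** Remaining time until the deadline, rounded to the nearest millisecond. */
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(1_400_000L, 0L)); // 1 (1.4 ms rounds down)
        System.out.println(timeoutMillis(1_500_000L, 0L)); // 2 (1.5 ms rounds up)
        System.out.println(timeoutMillis(400_000L, 0L));   // 0 (less than half a millisecond left)
    }
}
```

A result of 0 or less is the case in which the loop above falls back to a single selectNow() and exits.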

Once select() returns, control goes back to NioEventLoop's run() method.

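Here the ioRatio parameter sets the percentage of time spent on I/O events relative to tasks in the queue. If it is 100, queued tasks start only after all selected I/O events have been processed, and the loop does not select again until every queued task has run. Otherwise, the tasks are run with a timeout equal to the time just spent on I/O multiplied by (100 - ioRatio) / ioRatio.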
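The arithmetic behind the task timeout is easy to check by hand. The sketch below wraps the expression ioTime * (100 - ioRatio) / ioRatio from run() in a hypothetical helper, IoRatioSketch.taskBudgetNanos; the range check is an assumption added for the example.

```java
// Hypothetical helper around the ioRatio expression used in run().
class IoRatioSketch {
    /**
     * Time budget for queued tasks, given the time just spent on I/O.
     * ioRatio = 100 is excluded because run() then calls runAllTasks() without a budget.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 1..99)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        // 8 ms of I/O at ioRatio 80 leaves 2 ms for tasks (an 80:20 split).
        System.out.println(taskBudgetNanos(8_000_000L, 80)); // 2000000
        // At ioRatio 50 the tasks get as much time as the I/O took.
        System.out.println(taskBudgetNanos(1_000L, 50));     // 1000
    }
}
```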

First look at processSelectedKeys(), which handles the I/O events. If the preceding select() call returned selection keys, it goes straight into processSelectedKeysOptimized() to handle them.

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            for (;;) {
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
                i++;
            }

            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}
           

This loop processes every selection key that was received. For each key, it obtains the corresponding channel from the key and passes both to processSelectedKey() for further handling.
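The iteration pattern is worth isolating: the array is treated as null-terminated, and each slot is cleared before its key is handled so that the array does not keep a closed channel reachable. A minimal sketch of that pattern on a plain Object array follows; SelectedKeysSketch and drain are hypothetical names, and the bounds check is an addition that the original loop does not have.

```java
import java.util.function.Consumer;

// Hypothetical illustration of the "null-terminated array" loop.
class SelectedKeysSketch {
    /** Handles entries up to the first null, clearing each slot first; returns the count handled. */
    static int drain(Object[] keys, Consumer<Object> handler) {
        int processed = 0;
        for (int i = 0; i < keys.length; i++) {
            Object k = keys[i];
            if (k == null) {
                break; // the first null marks the end of the valid entries
            }
            keys[i] = null; // drop the reference so it can be garbage collected
            handler.accept(k);
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        Object[] keys = {"k1", "k2", null, "stale"};
        int n = drain(keys, k -> System.out.println("handling " + k));
        System.out.println(n); // 2: "stale" sits behind the null and is never reached
    }
}
```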

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
           

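This is the heart of I/O handling: the ready operations (readyOps) of the selection key determine what the I/O event is for. If they include read or accept, unsafe.read() is called to start reading the incoming data.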

Write and connect are handled in a similar way: each is dispatched directly through unsafe, which in turn drives the user's own business logic through the pipeline.
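The dispatch relies on readyOps being a bit mask built from the java.nio.channels.SelectionKey constants. The following sketch repeats the three tests from processSelectedKey() but only records which action would be taken; ReadyOpsSketch and actionsFor are hypothetical names, and the returned strings are labels rather than real calls.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for the readyOps bit mask.
class ReadyOpsSketch {
    /** Lists, in order, the actions the dispatch logic would take for a given mask. */
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // readyOps == 0 is treated like a read, as in the original workaround.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        return actions;
    }

    public static void main(String[] args) {
        int mask = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(actionsFor(mask)); // [read, forceFlush]
    }
}
```

Because the tests are independent, one key can trigger several actions in a single pass.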

That is how the event loop handles I/O events.

Once the I/O work is done, runAllTasks() starts processing the tasks in the blocking queue.

protected boolean runAllTasks() {
    fetchFromDelayedQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            return true;
        }
    }
}
           

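First, fetchFromDelayedQueue() takes every scheduled task whose deadline has passed out of the delayed queue and puts it into the blocking queue, so that due scheduled tasks are ready to run.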

private void fetchFromDelayedQueue() {
    long nanoTime = 0L;
    for (;;) {
        ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
        if (delayedTask == null) {
            break;
        }

        if (nanoTime == 0L) {
            nanoTime = ScheduledFutureTask.nanoTime();
        }

        if (delayedTask.deadlineNanos() <= nanoTime) {
            delayedTaskQueue.remove();
            taskQueue.add(delayedTask);
        } else {
            break;
        }
    }
}
           

Then, when no timeout applies, tasks are taken from the blocking queue one after another until all of them have been processed.
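The two steps, moving due tasks and then draining the queue, can be sketched with a java.util.PriorityQueue ordered by deadline. This is a simplified model rather than Netty's implementation: DelayedQueueSketch, DelayedTask, fetchDue and runAll are hypothetical names, and the current time is passed in as a parameter so the behaviour is deterministic.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical model of "move due scheduled tasks, then run everything queued".
class DelayedQueueSketch {
    static final class DelayedTask implements Comparable<DelayedTask> {
        final long deadlineNanos;
        final Runnable task;

        DelayedTask(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }

        @Override
        public int compareTo(DelayedTask other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    /** Moves every task whose deadline is <= nowNanos into taskQueue; returns how many moved. */
    static int fetchDue(PriorityQueue<DelayedTask> delayed, Queue<Runnable> taskQueue, long nowNanos) {
        int moved = 0;
        for (;;) {
            DelayedTask head = delayed.peek();
            if (head == null || head.deadlineNanos > nowNanos) {
                break; // the earliest task is not due yet, so none of the others are either
            }
            delayed.remove();
            taskQueue.add(head.task);
            moved++;
        }
        return moved;
    }

    /** Runs queued tasks until the queue is empty; returns how many ran. */
    static int runAll(Queue<Runnable> taskQueue) {
        int ran = 0;
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("A task raised an exception: " + t);
            }
            ran++;
        }
        return ran;
    }

    public static void main(String[] args) {
        PriorityQueue<DelayedTask> delayed = new PriorityQueue<>();
        Queue<Runnable> tasks = new ArrayDeque<>();
        delayed.add(new DelayedTask(10L, () -> System.out.println("due task")));
        delayed.add(new DelayedTask(99L, () -> System.out.println("future task")));
        fetchDue(delayed, tasks, 50L); // moves only the task with deadline 10
        runAll(tasks);                 // prints "due task"
    }
}
```

Peeking at the head is enough because the queue is ordered by deadline: once the head is not due, the loop can stop.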

That covers what the event loop does.