天天看點

Netty系列9-服務端源源碼分析

我們以一個入門級别的服務端代碼進行分析。netty其實就是對原生的java nio程式設計進行了封裝,是以分析中會看到最後調用的都是java原生的API。因為整個過程代碼比較多,這裡會把一些不是核心流程的代碼省略。

先整體看下服務端的代碼。

/*線程組*/
        EventLoopGroup group = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();//2
            b.group(group,work)//3
                    /*指明使用NIO進行網絡通訊*/
            .channel(NioServerSocketChannel.class)//4
                    //.channel(EpollServerSocketChannel.class)
                    /*指明伺服器監聽端口*/
                    .localAddress(new InetSocketAddress(port))
                    /*接收到連接配接請求,新啟一個socket通信,也就是channel,每個channel
                    * 有自己的事件的handler*/
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                            //ch.pipeline().addLast(new EchoServerHandler2());
                        }
                    });
            ChannelFuture f = b.bind().sync();//5

           /*阻塞,直到channel關閉*/
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
           

1 建立EventLoopGroup

EventLoopGroup group = new NioEventLoopGroup();
 EventLoopGroup work = new NioEventLoopGroup();
           

這裡的建立了兩個NioEventLoopGroup,隻建立一個也可以,不明白的可以看下前面文章講的Reactor模式。NioEventLoopGroup可以認為是個線程池,它會給每個連接配接的channel配置設定EventLoop,EventLoop就是個線程。

這裡建立的是NioEventLoopGroup,分析下建立它的過程做了什麼。

1.1 NioEventLoopGroup

先看下構造函數

public NioEventLoopGroup() {
        this(0);
    }
           

整個過程中構造函數調用的比較多,我們隻看幾個關鍵點。

public NioEventLoopGroup(int nThreads, Executor executor) {
//這裡nThreads就是上面的0,executor為null,SelectorProvider.provider()這個java原生api,得到一個SelectorProvider
        this(nThreads, executor, SelectorProvider.provider());
    }
           

接着調用了父類MultithreadEventLoopGroup的構造方法。

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  //這裡判定如果沒有指定線程數,設定預設線程數為系統處理器數量*2
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

           

接着再次調用另外一個父類MultithreadEventExecutorGroup的構造方法

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        if (executor == null) {
        //建立執行器
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());//步驟1
        }
        //建立EventLoop數組
        children = new EventExecutor[nThreads];//步驟2
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
            //初始化數組的對象
                children[i] = newChild(executor, args);//步驟3
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                   ...//省略代碼
                }
            }
        }
        //建立選擇器,選擇器會根據算法每次從上述數組選擇一個EventLoop傳回
        chooser = chooserFactory.newChooser(children);//步驟4
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);//步驟5
        }
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);//步驟6
    }
           

(1)步驟1

前面我們沒有指定執行器,是以步驟1建立了預設執行器,看下這個執行器是什麼。

public final class ThreadPerTaskExecutor implements Executor {
//這裡ThreadFactory 就是用來建立線程的
    private final ThreadFactory threadFactory;
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
    //建立線程,執行任務
        threadFactory.newThread(command).start();
    }
}

           

執行器構造函數參數是newDefaultThreadFactory方法傳回的。

protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }

           

DefaultThreadFactory繼承了JDK的ThreadFactory,就是用來建立線程的。

(2)步驟2

步驟2就是建立了一個EventExecutor數組,數組的大小就是我們之前指定的線程大小。

(3)步驟3

newChild建立資料對象,這裡newChild調用的是NioEventLoopGroup方法,建立NioEventLoop對象。

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
           

看下建立NioEventLoop的過程。

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
          ...//省略代碼
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        //JDK的選擇器
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
           

構造器裡面建立了我們熟悉的JDK的選擇器selector。

這裡也調用了父類的構造方法,這裡的父類是SingleThreadEventLoop,看名字就知道這是個單線程的。

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        //建立了線程等待隊列的大小,這裡是2147483647
        tailTasks = newTaskQueue(maxPendingTasks);
    }
           

(4)步驟4

建立選擇器,這個選擇器決定以什麼算法從EventLoop數組中選擇一個EventLoop執行任務。

public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
           

這裡使用的是PowerOfTwoEventExecutorChooser。

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }
           

(5)步驟5和步驟6

給每一個EventLoop添加了一個線程終止時的監聽器。将上述EventLoop數組添加到了一個隻讀集合。

2 建立ServerBootstrap

ServerBootstrap b = new ServerBootstrap();
           

ServerBootstrap構造函數什麼都沒做。

3 設定ServerBootstrap參數

b.group(group,work)//3
                    /*指明使用NIO進行網絡通訊*/
            .channel(NioServerSocketChannel.class)//4
                    //.channel(EpollServerSocketChannel.class)
                    /*指明伺服器監聽端口*/
                    .localAddress(new InetSocketAddress(port))
                    /*接收到連接配接請求,新啟一個socket通信,也就是channel,每個channel
                    * 有自己的事件的handler*/
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                            //ch.pipeline().addLast(new EchoServerHandler2());
                        }
                    });
           

3.1 group方法

首先調用了ServerBootstrap的group方法。

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
               ...//省略代碼
        this.childGroup = childGroup;
        return this;
    }
           

這裡将parentGroup指派給了父類AbstractBootstrap的group對象,childGroup指派給了ServerBootstrap的childGroup對象。

這裡如果group對象隻傳入了一個EventLoopGroup,上述兩個對象是同一個。

3.2 channel方法

public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
           

這裡建立了一個channel工廠,内部持有了一個反射工廠。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
		//構造器反射建立Channel
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

}
           

我們傳入的是NioServerSocketChannel的class對象。

1.5 childOption和childHandler

這兩個方法就是給ServerBootstrap對象指派。

4. bind

ChannelFuture f = b.bind().sync();
           

看下ServerBootstrap對象的bind方法。

public ChannelFuture bind() {
	 //校驗參數
        validate();
        //端口參數
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        return doBind(localAddress);
    }
           
private ChannelFuture doBind(final SocketAddress localAddress) {
		//初始化channel并注冊到selector選擇器
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            //綁定端口
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
          ...//省略代碼
        }
    }
           

4.1 initAndRegister

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
		//用前面将的channel工廠,利用反射建立channel對象
		//這裡建立的是NioServerSocketChannel
            channel = channelFactory.newChannel();
            //初始化
            init(channel);
        } catch (Throwable t) {
         ...//省略代碼
        }
		//注冊
        ChannelFuture regFuture = config().group().register(channel);
       ...//省略代碼
        return regFuture;
    }
           

(1)NioServerSocketChannel

這裡先建立了NioServerSocketChannel對象,看下它的構造函數。

public NioServerSocketChannel() {
  這裡的DEFAULT_SELECTOR_PROVIDER是JDK的SelectorProvider對象。
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
           

newSocket建立的是ServerSocketChannelImpl對象。

private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            ...//省略代碼
        }
    }
           

this調用了本類的構造方法。

public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
           

繼續調用了父類的構造方法。

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
		//這裡是SelectionKey.OP_ACCEPT,SelectionKey.OP_ACCEPT對應的整數是16,
        this.readInterestOp = readInterestOp;
        try {
		//設定Channel非阻塞
            ch.configureBlocking(false);
        } catch (IOException e) {
            ...//省略代碼
        }
    }
           

再看下super調用的父類構造方法。

protected AbstractChannel(Channel parent) {
        this.parent = parent;
		//建立id
        id = newId();
        unsafe = newUnsafe();
		建立Pipeline對象
        pipeline = newChannelPipeline();
    }

           
protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
           

這裡建立DefaultChannelPipeline對象。

(2)DefaultChannelPipeline

看下DefaultChannelPipeline的構造方法。

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
           

這裡建立了兩個Handler對象,一個是HeadContext,HeadContext實作了ChannelOutboundHandler和ChannelInboundHandler接口,可以處理入站和出站事件。另外一個是TailContext,TailContext實作了ChannelInboundHandler,可以處理入站事件。

4.1.1 init方法

建立Channel對象後,調用init方法初始化該對象。

void init(Channel channel) throws Exception {
		//将上述options指派給該channel
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
		//将上述attrs指派給該channel
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
		//Channel的Pipeline對象
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
		//給Pipeline對象增加一個Handler
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
           

(1)addLast方法

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
			//建立DefaultChannelHandlerContext對象
            newCtx = newContext(group, filterName(name, handler), handler);
			//将這個Handler加入到TailContext前一個
            addLast0(newCtx);
            if (!registered) {
            //channel還未注冊到selector上時,走如下邏輯
                newCtx.setAddPending();
                //這裡建立了一個PendingHandlerAddedTask,作用是調用ChannelInitializer的initChannel方法,并将自己從pipeline移除。
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
           

addLast0方法:

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
           

callHandlerCallbackLater方法

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
		//這裡的added為true
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
		//将PendingHandlerAddedTask對象指派給pendingHandlerCallbackHead對象,作用在後面會講到
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
           

4.1.2 register方法

這裡将調用MultithreadEventLoopGroup的register方法。

public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
           
public EventExecutor next() {
   //選擇器從EventLoopGroup中選取一個EventLoop
        return chooser.next();
    }
           

這裡next方法就是用上面講的選擇器從EventLoopGroup中選取一個EventLoop,調用它的register方法。

這裡調用的是SingleThreadEventLoop的register方法。

public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
           
public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //這裡調用的是AbstractChannel内部類AbstractUnsafe的register方法
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
           
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
          ...//省略代碼
            AbstractChannel.this.eventLoop = eventLoop;
			//目前線程是否是eventLoop的線程
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                //如果不是放入eventLoop的等待隊列,等待執行
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                       ...//省略代碼
                }
            }
        }
           

這裡就是netty中的多線程安全解決方案,在前邊netty介紹我們講過所有channel的IO事件都會和eventLoop綁定,一個eventLoop會和一個線程綁定。這裡eventLoop.inEventLoop()會判斷目前線程是不是EventLoop指定的線程,

如果不是加入eventLoop的等待隊列。這樣channel的IO事件都會在一個線程執行。

(1)inEventLoop方法

public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
           

就是将目前線程和eventLoop線程進行對比,這裡由于EventLoop還未指定線程,目前為null,目前線程為主線程,是以執行else代碼塊。

(2)execute

public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
		//加入線程等待隊列
        addTask(task);
        if (!inEventLoop) {
		//啟動線程
            startThread();
            if (isShutdown() && removeTask(task)) {
			//如果線程池已經關閉,移除任務
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
           
private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }
           

(3)doStartThread

private void doStartThread() {
        assert thread == null;
		//這裡調用的就是前面将的用線程工廠建立一個線程,然後執行下面的run方法。
        executor.execute(new Runnable() {
            @Override
            public void run() {
			//将目前線程指派給NioEventLoop的thread
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
				//這裡執行的是NioEventLoop的run方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    ...//省略代碼
                    );
    }
           

(4)run

protected void run() {
        for (;;) {
            try {
		
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
					//處理注冊時間
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
						//執行隊列中的任務
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
             ...//省略代碼
        }
    }
           

這裡還沒有注冊事件,是以會直接執行隊列中的任務。

(5)runAllTasks

protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        //取出任務
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
        //執行任務
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

           

這裡safeExecute方法就是調用隊列任務的run方法。這裡将調用是AbstractChannel的register0方法。

(6)register0

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //向selector注冊channel
                doRegister();
                neverRegistered = false;
                registered = true;
				//調用我們之前建立的PendingHandlerAddedTask中的方法
				//作用在下面分析
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
				//這裡把Register事件傳播,基本沒做什麼
                pipeline.fireChannelRegistered();
          				//這時候channel還沒激活,因為端口還沒綁定,isActive是false。
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                      
                        beginRead();
                    }
                }
            } catch (Throwable t) {
               ...//省略代碼
            }
        }
           

(7)doRegister

這裡調用的是AbstractNioChannel類的doRegister方法。

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
			//向Selector選擇器注冊前面建立的ServerSocketChannel對象,并且感興趣的事件為0,注意這裡注冊的還不是accept事件,因為端口還沒綁定
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
               ...//省略代碼
            }
        }
    }
           

(8)invokeHandlerAddedIfNeeded

final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            callHandlerAddedForAllHandlers();
        }
    
           
private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            // This Channel itself was registered.
			//registered設為true,之後再向Pipeline中加入Handler和第一次加入不一樣了
            registered = true;

            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }

        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }

           

這裡調用的是之前建立的PendingHandlerAddedTask對象,調用它的execute方法。

void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                         ...//省略代碼
                }
            }
        }
           

callHandlerAdded0方法

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
         
            ctx.setAddComplete();
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            ...//省略代碼
    }
           
handlerAdded方法:
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }
           

上述方法最後調用了ChannelInitializer類的initChannel。

(9)initChannel

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
			//這裡調用的就是之前定義ChannelInitializer的initChannel,注意這裡還不是我們定義的ChannelInitializer。
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
			//将ChannelInitializer從Pipeline中移除
                remove(ctx);
            }
            return true;
        }
        return false;
    }
           

我們看下之前的ChannelInitializer.

public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                    //将ServerBootstrapAcceptor加入到Pipeline
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
           

現在Pipeline有三個Handler,順序為HeadContext、ServerBootstrapAcceptor、TailContext。到這裡initAndRegister方法分析完了,回到doBind方法繼續分析。

4.2 doBind0

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
            
              channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
           

這裡執行AbstractChannel的bind方法。

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
           

這裡調用了pipeline的bind方法。先說明netty中所有出站事件都TailContext傳播,入站事件從HeadContext傳播。

(1)bind

這裡的bind方法是從TailContext節點傳播,是個出站事件

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }
           
這裡調用的是父類AbstractChannelHandlerContext的bind方法。
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
            ...//省略代碼
            //找到下一個可以處理出站事件的handler
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
           

(2)findContextOutbound

findContextOutbound方法從目前節點向前找OutboundHandler:
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
           

目前我們的handler隻有三個,能處理出站事件隻有HeadContext。看下它的bind方法。

(3)bind

public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }
           

調用了AbstractChannel内部類AbstractUnsafe的bind方法。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
               ...//省略代碼
            boolean wasActive = isActive();
            try {
            /綁定端口
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
			//這時候channel已經激活
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
					//傳播Active事件
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }
           

(4)doBind

protected void doBind(SocketAddress localAddress) throws Exception {
   //java原生api綁定端口,并且考慮了版本相容
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
           

端口綁定後,channel已激活,這時候會調用fireChannelActive方法。這是個入站事件,從HeadContext開始傳播。看下頭結點的channelActive方法。

public void channelActive(ChannelHandlerContext ctx) throws Exception {
	//先把Active向後傳播,這裡ServerBootstrapAcceptor、TailContext都沒做什麼
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }
           

(5)readIfIsAutoRead

private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }
           
public Channel read() {
   //read是個出站事件
        pipeline.read();
        return this;
    }
           

read是個出站事件,是以從TailContext開始傳播,但最終調用的是HeadContext的方法。

public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

           

(6)beginRead

public final void beginRead() {
             ...//省略代碼

            try {
                doBeginRead();
            } catch (final Exception e) {
                 ...//省略代碼
            }
        }
           

這裡最終調用了AbstractNioChannel的doBeginRead方法。

protected void doBeginRead() throws Exception {
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
		//将感興趣的事件注冊成了OP_ACCEPT
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
           

到這裡伺服器啟動過程已經分析完畢。

5 服務端啟動過程總結

  • 首先建立EventLoopGroup和ServerBootstrap對象,給EventLoopGroup和ServerBootstrap對象設定了相關屬性,比如EventLoop數組。
  • 建立NioServerSocketChannel對象,設定相關屬性,比如初始化它的Pipeline,建立了JDK的ServerSocketChannelImpl對象,設定通道為非阻塞。
  • 從EventLoop數組選取一個EventLoop,并且建立一個線程和EventLoop綁定,啟動線程執行任務,在任務中将上述channel注冊到selector上,但暫時不阻塞OP_ACCEPT事件,最後将ServerBootstrapAcceptor加入Pipeline。
  • 将上述channel綁定監聽端口,在selector上注冊OP_ACCEPT事件等待用戶端連接配接。

6 服務端處理OP_ACCEPT事件

上面我們分析了服務端啟動的過程,接下來分析如果有用戶端發起連接配接後,服務端怎麼處理。

我們之前分析啟動的過程中分析了NioEventLoop的run方法。這個方法會不斷自旋處理隊列中的任務以及各種選擇器事件,其實處理選擇器事件的是其中的processSelectedKeys方法。

private void processSelectedKeys() {
        if (selectedKeys != null) {
        //執行的是這個方法
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
           

6.1 processSelectedKeysOptimized

private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
              selectedKeys.keys[i] = null;
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
           //我們使用的是NIO,是以走這段代碼
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            if (needsToSelectAgain) {
              selectedKeys.reset(i + 1);
                selectAgain();
                i = -1;
            }
        }
    }
           

這裡循環所有事件,調用processSelectedKey方法處理

(1)processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        //檢查事件是否有效
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
            //獲得channel的eventLoop 
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
           
                return;
            }
        
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
           // 處理連接配接事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
           // 處理寫事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
          // 處理讀事件或者ACCEPT事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
           

我們這裡是ACCEPT事件,調用的是NioMessageUnsafe的read方法。

6.2 read

public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                   		//建立用戶端連接配接
                        int localRead = doReadMessages(readBuf);
                         .//省略代碼
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //傳播read事件
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                //清除readBuf
                readBuf.clear();
                allocHandle.readComplete();
                   //傳播ReadComplete事件
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);
                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
               
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
           

(1)doReadMessages

protected int doReadMessages(List<Object> buf) throws Exception {
 //擷取用戶端連接配接,這裡的SocketChannel是通過java原生的api獲得
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            if (ch != null) {
            //将上述SocketChannel 封裝成NioSocketChannel對象,将入list對象
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
           .//省略代碼
        return 0;
    }
           

這裡的list對象是NioMessageUnsafe類的成員變量,這裡儲存的所有連接配接上服務端的SocketChannel 。這裡建立的NioSocketChannel對象,服務端啟動時候建立的是NioServerSocketChannel對象。看下兩個的類圖。

Netty系列9-服務端源源碼分析
Netty系列9-服務端源源碼分析

兩個很相似,前面講過NioServerSocketChannel對象建立時候調用了父類AbstractNioMessageChannel的構造函數,AbstractNioMessageChannel又調用了父類AbstractNioChannel。NioSocketChannel建立的時候先調用的是AbstractNioByteChannel的構造函數,然後也調用了AbstractNioChannel。是以同樣NioSocketChannel建立的和NioServerSocketChannel建立過程差不多,内部同樣建立了Pipeline等對象。不同的是NioServerSocketChannel預設的InterestOp是OP_ACCEPT,而NioSocketChannel是OP_READ。

(2)fireChannelRead

注意現在有兩個Pipeline,一個服務端NioServerSocketChannel的Pipeline,這裡叫主Pipeline,一個是用戶端連接配接後建立的NioSocketChannel的Pipeline,這個叫副Pipeline。這裡的傳播事件Pipeline是主Pipeline。

這裡的read是個入站事件,主Pipeline裡面現在有三個handle,HeadContext、ServerBootstrapAcceptor、TailContext。是以這裡從HeadContext開始向下傳播,HeadContext隻是向下傳播,TailContext也沒做什麼,主要看下ServerBootstrapAcceptor的channelRead方法。

(3)channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
			 //這裡是NioSocketChannel
            final Channel child = (Channel) msg;
            //這裡的pipeline是副pipeline
            child.pipeline().addLast(childHandler);
			//設定屬性
            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
            //我們在服務端設定了兩個EventLoopGroup,這裡的childGroup是子			EventLoopGroup
            //調用的還是MultithreadEventLoopGroup的register方法
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
           

(4)register

這個register方法在之前已經分析過了,不再重複了。就是給NioSocketChannel配置設定一個EventLoop,并且啟動一個線程和EventLoop綁定,最終執行register0。

(5)register0

register0方法我們之前也分析過,就是将NioSocketChannel注冊到selector上,并且将我們自己定義的handler加入pipeline。和之前不同的是這裡的channel已經激活了。由于是第一次注冊,是以會執行pipeline.fireChannelActive()方法。而之前啟動時候是在綁定端口之後才執行的pipeline.fireChannelActive()。

這裡的子pipeline裡面的handler也有三個,HeadContext、EchoServerHandler(我們自定義的)、TailContext。是以和啟動的時候一樣,也是由HeadContext執行邏輯。

public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }
           

最終和之前一樣調用到了AbstractNioChannel的doBeginRead方法。

protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
        //如果這裡除了讀事件還有其他事件,都重新注冊了
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
           

這裡預設的是read事件,是以最後在NioSocketChannel注冊的selector上注冊了read事件。注冊完後回到最開始的read方法繼續執行。

(6)readBuf.clear()

在将所有的NioSocketChannel建立并且注冊了read事件後,把readBuf清除。

(7)fireChannelReadComplete

最後在主channel上傳播ReadComplete事件。ReadComplete事件也是個入站事件,從HeadContext開始傳播。

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelReadComplete();
			//重新再注冊accept事件
            readIfIsAutoRead();
        }
           

ReadComplete事件在ServerBootstrapAcceptor、TailContext中都沒做什麼。readIfIsAutoRead和之前一樣,将NioServerSocketChannel在selector上重新注冊了accept事件。

7 服務端處理READ事件

服務端處理read事件和ACCEPT事件流程差不多,隻不過ACCEPT事件是由NioMessageUnsafe的read方法處理,read事件是由NioByteUnsafe的read方法處理。看下NioByteUnsafe的read方法。

public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            //下面幾行決定用建立什麼樣的ByteBuf,這裡以池化、直接記憶體方式建立ByteBuf
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    //doReadBytes(byteBuf),讀取資料放入byteBuf中
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //沒有讀取到資料
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        //釋放byteBuf
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    //傳播ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());
                allocHandle.readComplete();
                 //傳播ReadComplete事件,這裡再次在selector上注冊了read事件
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
           

這裡流程和上面差不多,具體不再分析了。