天天看点

Netty系列9-服务端源源码分析

我们以一个入门级别的服务端代码进行分析。netty其实就是对原生的java nio编程进行了封装,所以分析中会看到最后调用的都是java原生的API。因为整个过程代码比较多,这里会把一些不是核心流程的代码省略。

先整体看下服务端的代码。

/*线程组*/
        EventLoopGroup group = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();//2
            b.group(group,work)//3
                    /*指明使用NIO进行网络通讯*/
            .channel(NioServerSocketChannel.class)//4
                    //.channel(EpollServerSocketChannel.class)
                    /*指明服务器监听端口*/
                    .localAddress(new InetSocketAddress(port))
                    /*接收到连接请求,新启一个socket通信,也就是channel,每个channel
                    * 有自己的事件的handler*/
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                            //ch.pipeline().addLast(new EchoServerHandler2());
                        }
                    });
            ChannelFuture f = b.bind().sync();//5

           /*阻塞,直到channel关闭*/
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
           

1 创建EventLoopGroup

EventLoopGroup group = new NioEventLoopGroup();
 EventLoopGroup work = new NioEventLoopGroup();
           

这里的创建了两个NioEventLoopGroup,只创建一个也可以,不明白的可以看下前面文章讲的Reactor模式。NioEventLoopGroup可以认为是个线程池,它会给每个连接的channel分配EventLoop,EventLoop就是个线程。

这里创建的是NioEventLoopGroup,分析下创建它的过程做了什么。

1.1 NioEventLoopGroup

先看下构造函数

public NioEventLoopGroup() {
        this(0);
    }
           

整个过程中构造函数调用的比较多,我们只看几个关键点。

public NioEventLoopGroup(int nThreads, Executor executor) {
//这里nThreads就是上面的0,executor为null,SelectorProvider.provider()这个java原生api,得到一个SelectorProvider
        this(nThreads, executor, SelectorProvider.provider());
    }
           

接着调用了父类MultithreadEventLoopGroup的构造方法。

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  //这里判定如果没有指定线程数,设置默认线程数为系统处理器数量*2
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

           

接着再次调用另外一个父类MultithreadEventExecutorGroup的构造方法

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        if (executor == null) {
        //创建执行器
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());//步骤1
        }
        //创建EventLoop数组
        children = new EventExecutor[nThreads];//步骤2
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
            //初始化数组的对象
                children[i] = newChild(executor, args);//步骤3
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                   ...//省略代码
                }
            }
        }
        //创建选择器,选择器会根据算法每次从上述数组选择一个EventLoop返回
        chooser = chooserFactory.newChooser(children);//步骤4
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);//步骤5
        }
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);//步骤6
    }
           

(1)步骤1

前面我们没有指定执行器,所以步骤1创建了默认执行器,看下这个执行器是什么。

public final class ThreadPerTaskExecutor implements Executor {
//这里ThreadFactory 就是用来创建线程的
    private final ThreadFactory threadFactory;
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
    //创建线程,执行任务
        threadFactory.newThread(command).start();
    }
}

           

执行器构造函数参数是newDefaultThreadFactory方法返回的。

protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }

           

DefaultThreadFactory继承了JDK的ThreadFactory,就是用来创建线程的。

(2)步骤2

步骤2就是创建了一个EventExecutor数组,数组的大小就是我们之前指定的线程大小。

(3)步骤3

newChild创建数据对象,这里newChild调用的是NioEventLoopGroup方法,创建NioEventLoop对象。

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
           

看下创建NioEventLoop的过程。

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
          ...//省略代码
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        //JDK的选择器
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
           

构造器里面创建了我们熟悉的JDK的选择器selector。

这里也调用了父类的构造方法,这里的父类是SingleThreadEventLoop,看名字就知道这是个单线程的。

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        //创建了线程等待队列的大小,这里是2147483647
        tailTasks = newTaskQueue(maxPendingTasks);
    }
           

(4)步骤4

创建选择器,这个选择器决定以什么算法从EventLoop数组中选择一个EventLoop执行任务。

public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
           

这里使用的是PowerOfTwoEventExecutorChooser。

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }
           

(5)步骤5和步骤6

给每一个EventLoop添加了一个线程终止时的监听器。将上述EventLoop数组添加到了一个只读集合。

2 创建ServerBootstrap

ServerBootstrap b = new ServerBootstrap();
           

ServerBootstrap构造函数什么都没做。

3 设置ServerBootstrap参数

b.group(group,work)//3
                    /*指明使用NIO进行网络通讯*/
            .channel(NioServerSocketChannel.class)//4
                    //.channel(EpollServerSocketChannel.class)
                    /*指明服务器监听端口*/
                    .localAddress(new InetSocketAddress(port))
                    /*接收到连接请求,新启一个socket通信,也就是channel,每个channel
                    * 有自己的事件的handler*/
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                            //ch.pipeline().addLast(new EchoServerHandler2());
                        }
                    });
           

3.1 group方法

首先调用了ServerBootstrap的group方法。

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
               ...//省略代码
        this.childGroup = childGroup;
        return this;
    }
           

这里将parentGroup赋值给了父类AbstractBootstrap的group对象,childGroup赋值给了ServerBootstrap的childGroup对象。

这里如果group对象只传入了一个EventLoopGroup,上述两个对象是同一个。

3.2 channel方法

public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
           

这里创建了一个channel工厂,内部持有了一个反射工厂。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
		//构造器反射创建Channel
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

}
           

我们传入的是NioServerSocketChannel的class对象。

1.5 childOption和childHandler

这两个方法就是给ServerBootstrap对象赋值。

4. bind

ChannelFuture f = b.bind().sync();
           

看下ServerBootstrap对象的bind方法。

public ChannelFuture bind() {
	 //校验参数
        validate();
        //端口参数
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        return doBind(localAddress);
    }
           
private ChannelFuture doBind(final SocketAddress localAddress) {
		//初始化channel并注册到selector选择器
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            //绑定端口
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
          ...//省略代码
        }
    }
           

4.1 initAndRegister

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
		//用前面将的channel工厂,利用反射创建channel对象
		//这里创建的是NioServerSocketChannel
            channel = channelFactory.newChannel();
            //初始化
            init(channel);
        } catch (Throwable t) {
         ...//省略代码
        }
		//注册
        ChannelFuture regFuture = config().group().register(channel);
       ...//省略代码
        return regFuture;
    }
           

(1)NioServerSocketChannel

这里先创建了NioServerSocketChannel对象,看下它的构造函数。

public NioServerSocketChannel() {
  这里的DEFAULT_SELECTOR_PROVIDER是JDK的SelectorProvider对象。
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
           

newSocket创建的是ServerSocketChannelImpl对象。

private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            ...//省略代码
        }
    }
           

this调用了本类的构造方法。

public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
           

继续调用了父类的构造方法。

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
		//这里是SelectionKey.OP_ACCEPT,SelectionKey.OP_ACCEPT对应的整数是16,
        this.readInterestOp = readInterestOp;
        try {
		//设置Channel非阻塞
            ch.configureBlocking(false);
        } catch (IOException e) {
            ...//省略代码
        }
    }
           

再看下super调用的父类构造方法。

protected AbstractChannel(Channel parent) {
        this.parent = parent;
		//创建id
        id = newId();
        unsafe = newUnsafe();
		创建Pipeline对象
        pipeline = newChannelPipeline();
    }

           
protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
           

这里创建DefaultChannelPipeline对象。

(2)DefaultChannelPipeline

看下DefaultChannelPipeline的构造方法。

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
           

这里创建了两个Handler对象,一个是HeadContext,HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler接口,可以处理入站和出站事件。另外一个是TailContext,TailContext实现了ChannelInboundHandler,可以处理入站事件。

4.1.1 init方法

创建Channel对象后,调用init方法初始化该对象。

void init(Channel channel) throws Exception {
		//将上述options赋值给该channel
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
		//将上述attrs赋值给该channel
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
		//Channel的Pipeline对象
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
		//给Pipeline对象增加一个Handler
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
           

(1)addLast方法

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
			//创建DefaultChannelHandlerContext对象
            newCtx = newContext(group, filterName(name, handler), handler);
			//将这个Handler加入到TailContext前一个
            addLast0(newCtx);
            if (!registered) {
            //channel还未注册到selector上时,走如下逻辑
                newCtx.setAddPending();
                //这里创建了一个PendingHandlerAddedTask,作用是调用ChannelInitializer的initChannel方法,并将自己从pipeline移除。
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
           

addLast0方法:

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
           

callHandlerCallbackLater方法

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
		//这里的added为true
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
		//将PendingHandlerAddedTask对象赋值给pendingHandlerCallbackHead对象,作用在后面会讲到
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
           

4.1.2 register方法

这里将调用MultithreadEventLoopGroup的register方法。

public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
           
public EventExecutor next() {
   //选择器从EventLoopGroup中选取一个EventLoop
        return chooser.next();
    }
           

这里next方法就是用上面讲的选择器从EventLoopGroup中选取一个EventLoop,调用它的register方法。

这里调用的是SingleThreadEventLoop的register方法。

public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
           
public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //这里调用的是AbstractChannel内部类AbstractUnsafe的register方法
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
           
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
          ...//省略代码
            AbstractChannel.this.eventLoop = eventLoop;
			//当前线程是否是eventLoop的线程
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                //如果不是放入eventLoop的等待队列,等待执行
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                       ...//省略代码
                }
            }
        }
           

这里就是netty中的多线程安全解决方案,在前边netty介绍我们讲过所有channel的IO事件都会和eventLoop绑定,一个eventLoop会和一个线程绑定。这里eventLoop.inEventLoop()会判断当前线程是不是EventLoop指定的线程,

如果不是加入eventLoop的等待队列。这样channel的IO事件都会在一个线程执行。

(1)inEventLoop方法

public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
           

就是将当前线程和eventLoop线程进行对比,这里由于EventLoop还未指定线程,目前为null,当前线程为主线程,所以执行else代码块。

(2)execute

public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
		//加入线程等待队列
        addTask(task);
        if (!inEventLoop) {
		//启动线程
            startThread();
            if (isShutdown() && removeTask(task)) {
			//如果线程池已经关闭,移除任务
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
           
private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }
           

(3)doStartThread

private void doStartThread() {
        assert thread == null;
		//这里调用的就是前面将的用线程工厂创建一个线程,然后执行下面的run方法。
        executor.execute(new Runnable() {
            @Override
            public void run() {
			//将当前线程赋值给NioEventLoop的thread
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
				//这里执行的是NioEventLoop的run方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    ...//省略代码
                    );
    }
           

(4)run

protected void run() {
        for (;;) {
            try {
		
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
					//处理注册时间
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
						//执行队列中的任务
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
             ...//省略代码
        }
    }
           

这里还没有注册事件,所以会直接执行队列中的任务。

(5)runAllTasks

protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        //取出任务
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
        //执行任务
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

           

这里safeExecute方法就是调用队列任务的run方法。这里将调用是AbstractChannel的register0方法。

(6)register0

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //向selector注册channel
                doRegister();
                neverRegistered = false;
                registered = true;
				//调用我们之前创建的PendingHandlerAddedTask中的方法
				//作用在下面分析
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
				//这里把Register事件传播,基本没做什么
                pipeline.fireChannelRegistered();
          				//这时候channel还没激活,因为端口还没绑定,isActive是false。
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                      
                        beginRead();
                    }
                }
            } catch (Throwable t) {
               ...//省略代码
            }
        }
           

(7)doRegister

这里调用的是AbstractNioChannel类的doRegister方法。

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
			//向Selector选择器注册前面创建的ServerSocketChannel对象,并且感兴趣的事件为0,注意这里注册的还不是accept事件,因为端口还没绑定
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
               ...//省略代码
            }
        }
    }
           

(8)invokeHandlerAddedIfNeeded

final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            callHandlerAddedForAllHandlers();
        }
    
           
private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            // This Channel itself was registered.
			//registered设为true,之后再向Pipeline中加入Handler和第一次加入不一样了
            registered = true;

            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }

        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }

           

这里调用的是之前创建的PendingHandlerAddedTask对象,调用它的execute方法。

void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                         ...//省略代码
                }
            }
        }
           

callHandlerAdded0方法

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
         
            ctx.setAddComplete();
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            ...//省略代码
    }
           
handlerAdded方法:
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }
           

上述方法最后调用了ChannelInitializer类的initChannel。

(9)initChannel

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
			//这里调用的就是之前定义ChannelInitializer的initChannel,注意这里还不是我们定义的ChannelInitializer。
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
			//将ChannelInitializer从Pipeline中移除
                remove(ctx);
            }
            return true;
        }
        return false;
    }
           

我们看下之前的ChannelInitializer.

public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                    //将ServerBootstrapAcceptor加入到Pipeline
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
           

现在Pipeline有三个Handler,顺序为HeadContext、ServerBootstrapAcceptor、TailContext。到这里initAndRegister方法分析完了,回到doBind方法继续分析。

4.2 doBind0

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
            
              channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
           

这里执行AbstractChannel的bind方法。

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
           

这里调用了pipeline的bind方法。先说明netty中所有出站事件都TailContext传播,入站事件从HeadContext传播。

(1)bind

这里的bind方法是从TailContext节点传播,是个出站事件

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }
           
这里调用的是父类AbstractChannelHandlerContext的bind方法。
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
            ...//省略代码
            //找到下一个可以处理出站事件的handler
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
           

(2)findContextOutbound

findContextOutbound方法从当前节点向前找OutboundHandler:
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
           

目前我们的handler只有三个,能处理出站事件只有HeadContext。看下它的bind方法。

(3)bind

public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }
           

调用了AbstractChannel内部类AbstractUnsafe的bind方法。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
               ...//省略代码
            boolean wasActive = isActive();
            try {
            /绑定端口
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
			//这时候channel已经激活
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
					//传播Active事件
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }
           

(4)doBind

protected void doBind(SocketAddress localAddress) throws Exception {
   //java原生api绑定端口,并且考虑了版本兼容
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
           

端口绑定后,channel已激活,这时候会调用fireChannelActive方法。这是个入站事件,从HeadContext开始传播。看下头结点的channelActive方法。

public void channelActive(ChannelHandlerContext ctx) throws Exception {
	//先把Active向后传播,这里ServerBootstrapAcceptor、TailContext都没做什么
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }
           

(5)readIfIsAutoRead

private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }
           
public Channel read() {
   //read是个出站事件
        pipeline.read();
        return this;
    }
           

read是个出站事件,所以从TailContext开始传播,但最终调用的是HeadContext的方法。

public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

           

(6)beginRead

public final void beginRead() {
             ...//省略代码

            try {
                doBeginRead();
            } catch (final Exception e) {
                 ...//省略代码
            }
        }
           

这里最终调用了AbstractNioChannel的doBeginRead方法。

protected void doBeginRead() throws Exception {
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
		//将感兴趣的事件注册成了OP_ACCEPT
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
           

到这里服务器启动过程已经分析完毕。

5 服务端启动过程总结

  • 首先创建EventLoopGroup和ServerBootstrap对象,给EventLoopGroup和ServerBootstrap对象设置了相关属性,比如EventLoop数组。
  • 创建NioServerSocketChannel对象,设置相关属性,比如初始化它的Pipeline,创建了JDK的ServerSocketChannelImpl对象,设置通道为非阻塞。
  • 从EventLoop数组选取一个EventLoop,并且创建一个线程和EventLoop绑定,启动线程执行任务,在任务中将上述channel注册到selector上,但暂时不阻塞OP_ACCEPT事件,最后将ServerBootstrapAcceptor加入Pipeline。
  • 将上述channel绑定监听端口,在selector上注册OP_ACCEPT事件等待客户端连接。

6 服务端处理OP_ACCEPT事件

上面我们分析了服务端启动的过程,接下来分析如果有客户端发起连接后,服务端怎么处理。

我们之前分析启动的过程中分析了NioEventLoop的run方法。这个方法会不断自旋处理队列中的任务以及各种选择器事件,其实处理选择器事件的是其中的processSelectedKeys方法。

private void processSelectedKeys() {
        if (selectedKeys != null) {
        //执行的是这个方法
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
           

6.1 processSelectedKeysOptimized

private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
              selectedKeys.keys[i] = null;
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
           //我们使用的是NIO,所以走这段代码
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            if (needsToSelectAgain) {
              selectedKeys.reset(i + 1);
                selectAgain();
                i = -1;
            }
        }
    }
           

这里循环所有事件,调用processSelectedKey方法处理

(1)processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        //检查事件是否有效
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
            //获得channel的eventLoop 
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
           
                return;
            }
        
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
           // 处理连接事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
           // 处理写事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
          // 处理读事件或者ACCEPT事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
           

我们这里是ACCEPT事件,调用的是NioMessageUnsafe的read方法。

6.2 read

public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                   		//创建客户端连接
                        int localRead = doReadMessages(readBuf);
                         .//省略代码
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //传播read事件
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                //清除readBuf
                readBuf.clear();
                allocHandle.readComplete();
                   //传播ReadComplete事件
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);
                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
               
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
           

(1)doReadMessages

protected int doReadMessages(List<Object> buf) throws Exception {
 //获取客户端连接,这里的SocketChannel是通过java原生的api获得
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            if (ch != null) {
            //将上述SocketChannel 封装成NioSocketChannel对象,将入list对象
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
           .//省略代码
        return 0;
    }
           

这里的list对象是NioMessageUnsafe类的成员变量,这里保存的所有连接上服务端的SocketChannel 。这里创建的NioSocketChannel对象,服务端启动时候创建的是NioServerSocketChannel对象。看下两个的类图。

Netty系列9-服务端源源码分析
Netty系列9-服务端源源码分析

两个很相似,前面讲过NioServerSocketChannel对象创建时候调用了父类AbstractNioMessageChannel的构造函数,AbstractNioMessageChannel又调用了父类AbstractNioChannel。NioSocketChannel创建的时候先调用的是AbstractNioByteChannel的构造函数,然后也调用了AbstractNioChannel。所以同样NioSocketChannel创建的和NioServerSocketChannel创建过程差不多,内部同样创建了Pipeline等对象。不同的是NioServerSocketChannel默认的InterestOp是OP_ACCEPT,而NioSocketChannel是OP_READ。

(2)fireChannelRead

注意现在有两个Pipeline,一个服务端NioServerSocketChannel的Pipeline,这里叫主Pipeline,一个是客户端连接后创建的NioSocketChannel的Pipeline,这个叫副Pipeline。这里的传播事件Pipeline是主Pipeline。

这里的read是个入站事件,主Pipeline里面现在有三个handle,HeadContext、ServerBootstrapAcceptor、TailContext。所以这里从HeadContext开始向下传播,HeadContext只是向下传播,TailContext也没做什么,主要看下ServerBootstrapAcceptor的channelRead方法。

(3)channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
			 //这里是NioSocketChannel
            final Channel child = (Channel) msg;
            //这里的pipeline是副pipeline
            child.pipeline().addLast(childHandler);
			//设置属性
            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
            //我们在服务端设置了两个EventLoopGroup,这里的childGroup是子			EventLoopGroup
            //调用的还是MultithreadEventLoopGroup的register方法
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
           

(4)register

这个register方法在之前已经分析过了,不再重复了。就是给NioSocketChannel分配一个EventLoop,并且启动一个线程和EventLoop绑定,最终执行register0。

(5)register0

register0方法我们之前也分析过,就是将NioSocketChannel注册到selector上,并且将我们自己定义的handler加入pipeline。和之前不同的是这里的channel已经激活了。由于是第一次注册,所以会执行pipeline.fireChannelActive()方法。而之前启动时候是在绑定端口之后才执行的pipeline.fireChannelActive()。

这里的子pipeline里面的handler也有三个,HeadContext、EchoServerHandler(我们自定义的)、TailContext。所以和启动的时候一样,也是由HeadContext执行逻辑。

public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }
           

最终和之前一样调用到了AbstractNioChannel的doBeginRead方法。

protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
        //如果这里除了读事件还有其他事件,都重新注册了
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
           

这里默认的是read事件,所以最后在NioSocketChannel注册的selector上注册了read事件。注册完后回到最开始的read方法继续执行。

(6)readBuf.clear()

在将所有的NioSocketChannel创建并且注册了read事件后,把readBuf清除。

(7)fireChannelReadComplete

最后在主channel上传播ReadComplete事件。ReadComplete事件也是个入站事件,从HeadContext开始传播。

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelReadComplete();
			//重新再注册accept事件
            readIfIsAutoRead();
        }
           

ReadComplete事件在ServerBootstrapAcceptor、TailContext中都没做什么。readIfIsAutoRead和之前一样,将NioServerSocketChannel在selector上重新注册了accept事件。

7 服务端处理READ事件

服务端处理read事件和ACCEPT事件流程差不多,只不过ACCEPT事件是由NioMessageUnsafe的read方法处理,read事件是由NioByteUnsafe的read方法处理。看下NioByteUnsafe的read方法。

public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            //下面几行决定用创建什么样的ByteBuf,这里以池化、直接内存方式创建ByteBuf
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    //doReadBytes(byteBuf),读取数据放入byteBuf中
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //没有读取到数据
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        //释放byteBuf
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    //传播ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());
                allocHandle.readComplete();
                 //传播ReadComplete事件,这里再次在selector上注册了read事件
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
           

这里流程和上面差不多,具体不再分析了。