天天看点

Netty4.1源码分析—— 服务端启动过程一、引言与结论二、NioEventLoopGroup、NioEventLoop三、Server初始化与启动四、总结

文章目录

  • 一、引言与结论
  • 二、NioEventLoopGroup、NioEventLoop
    • 2.1 NioEventLoopGroup
    • 2.2 NioEventLoop
      • 2.2.1 如何开启Selector?
      • 2.2.2 NioEventLoop的启动
      • 2.2.3 NioEventLoop的事件循环
        • 2.2.3.1 SelectStrategy
        • 2.2.3.2 EventLoop中的任务队列
        • 2.2.3.3 EventLoop的任务执行
        • 2.2.3.4 非IO任务执行
        • 2.2.3.5 IO任务执行
  • 三、Server初始化与启动
    • 3.1 初始化与注册
      • 3.1.1 如何创建ServerSocketChannel?
      • 3.1.2 初始化ServerSocketChannel
      • 3.1.3 初始化客户端连接(SocketChannel)
      • 3.1.4 Channel如何绑定EventLoop?
      • 3.1.5 Channel如何注册到Selector?
    • 3.2. 绑定端口并注册事件
  • 四、总结
本文只代表笔者一人的理解和叙述,笔者功力尚浅,如有错误,还请各位大神斧正。

一、引言与结论

先说结论,

Netty

Server

启动过程简单来说是加载配置以及为即将到来的连接做接收的准备,也就是为客户端连接服务端这一步做好准备。因为

Netty

的架构是

Reactor

模式的实现,所以这一步其实是让

ServerSockerChannel

注册上

OP_ACCEPT

事件,即接受连接事件。

谈及

ServerSockerChannel

,我们需要理解

Netty

中的

sockerChannel

分为服务端的

ServerSocketChannel

SocketChannel

以及客户端的

SocketChannel

。不同的

SocketChannel

对不同的事件感兴趣,下面表格给出了各自

SocketChannel

感兴趣的事件:

client/server channel OP_ACCEPT OP_CONNECT OP_WRITE OP_READ
client SocketChannel N Y Y Y
server ServerSocketChannel Y N N N
server SocketChannel N N Y Y

这三个不同的

SockerChannel

关系可以这么理解:

  • ServerSocketChannel

    只对即将到来的

    OP_ACCEPT

    感兴趣
  • 客户端发起连接,

    ServerSocketChannel

    响应

    OP_ACCEPT

    事件创建出服务端的

    SocketChannel

  • 客户端的

    SocketChannel

    与服务端的

    SocketChannel

    是一对一的关系,是这次连接的抽象体现

为了方便以及篇幅的原因,此次源码分析只采用

NIO

的实现进行分析,其它如

Epoll、OIO

等读者可自行按照该篇分析去源码里查看。

二、NioEventLoopGroup、NioEventLoop

NioEventLoopGroup

是一个事件循环组,它的作用是创建并管理其内部的

NioEventLoop

。其内部采用了池化技术的思想,所以我们可以简单的将它理解为一个线程池,只不过这个线程池并和我们熟知的

JDK

线程池

ThreadPoolExecutor

不怎么相同,

NioEventLoopGroup

里面的每个线程都拥有单独的任务队列,同时它也没有JDK线程池那套回收线程的机制。

NioEventLoopGroup

里线程具体表现类体现为

NioEventLoop

类(

EventLoop

并不等同

Thread

,可以理解一个EventLoop伴随着一个Thread),在创建

NioEventLoopGroup

时就已经定好了

Group

里有多少个

NioEventLoop

,同时在创建

NioEventLoop

时调用

JDK

NIO

openSelector

方法创建一个

Selector

对于两者的关系我们可以简单的认定为是线程池和线程的关系,但是要注意的是,

NioEventLoopGroup

没有

ThreadPoolExecutor

的回收、创建线程策略,而

NioEventLoop

也不是一个线程,它只是绑定一个线程的

Executor

2.1 NioEventLoopGroup

首先来看

NioEventLoopGroup

类的继承关系:

Netty4.1源码分析—— 服务端启动过程一、引言与结论二、NioEventLoopGroup、NioEventLoop三、Server初始化与启动四、总结

从图中可以看出,

NioEventLoopGroup

类最上面实现的是

Executor

接口,这也从侧面证明了

NioEventLoopGroup

类其实是一个线程池。从

NioEventLoopGroup

类点进它的构造器:

public NioEventLoopGroup(ThreadFactory threadFactory) {
    // 创建传入Selector,单例所有的EventLoopGroup共用一个Selector
    this(0, threadFactory, SelectorProvider.provider());
}

...SelectorProvider类
private static SelectorProvider provider = null;
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            //...
                provider = sun.nio.ch.DefaultSelectorProvider.create();
                return provider;
            }
        });
    }
}
           

在创建一个

NioEventLoopGroup

时,调用

SelectorProvider

类提供一个

SelectorProvider

,而

provider()

方法采用

synchronized

加单例的方式创建一个

SelectorProvider

。因为

Boss Group

Work Group

都是该类创建

SelectorProvider

,所以这两者共用一个

SelectorProvider

。而且此处也是

Netty

跨平台的实现之一(其实此处调用的是

NIO

的实现),进入

DefaultSelectorProvider.create()

源码你会发现如下:

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        return new WindowsSelectorProvider();
    }
}
           

直接创建一个

windows

版本的

Selector Provider

,但是如果你查看

Mac

或者

Linux

下的源码你会发现,这里被替换成

KQueue

Epoll

的实现。

继续查看

NioEventLoopGroup

的调用栈,

NioEventLoopGroup

的构造函数最终调用的是

MultithreadEventExecutorGroup

的构造方法,在该方法里面创建了一个执行器

Executor

,并且实例化了多个

NioEventLoop

:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    //如果外界未传进一个executor,自己实例化一个......
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 创建线程组,并且实例化每一个
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } 
        //......
    }
    // 初始化一个选择器工厂
    chooser = chooserFactory.newChooser(children);
    //......
}
           

代码并不难,需要注意的是此处的

chooserFactory

,该工厂的作用是提供多种选择策略,该策略是在进行一个连接建立时,执行其策略类的

next()

方法来选择一个

EventLoop

。选出来的

EventLoop

会和新创建的

SocketChannel

绑定,关于这点后面在详细分析。

我们接着看看

Netty

是如何实例化一个

NioEventLoop

的,接着进入

newChild(executor, args)

方法的实现,该方法是个抽象方法,有着

Epoll、KQueue、NIO

的多种实现,此处我们只分析

NIO

的实现:

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    SelectorProvider selectorProvider = (SelectorProvider) args[0];
    SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
    RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
    EventLoopTaskQueueFactory taskQueueFactory = null;
    EventLoopTaskQueueFactory tailTaskQueueFactory = null;

    int argsLength = args.length;
    if (argsLength > 3) {
        taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
    }
    if (argsLength > 4) {
        tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
    }
    return new NioEventLoop(this, executor, selectorProvider,
            selectStrategyFactory.newSelectStrategy(),
            rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
           

需要注意的

SelectStrategyFactory

是构造

Selector

进行轮询

IO事件

的一个策略实现类工厂,这个放在讲轮询

IO事件

时再详细介绍。

RejectedExecutionHandler

,拒绝执行处理

handler

,其作用是在

NioEventLoop

所专属的任务队列满了时候拒绝处理的一种策略,这点和

JDK线程池

的拒绝策略作用是差不多相同的。在Netty中默认队列里超过16个任务就会执行拒绝策略:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
    //...
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    //...
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    //队列满,拒绝任务
    if (!offerTask(task)) {
        reject(task);
    }
}
           

NioEventLoopGroup和JDK线程池最大的区别在于,在Netty中每个NioEventLoop(一个线程)都有其专属的一个任务队列,而JDK里线程池所有的线程都共享一个任务队列。

2.2 NioEventLoop

同样的,先看NioEventLoop的继承关系:

Netty4.1源码分析—— 服务端启动过程一、引言与结论二、NioEventLoopGroup、NioEventLoop三、Server初始化与启动四、总结

从类图中看出,

NioEventLoop

继承自

SingleThreadEventLoop

,通时该类继承自

SingleThreadEventExecutor

并且实现了

ScheduledExecutorService

接口。从字面意义上理解是单线程事件处理器,表明

NioEventLoop

是单线程的架构,并且作用是处理事件。从它实现的接口来看,它不仅可以执行任务,还能进行任务调度。

2.2.1 如何开启Selector?

从NioEventLoopGroup最终调用NioEventLoop的构造器的处开始阅读:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    //......
}

private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    //.....
    selectedKeys = selectedKeySet;
}
           

在这一步,自然最重要的方法就是

openSelector

了,简明扼要的打开了一个

Selector

,在

windows

下,这里就是

WindowsSelectorProvider

调用

openSelector

方法创建一个

Selector

,而

openSelector

方法是

JDK NIO

的实现,就不在此展开讨论了。

2.2.2 NioEventLoop的启动

既然

NioEventLoop

是一个

Executor

,自然就应该从它的

execute

提交任务方法开始着手进行分析,可以从它的父类

SingleThreadEventLoop

中找到它的

execute

方法:

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // 增加到任务队列
    addTask(task);
    // 不是则开启此线程
    if (!inEventLoop) {
        startThread();
        //....
    }
    //....
}
           

从代码里可以看出

execute

方法首先判断当前线程是不是EventLoop绑定的线程,然后把任务加入该

NioEventLoop

对应的任务队列里。对于一个新的

NioEventLoop

而言,它的线程上下文肯定是在其它的线程环境中,所以这一步

inEventLoop

一定是

false

。也就是说,EventLoop伴随的线程的启动是该EventLoop第一次执行任务的时候。

startThread

方法是调用

CAS

确保

NioEventLoop

对应的线程没有被启动过,所以略过

startThread

直接看它调用的

doStartThread

方法:

private void doStartThread() {
    assert thread == null;
    // 创建一个线程
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 赋值当前线程给EventLoop
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // 执行事件循环
                SingleThreadEventExecutor.this.run();
                success = true;
            } 
            //......
           

在这一步,调用了一个

executor

execute

方法。如果读者记忆好的话,应该会记得上文分析

NioEventLoopGroup

创建的时候,会传入一个

ThreadPerTaskExecutor

类,也就是此处的

executor

。来看它源码:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
           

该类的

execute

方法是直接创建一个线程并开启该线程,也就是在这一步伴随

NioEventLoop

一生的线程才被真正的创建好并执行,执行的主要逻辑则是

SingleThreadEventExecutor

run

方法,即

Netty

中的事件循环。

NioEventLoop

从创建到启动小结一下:

  • 第一步:调用

    JDK

    NIO

    方法开启一个

    Selector

    ,这个Selector是当前操作系统所支持的Selector,在Linux下就是Epoll的Selector,在Mac下就是KQueue的Selector。
  • 第二步:在第一次执行任务时,判断当前线程是否是EventLoop线程,不管是否是都将任务加入任务队列,如果不是则进行第三步。
  • 第三步:调用

    ThreadPerTaskExecutor

    类创建一个线程,将事件循环作为任务传入,并启动该线程。

2.2.3 NioEventLoop的事件循环

对于已经绑定线程的

EventLoop

来说,除非它被销毁,不然这个线程会伴随它的整个生命周期。

Netty EventLoop

线程模型带来的最终好处就是减少了线程切换上下文带来的开销。在

Netty

中,一个

SocketChannel

只能绑定一个

EventLoop

,也就是这个

channel

上产生的所有IO事件都会由

EventLoop

所完成,也就是在一个线程中完成。

run

方法内部是一个死循环,不断地从

Selector

中取出事件来执行。因为这个方法比较长,在分析这个方法前,先分步骤介绍该方法。

2.2.3.1 SelectStrategy

SelectStrategy

表明此次

Selector

该怎么执行的策略,它有三个值:

  • CONTINUE

    :表明此次循环直接跳过,不执行后续步骤,这一步也意味着IO事件没有发生。
  • BUSY_WAIT

    :忙等待是

    NIO

    所不支持的。
  • SELECT

    :表明

    EventLoop

    的任务队列里没有任务,于是采用

    select(timeout)

    来等待IO事件,timeout的值取决于到下一次定时任务被调度的时间差值。

明白了上述三个值,可以来看看

EventLoop

中是如何计算这个策略的:

//计算select策略
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

//2
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

//3
final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
    
    private DefaultSelectStrategy() { }
    
    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}
           

调用

DefaultSelectStrategy

calculateStrategy

方法,判断队列中是否有任务。如果有任务,则调用

selectNow

方法,没有任务则返回

SELECT

状态。关于

selectNow

方法其实调用的是

NIO Selector

的几个

select

方法,它们分别如下:

  • select()

    : 阻塞到至少有一个通道在你注册的事件上就绪。返回的

    int

    值表示有多少通道已经就绪。
  • select(long timeout)

    :与

    select()

    一样,但是最长会阻塞

    timeout

    毫秒来等待。
  • selectNow()

    : 不会阻塞,不管什么通道就绪都立刻返回,此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。

了解了这些后再来看

EventLoop

是如何对于上述状态进行处理的:

int strategy;
try {
    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
    switch (strategy) {
    // 继续循环
    case SelectStrategy.CONTINUE:
        continue;
    // 忙等待,NIO不支持
    case SelectStrategy.BUSY_WAIT:
        // fall-through to SELECT since the busy-wait is not supported with NIO
    // 代表需要等待一段时间,调用select(timeout)
    // timeout是到下一次调度任务执行时间的间隔
    case SelectStrategy.SELECT:
        // 计算到下一次任务的调度死亡时间线
        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
        // 等于-1则调用select()方法进行阻塞
        if (curDeadlineNanos == -1L) {
            curDeadlineNanos = NONE; // nothing on the calendar
        }
        nextWakeupNanos.set(curDeadlineNanos);
        try {
            // 如果没有任务,执行select(timeout)方法
            if (!hasTasks()) {
                // 根据curDeadlineNanos的值调用select(timeout)或者select()
                strategy = select(curDeadlineNanos);
            }
        } finally {
            // This update is just to help block unnecessary selector wakeups
            // so use of lazySet is ok (no race condition)
            nextWakeupNanos.lazySet(AWAKE);
        }
        // fall through。默认表示有IO事件产生
    default:
    }
} catch (IOException e) {
    // 出现IO异常则重构selector
    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
    // the selector and retry. https://github.com/netty/netty/issues/8566
    rebuildSelector0();
    selectCnt = 0;
    handleLoopException(e);
    continue;
}
           

调用

selectNow

还是调用

select(timeout)

是取决于任务队列是是否有任务而言的。对于任务队列中有任务来说,假如没有IO事件产生,则策略返回

CONTINUE

继续下一次循环,有任务则走

default

分支进行处理IO事件;而对于任务队列中没有任务来说,都是返回

SELECT

策略,表明调用

select(timeout)

方法来进行阻塞事件循环,阻塞的时间取决于到下次任务被调度的时间差。而如果没有定时任务,那么就调用

select()

方法进行阻塞。如图:

Netty4.1源码分析—— 服务端启动过程一、引言与结论二、NioEventLoopGroup、NioEventLoop三、Server初始化与启动四、总结

执行多种策略的好处就是可以为了避免线程一直循环空跑,浪费CPU资源。在获取新的IO事件或者其它任务要阻塞线程,避免一直循环,等待有新的IO事件或者任务的时候才唤醒线程。

2.2.3.2 EventLoop中的任务队列

EventLoop

中的任务队列总共分为三种,也就是不同的任务种类保存在不同的队列中:

  • taskQueue:也就是普通的任务,该队列存储IO任务和非定时的普通任务,比如

    safeExecute

    提交的任务。
  • scheduleTaskQueue: 如同上文所说,

    EventLoop

    实现了

    ScheduledExecutorService

    接口,也就意味着它拥有了定时处理调度任务的能力。
  • tailTaskQueue:目前在Netty的源码中没有看到应用的场景,可能是为了以后扩展。

2.2.3.3 EventLoop的任务执行

在Netty中,任务的种类如果从功能性区分,则分为普通任务和定时任务。而如果从业务类型上区分,那么就分为IO任务和非IO任务,这里的非IO任务指的是不涉及到处理IO操作的一些操作。也就意味着涉及到一个连接的逻辑业务和处理IO事件都是在一个

EventLoop

内执行,即在一个线程内执行。在这时候分配两者的时间占比就显得尤为重要了,也因此Netty在此处设计了一个ioRatio变量,处理这种情况:

// ioRatio表示一次循环IO任务的占比,任务分为IO任务和非IO任务,比如doBind之类的
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
    try {
        // 有IO事件
        if (strategy > 0) {
            processSelectedKeys();
        }
    } finally {
        // 即便IO 100%,还是要处理非IO任务的
        // Ensure we always run tasks.
        ranTasks = runAllTasks();
    }
} else if (strategy > 0) {
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        // 计算io任务的时间,根据该时间和ioRatio来计算出非io任务的执行时间
        // Ensure we always run tasks.
        final long ioTime = System.nanoTime() - ioStartTime;
        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
} else {
    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
           

从源码可以看出,ioRatio变量指的是IO任务在一次

EventLoop

里所占的时间比例,它默认是50,即占比50%。当ioRatio变为100的时候,Netty将首先执行处理完所有的IO事件,然后再去处理所有的非IO任务,这种场景非常适合于IO密集型的程序,比如文件传输等等。而如果ioRatio不是100,那么Netty还是首先处理IO任务,然后根据处理IO任务的时间来计算出执行非IO任务的限定时间。

Netty通过巧妙的设置ioRatio来平衡IO任务和非IO任务的时间,ioRatio越低则越适合于CPU密集型的程序。但是Netty终究是一个IO框架,必须保证IO事件被处理才能取处理其它的逻辑业务。结合strategy所画的图,将这部分逻辑加上:

Netty4.1源码分析—— 服务端启动过程一、引言与结论二、NioEventLoopGroup、NioEventLoop三、Server初始化与启动四、总结

2.2.3.4 非IO任务执行

运行非IO任务有两个方法,两者互为重构,即

runAllTasks()

方法和即

runAllTasks(long timeoutNanos)

方法。前者负责在一次事件循环中跑完所有队列中的任务,后者负责在给定时间内执行队列中的任务。首先来看

runAllTasks()

方法:

protected boolean runAllTasks() {
    assert inEventLoop();
    // 表示是否scheduledTask里面还有任务
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        // 从scheduledTask中定时任务塞进taskQueue
        fetchedAll = fetchFromScheduledTaskQueue();
        if (runAllTasksFrom(taskQueue)) {
            // 至少运行一次标记
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    // 执行tailTaskQueue里面的任务
    afterRunningAllTasks();
    return ranAtLeastOne;
}
           

代码很简单,只要队列中有任务,那么就一直循环将任务跑完。需要注意的是

fetchFromScheduledTaskQueue

方法,该方法是以当前时间为标准,将

scheduledTaskQueue

队列里执行时间到达此时间的任务取出放在

taskQueue

里面:

private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        // 将执行时间到达的定时任务取出
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        // 添加到taskQueue失败,重新加入scheduledTaskQueue
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}
           

runAllTasks(long timeoutNanos)

方法和

runAllTasks

方法的逻辑并无太大区别,只是多了一个时间检测:

// 每执行完64次任务检测一次
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
    lastExecutionTime = ScheduledFutureTask.nanoTime();
    if (lastExecutionTime >= deadline) {
        break;
    }
}
           

从这个检测来说,

Netty

在执行非IO任务给的限定时间并不完全精确,因为有可能业务逻辑导致一次任务运行时间较长,而检测是每运行64个任务检测一次。也正是由于这个原因,

Netty

中的自定义handler不应该做耗时较长的任务。

2.2.3.5 IO任务执行

IO任务就是对

SocketChannel

感兴趣的几个事件进行处理,而

Netty

在这个基础上做了一些优化:

private void processSelectedKeys() {
    // selectedKeys是一个数组,在openSelector时已经关联到selector,调用反射将值放在里面
    if (selectedKeys != null) {
        // 数组遍历,比原生Set集合迭代要快2%左右
        processSelectedKeysOptimized();
    } else {
        // Set集合遍历keys
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
           

selectedKeys

不等于null的时候,采取

processSelectedKeysOptimized

方法,等于则采取

processSelectedKeysPlain

方法。事实上这两者的处理逻辑都差不多一样,其内部都是调用的是

processSelectedKey

方法来处理

SocketChannel

感兴趣的事件。区别在于前者在遍历

selectedKeys

时采取的是数组遍历,后者依旧是NIO的默认

Set

集合遍历(Iterator),据网上统一的说法前者要比后者快2%左右的性能。两者逻辑差不多,就只分析前者了:

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // 方便GC
        selectedKeys.keys[i] = null;
        // 这里一般取出的是ServerSocketChannel/SocketChannel
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 一般不走这一步,除非使用者主动注册NioTask到Selector
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        // needsToSelectAgain在run方法里面是false,暂不清楚这一步意义在哪
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}
           

从源码可以看出,

processSelectedKeysOptimized

直接进行数组遍历将

SelectedKeys

中就绪的事件一个个的处理,需要注意的是

attachment()

方法。这个方法是一个取值的操作,从

SelectionKey

将值取出来,一般没有特殊情况这里就是

ServerSocketChannel

SocketChannel

。而这个值的设置则是在

channel

在注册感兴趣的事件时将自身设置里进去,这个后面会再讲到。来继续

processSelectedKey

方法:

rivate void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 检验
    if (!k.isValid()) {
    // ....
    }
    try {
        int readyOps = k.readyOps();
        // 对connect事件处理,处理逻辑为取消channel对OP_CONNECt的关注
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
        // 处理OP_WRITE
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // 处理OP_READ or OP_ACCEPT
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
           

AbstractNioChannel.NioUnsafe

类是一个特殊的类,一般用在处理IO最终任务的请求上面,比如注册事件、进行连接等,它有多个实现类,分别对应读取数据、处理注册等功能,目前的话只需要知道它能处理对应的事件就行了,后面会详细的讲解。

另外需要注意的是注册和取消对某一个事件关注的操作,是通过操作一个值来实现的。这点如果熟悉NIO的同学应该非常清楚,就比如上面源码在处理

OP_CONNECT

事件时取消此channel对

OP_CONNECT

的关注。

NIO中一共分为四个事件,每个事件代表着一个数字值:

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
           

Selector

中,为了精确表达语义又尽可能减少空间的使用,采用了一个整数值来表示某一个channel关注的所有事件。如果一个channel只关注

OP_READ

OP_WRITE

事件,那么将两者的值进行或操作得到一个新值(兴趣集)设进

Selector

里,而

Selector

只需要将兴趣集和某个OP事件的值进行与操作,判断其是否等于0就知道这个channel有没有对该事件关注了。而上述对就绪集里面的事件处理采取了一样的策略。

00001(OP_READ) | 00010(OP_WRITE) = 00011 & 00001 > 0 ? 对READ感兴趣 : 对READ不感兴趣
           

而如果取消对某一个事件的关注呢?也很简单,将兴趣集与该事件值的反码进行与操作得到一个新的兴趣集,在再设进Selector里就好了:

00001(OP_READ) | 00010(OP_WRITE) = [00011 & ~00001 = 00010](取消对READ的关注)
           

三、Server初始化与启动

Server初始化与注册是在Server调用

bind

方法来绑定某一个端口并开启服务,追踪其源码,可以看到最终调用了

doBind

方法,其源码如下:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化与注册channel到Selector
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // 完成了直接调用
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 没有完成则将doBind0封装成一个promise,等到后面完成了再调用
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                    // ......
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
           

initAndRegister

方法和

doBind0

方法是

doBind

做的两件事情,一个对应初始化和注册channel到

Selector

上,一个负责对应绑定注册channel关注的事件,后文会分为这两大模块进行分析。里面需要注意的是,

doBind0

的调用是被封装成一个

promise

传到后续逻辑中,等待初始化成功后再调用的。

3.1 初始化与注册

首先看

initAndRegister

方法,这个方法负责的任务是创建

ServerSocketChannel

并初始化、初始化

SocketChannel

、创建handler链、初始化外部参数以及将

ServerSockerChannel

注册到

Selector

上面。看下源码:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 反射+工厂创建对应的ServerSocketChannel
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        //.....
    }
    ChannelFuture regFuture = config().group().register(channel);
    //.....
    return regFuture;
}
           

3.1.1 如何创建ServerSocketChannel?

首先调用

channelFactory

来创建一个channel,这个channel类型是由外部决定的,也就是

ServerBootstrap的channel(Class<? extends C> channelClass)

方法:

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}
           

从类名看出,创建一个创建channel的反射工厂类,也就是

initAndRegister()

方法调用的

channelFactory

。而现在是

Server

端的启动,所以这里传入的是

NioServerSocketChannel

类,也就是这里的channel是

NioServerSocketChannel

。在这里也可以得出结论,

ServerSockerChannel

是由反射+工厂类来创建的。

这里需要注意,此处的是Netty自身的channel类,在创建该类的时候会调用

SelectorProvider

创建NIO的

ServerSocketChannel

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
           

3.1.2 初始化ServerSocketChannel

创建完

ServerSocketChannel

后,后面继续调用

init

方法,

init

方法是一个抽象类,有

ServerBootStrap

BootStrap

两个实现,目前我们只分析前者:

void init(Channel channel) {
// 初始化ServerSocketChannel option和其它的参数
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());

// 初始化并获取pipeline
ChannelPipeline p = channel.pipeline();

// SocketChannel的相关属性
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
// ChannelInitializer是一个特殊的handler,使用一次后会被移除
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // 该handler是为初始化SocketChannel做准备
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});
}
           

首先设置

ServerSockerChannel

的option和其它的参数,然后通过

ChannelInitializer

类来设置handler。这也是

Netty

中常用的写法了,在

Server

的启动程序里就有用

ChannelInitializer

类了。

ChannelInitializer

类是一个handler,这个handler有点特殊,它在使用完一次后就会被移除。

出于这样的特性,它就很适合为Channel设置handler,毕竟没人会想每次数据通过handler链的时候都设置一遍handler。接着又在pipeline里加了

ServerBootstrapAcceptor

类,这个类就是初始化

SocketChannel

对应连接的存在,后文在分析。可以得出结论,初始化

ServerSocketChannel

是设置了其option、handler以及其他的参数,特殊的是初始化

SockerChannel

会以一个handler的方式加入到

ServerSocketChannel

的handler链中。

3.1.3 初始化客户端连接(SocketChannel)

上述的handler有两种,一种是从

ServerBootStrap

的handler方法传进来的handler以及

ServerBootStrap

childHandler

方法传来的childHandler。前者应用于

ServerSockerChannel

后者应用

SocketChannel

。特别注意的是

ServerSockerChannel

还有个handler,也就是

ServerBootstrapAcceptor

类。

ServerBootstrapAcceptor

类的构造器传入了

SocketChannel

相关的属性,先看下源码:

// SocketChannel绑定EventLoop,设定配置、handler等
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // next 一个EventLoop绑定
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
}
           

ServerBootstrapAcceptor

类继承了

ChannelInboundHandlerAdapter

类,表明它是一个入栈的handler。在连接请求来领时,它就和

ServerSockerChannel

一样设置对应的handler、option等,这也表明

ServerBootstrapAcceptor

类是负责接收客户端连接,并初始化的handler类。需要注意的是

childGroup.register(child)

方法,该方法是将channel与EventLoopGroup里某一个EventLoop所绑定,在讲解ServerSockerChannel与

EventLoop

里会详细提到。

3.1.4 Channel如何绑定EventLoop?

上面的步骤全部都执行完了以后,就来到了

initAndRegister

方法的最后一步:

ChannelFuture regFuture = config().group().register(channel);
           

这一步是将channel注册到EventLoopGoup里,也就是将channel和EventLoop所绑定。进入

register

方法,可以查看到

NioEventLoopGroup

的父类

MultithreadEventLoopGroup

的调用:

// 选择一个EventLoop
public EventLoop next() {
    return (EventLoop) super.next();
}

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
           

调用next方法选择一个EventLoop,如果读者对上文有映象的话,创建

EventLoopGroup

的时候传入过一个

ChooserFactory

,该工厂就是来创建一个进行选择EventLoop策略的工厂。在

Netty

中有两种实现:

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicLong idx = new AtomicLong();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }
}
           

PowerOfTwoEventExecutorChooser

选择器提供一种与操作来选择其中一个EventLoop,这也是常用的一种操作了。假如m是2的n次方,令n&(m - 1)则会得到一个小于m的一个值,且随着n递增,得到的值会一直在[0, (m-1)]范围内循环。在JDK的

HashMap

中同样也是这种方法实现,但是这种策略需要EventLoop数量是2的n次方的值。它的策略示例如下:

size = 4 = 0100
1: 0001 & (0100 - 1) = 0001 = 1
3: 0011 & (0100 - 1) = 0011 = 3
5: 0101 & (0100 - 1) = 0001 = 1
6: 0110 & (0100 - 1) = 0010 = 2
           

GenericEventExecutorChooser

的选择策略就是普通的递增取模。这种策略要比上面采取与操作效率要低,从这里也可以看出Netty是一个很注重效率的框架。这两种策略的择取取决于指定传入EventLoopGroup内的EventLoop数量是不是2的n次方:

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}
           

3.1.5 Channel如何注册到Selector?

从EventLoopGroup中选择了一个EventLoop后,接着调用了它的register方法,在EventLoop的register方法内调用了

Unsafe

类的register方法,上文提到unsafe类就是Netty中专门处理IO任务的类:

private void register0(ChannelPromise promise) {
    try {
        // ...
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();
        // 设置promise success, 注册ServerSocketChannel那么这里就是调用doBind0
        safeSetSuccess(promise);
        // 通知注册事件
        pipeline.fireChannelRegistered();
        // 注册ServerSocketChannel在这步没有激活,不会往下走
        if (isActive()) {
         //....
        }
    } catch (Throwable t) {
       //....
    }
}
           

首先直接

doRegister

进行将ServerSocketChannel进行注册到Selector里面,然后进行广播通知该channel已被注册。

safeSetSuccess(promise)

这一步是将传进来的promise(参考本文第三节的开头)设为success,读者如果对上文有映象的话,该promise设为success调用的就是doBind0方法。来看最后一步注册:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            //....
        }
    }
}
           

这里需要注意的是两点。第一点ServerSocketChannel在这里注册到Selector上时并没有注册它感兴趣的事件,只是注册为0,也就是说关于ServerSocketChannel的兴趣集是在初始化ServerSocketChannel后面再设置的。第二点就是此处将ServerSocketChannel作为一个参数注册上去,这也就是在上文叙述到IO任务执行这一小节时,channel的值可以在SelectionKey就续集中用

attachment

方法拿出来的原因。

3.2. 绑定端口并注册事件

上文提到

safeSetSuccess(promise)

最终调用了是doBind0方法。在Netty中doBind0方法是绑定Server端一个端口和本地的IP来为客户端提供服务,而也就是这一步会将ServerSocketChannel关注的OP_ACCEPT注册到兴趣集里。

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
           

将channel的bind方法作为非IO任务提交到事件循环里异步执行:

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
           

可以看到是调用了pipeline的bind方法,我们知道pipeline可以认作是对应channel的handler链,这一步其实就是将该channel所对应要执行的handler链上所有的handler的bind方法都调用一遍。需要注意的是在Netty的pipeline上有着HeadContext和TailContext的handler节点,表示开始和结束handler,如果读者启动Netty的例子EchoServer服务,DEBUG在这里,就会得到Server端的pipeline handler调用链是下面的构造:

Netty4.1源码分析—— 服务端启动过程一、引言与结论二、NioEventLoopGroup、NioEventLoop三、Server初始化与启动四、总结

在初始化的时候新加入的ServerBootStrapAcceptor handler类也会在这条链路上,而Netty对pipeline的handler链处理采用的是双向链表来表示的:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    //......
}

public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    //......
}
           

而在doBind时,Netty的处理是从TailContext往HeadContext处理的,最重要的注册OP_ACCEPT事件则发生在HeadContext的bind方法里:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    //....
    // 判断是否channel激活,这一步尚未激活
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
       //...
    }
    // 到这一步ServerSocketChannel激活
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
           

一个channel是否被激活的条件是该channel有没有连接到它的远程节点,也就是是否可以接受和发送数据。在未doBind前ServerSocketChannel肯定是没有被激活的,先进入doBind方法:

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
           

从源码可见,Netty对不同的JDK版本做了兼容的处理,也就是在这一步ServerSocketChannel绑定了一个ip和端口组成的节点。再回到bind方法,当ServerSocketChannel绑定成功激活后,pileline会把ServerSocketChannel激活的消息传递出去,也就是pipeline会从TailContext到HeadContext走一遍调用channelActive方法,它最终调用的是HeadContext的channelActive方法:

public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    // 注册读事件,读事件包括创建连接/读取数据
    readIfIsAutoRead();
}
           

在HeadContext的channelActive方法内,不仅将channel激活的消息广播出去,还调用了readIfIsAutoRead方法,该方法最终也会从TailContext到HeadContext走一遍,只不过这次调用的是read方法,而且最终也还是来到了HeadContext的read方法:

public void read(ChannelHandlerContext ctx) {
    // 实际上是注册 READ/ACCEPT事件
    unsafe.beginRead();
}

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // OP_ACCEPT = 1 << 4 = 16
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
           

最终会调用抽象类AbstractNioChannel的doBeginRead方法将OP_ACCEPT事件注册进ServerSockerChannel的兴趣集,在注册前还会判断一下是否被注册过了,等于0则表示channel没有被注册过(0值时初始化的时候注册上去的),而readInterestOp的值在创建NioServerSockerChannel时就已经被传进:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
           

四、总结

  1. 在创建NioEventLoopGroup时创建调用NIO实现来创建一个SelectorProvider,此处Netty为不同的平台做了兼容处理。
  2. 在创建NIoEventLoop的时候调用SelectorProvider.openSelector()方法创建一个Selector。
  3. 采用工厂+反射的实现创建NioServerSocketChannel,在创建NioServerSocketChannel时传入其关注的OP_ACCEPT事件,并调用selector.openServerSocketChannel()创建一个NIO的ServerSocketChannel。
  4. 初始化NioServerSocketChannel时,将初始化客户端连接(NioSocketChannel)作为handler加入到NioServerSocketChannel对应的pipeline中。
  5. 初始化的时候调用NIO的register方法将ServerSocketChannel注册进Selector,注意此处设置ServerSocketChannel的感兴趣事件为0。
  6. 绑定节点是最终通过pipeline中的HeadContext handler类完成的,也通过HeadContext的read方法将OP_ACCEPT事件注册进ServerSocketChannel的兴趣集。