天天看点

解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下

作者:linux上的码农

接上文解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-上

Netty对JDK NIO 原生Selector的优化

首先在NioEventLoop中有一个Selector优化开关DISABLE_KEY_SET_OPTIMIZATION,通过系统变量-D io.netty.noKeySetOptimization指定,默认是开启的,表示需要对JDK NIO原生Selector进行优化。

public final class NioEventLoop extends SingleThreadEventLoop {
   //Selector优化开关 默认开启 为了遍历的效率 会对Selector中的SelectedKeys进行数据结构优化
    private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
}
           

如果优化开关DISABLE_KEY_SET_OPTIMIZATION是关闭的,那么直接返回JDK NIO原生的Selector。

private SelectorTuple openSelector() {
        ..........SelectorProvider创建JDK NIO  原生Selector..............

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            //JDK NIO原生Selector ,Selector优化开关 默认开启需要对Selector进行优化
            return new SelectorTuple(unwrappedSelector);
        }
}
           

下面为Netty对JDK NIO原生的Selector的优化过程:

  1. 获取JDK NIO原生Selector的抽象实现类sun.nio.ch.SelectorImpl。JDK NIO原生Selector的实现均继承于该抽象类。用于判断由SelectorProvider创建出来的Selector是否为JDK默认实现(SelectorProvider第三种加载方式)。因为SelectorProvider可以是自定义加载,所以它创建出来的Selector并不一定是JDK NIO 原生的。
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });
           

JDK NIO Selector的抽象类sun.nio.ch.SelectorImpl

public abstract class SelectorImpl extends AbstractSelector {

    // The set of keys with data ready for an operation
    // //IO就绪的SelectionKey(里面包裹着channel)
    protected Set<SelectionKey> selectedKeys;

    // The set of keys registered with this Selector
    //注册在该Selector上的所有SelectionKey(里面包裹着channel)
    protected HashSet<SelectionKey> keys;

    // Public views of the key sets
    //用于向调用线程返回的keys,不可变
    private Set<SelectionKey> publicKeys;             // Immutable
    //当有IO就绪的SelectionKey时,向调用线程返回。只可删除其中元素,不可增加
    private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition

    protected SelectorImpl(SelectorProvider sp) {
        super(sp);
        keys = new HashSet<SelectionKey>();
        selectedKeys = new HashSet<SelectionKey>();
        if (Util.atBugLevel("1.4")) {
            publicKeys = keys;
            publicSelectedKeys = selectedKeys;
        } else {
            //不可变
            publicKeys = Collections.unmodifiableSet(keys);
            //只可删除其中元素,不可增加
            publicSelectedKeys = Util.ungrowableSet(selectedKeys);
        }
    }
}
           

这里笔者来简单介绍下JDK NIO中的Selector中这几个字段的含义,我们可以和上篇文章讲到的epoll在内核中的结构做类比,方便大家后续的理解:

解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下
  • Set<SelectionKey> selectedKeys 类似于我们上篇文章讲解Epoll时提到的就绪队列eventpoll->rdllist,Selector这里大家可以理解为Epoll。Selector会将自己监听到的IO就绪的Channel放到selectedKeys中。
这里的SelectionKey暂且可以理解为Channel在Selector中的表示,类比上图中epitem结构里的epoll_event,封装IO就绪Socket的信息。其实SelectionKey里包含的信息不止是Channel还有很多IO相关的信息。后面我们在详细介绍。
  • HashSet<SelectionKey> keys:这里存放的是所有注册到该Selector上的Channel。类比epoll中的红黑树结构rb_root
SelectionKey在Channel注册到Selector中后生成。
  • Set<SelectionKey> publicSelectedKeys 相当于是selectedKeys的视图,用于向外部线程返回IO就绪的SelectionKey。这个集合在外部线程中只能做删除操作不可增加元素,并且不是线程安全的。
  • Set<SelectionKey> publicKeys相当于keys的不可变视图,用于向外部线程返回所有注册在该Selector上的SelectionKey

这里需要重点关注抽象类sun.nio.ch.SelectorImpl中的selectedKeys和publicSelectedKeys这两个字段,注意它们的类型都是HashSet,一会优化的就是这里!!!!

  1. 判断由SelectorProvider创建出来的Selector是否是JDK NIO原生的Selector实现。因为Netty优化针对的是JDK NIO 原生Selector。判断标准为sun.nio.ch.SelectorImpl类是否为SelectorProvider创建出Selector的父类。如果不是则直接返回。不在继续下面的优化过程。
//判断是否可以对Selector进行优化,这里主要针对JDK NIO原生Selector的实现类进行优化,因为SelectorProvider可以加载的是自定义Selector实现
        //如果SelectorProvider创建的Selector不是JDK原生sun.nio.ch.SelectorImpl的实现类,那么无法进行优化,直接返回
        if (!(maybeSelectorImplClass instanceof Class) ||
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }
           

通过前面对SelectorProvider的介绍我们知道,这里通过provider.openSelector()创建出来的Selector实现类为KQueueSelectorImpl类,它继承实现了sun.nio.ch.SelectorImpl,所以它是JDK NIO 原生的Selector实现

class KQueueSelectorImpl extends SelectorImpl {

}
           
  1. 创建SelectedSelectionKeySet通过反射替换掉sun.nio.ch.SelectorImpl类中selectedKeys和publicSelectedKeys的默认HashSet实现。

为什么要用SelectedSelectionKeySet替换掉原来的HashSet呢??

因为这里涉及到对HashSet类型的sun.nio.ch.SelectorImpl#selectedKeys集合的两种操作:

  • 插入操作: 通过前边对sun.nio.ch.SelectorImpl类中字段的介绍我们知道,在Selector监听到IO就绪的SelectionKey后,会将IO就绪的SelectionKey插入sun.nio.ch.SelectorImpl#selectedKeys集合中,这时Reactor线程会从java.nio.channels.Selector#select(long)阻塞调用中返回(类似上篇文章提到的epoll_wait)。
  • 遍历操作:Reactor线程返回后,会从Selector中获取IO就绪的SelectionKey集合(也就是sun.nio.ch.SelectorImpl#selectedKeys),Reactor线程遍历selectedKeys,获取IO就绪的SocketChannel,并处理SocketChannel上的IO事件。

我们都知道HashSet底层数据结构是一个哈希表,由于Hash冲突这种情况的存在,所以导致对哈希表进行插入和遍历操作的性能不如对数组进行插入和遍历操作的性能好。

还有一个重要原因是,数组可以利用CPU缓存的优势来提高遍历的效率。后面笔者会有一篇专门的文章来讲述利用CPU缓存行如何为我们带来性能优势。

所以Netty为了优化对sun.nio.ch.SelectorImpl#selectedKeys集合的插入,遍历性能,自己用数组这种数据结构实现了SelectedSelectionKeySet,用它来替换原来的HashSet实现。

更多linux内核视频教程文档资料免费领取后台私信【内核】自行获取.

解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下
解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下

SelectedSelectionKeySet

  • 初始化SelectionKey[] keys数组大小为1024,当数组容量不够时,扩容为原来的两倍大小。
  • 通过数组尾部指针size,在向数组插入元素的时候可以直接定位到插入位置keys[size++]。操作一步到位,不用像哈希表那样还需要解决Hash冲突。
  • 对数组的遍历操作也是如丝般顺滑,CPU直接可以在缓存行中遍历读取数组元素无需访问内存。比HashSet的迭代器java.util.HashMap.KeyIterator 遍历方式性能不知高到哪里去了。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    //采用数组替换到JDK中的HashSet,这样add操作和遍历操作效率更高,不需要考虑hash冲突
    SelectionKey[] keys;
    //数组尾部指针
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    /**
     * 数组的添加效率高于 HashSet 因为不需要考虑hash冲突
     * */
    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }
        //时间复杂度O(1)
        keys[size++] = o;
        if (size == keys.length) {
            //扩容为原来的两倍大小
            increaseCapacity();
        }

        return true;
    }

    private void increaseCapacity() {
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }

    /**
     * 采用数组的遍历效率 高于 HashSet
     * */
    @Override
    public Iterator<SelectionKey> iterator() {
        return new Iterator<SelectionKey>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            public SelectionKey next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return keys[idx++];
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
           
看到这里不禁感叹,从各种小的细节可以看出Netty对性能的优化简直淋漓尽致,对性能的追求令人发指。细节真的是魔鬼。
  1. Netty通过反射的方式用SelectedSelectionKeySet替换掉sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys这两个集合中原来HashSet的实现。
  • 反射获取sun.nio.ch.SelectorImpl类中selectedKeys和publicSelectedKeys。
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
  Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
           
  • Java9版本以上通过sun.misc.Unsafe设置字段值的方式
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {

                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        
                    }
           
  • 通过反射的方式用SelectedSelectionKeySet替换掉hashSet实现的sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys。
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
          if (cause != null) {
                return cause;
          }
          cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
          if (cause != null) {
                return cause;
          }
          //Java8反射替换字段
          selectedKeysField.set(unwrappedSelector, selectedKeySet);
          publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
           
  1. 将与sun.nio.ch.SelectorImpl类中selectedKeys和publicSelectedKeys关联好的Netty优化实现SelectedSelectionKeySet,设置到io.netty.channel.nio.NioEventLoop#selectedKeys字段中保存。
//会通过反射替换selector对象中的selectedKeySet保存就绪的selectKey
    //该字段为持有selector对象selectedKeys的引用,当IO事件就绪时,直接从这里获取
    private SelectedSelectionKeySet selectedKeys;
           
后续Reactor线程就会直接从io.netty.channel.nio.NioEventLoop#selectedKeys中获取IO就绪的SocketChannel
  1. 用SelectorTuple封装unwrappedSelector和wrappedSelector返回给NioEventLoop构造函数。到此Reactor中的Selector就创建完毕了。
return new SelectorTuple(unwrappedSelector,
                      new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
           
private static final class SelectorTuple {
        final Selector unwrappedSelector;
        final Selector selector;

        SelectorTuple(Selector unwrappedSelector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = unwrappedSelector;
        }

        SelectorTuple(Selector unwrappedSelector, Selector selector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = selector;
        }
    }
           
  • 所谓的unwrappedSelector是指被Netty优化过的JDK NIO原生Selector。
  • 所谓的wrappedSelector就是用SelectedSelectionKeySetSelector装饰类将unwrappedSelector和与sun.nio.ch.SelectorImpl类关联好的Netty优化实现SelectedSelectionKeySet封装装饰起来。

wrappedSelector会将所有对Selector的操作全部代理给unwrappedSelector,并在发起轮询IO事件的相关操作中,重置SelectedSelectionKeySet清空上一次的轮询结果。

final class SelectedSelectionKeySetSelector extends Selector {
    //Netty优化后的 SelectedKey就绪集合
    private final SelectedSelectionKeySet selectionKeys;
    //优化后的JDK NIO 原生Selector
    private final Selector delegate;

    SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
        this.delegate = delegate;
        this.selectionKeys = selectionKeys;
    }

    @Override
    public boolean isOpen() {
        return delegate.isOpen();
    }

    @Override
    public SelectorProvider provider() {
        return delegate.provider();
    }

    @Override
    public Set<SelectionKey> keys() {
        return delegate.keys();
    }

    @Override
    public Set<SelectionKey> selectedKeys() {
        return delegate.selectedKeys();
    }

    @Override
    public int selectNow() throws IOException {
        //重置SelectedKeys集合
        selectionKeys.reset();
        return delegate.selectNow();
    }

    @Override
    public int select(long timeout) throws IOException {
        //重置SelectedKeys集合
        selectionKeys.reset();
        return delegate.select(timeout);
    }

    @Override
    public int select() throws IOException {
        //重置SelectedKeys集合
        selectionKeys.reset();
        return delegate.select();
    }

    @Override
    public Selector wakeup() {
        return delegate.wakeup();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}
           

到这里Reactor的核心Selector就创建好了,下面我们来看下用于保存异步任务的队列是如何创建出来的。

newTaskQueue

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        //通过用SelectedSelectionKeySet装饰后的unwrappedSelector
        this.selector = selectorTuple.selector;
        //Netty优化过的JDK NIO远程Selector
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
           

我们继续回到创建Reactor的主线上,到目前为止Reactor的核心Selector就创建好了,前边我们提到Reactor除了需要监听IO就绪事件以及处理IO就绪事件外,还需要执行一些异步任务,当外部线程向Reactor提交异步任务后,Reactor就需要一个队列来保存这些异步任务,等待Reactor线程执行。

下面我们来看下Reactor中任务队列的创建过程:

//任务队列大小,默认是无界队列
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

    private static Queue<Runnable> newTaskQueue(
            EventLoopTaskQueueFactory queueFactory) {
        if (queueFactory == null) {
            return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
        }
        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }

    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }  
           
  • 在NioEventLoop的父类SingleThreadEventLoop中提供了一个静态变量DEFAULT_MAX_PENDING_TASKS用来指定Reactor任务队列的大小。可以通过系统变量-D io.netty.eventLoop.maxPendingTasks进行设置,默认为Integer.MAX_VALUE,表示任务队列默认为无界队列。
  • 根据DEFAULT_MAX_PENDING_TASKS变量的设定,来决定创建无界任务队列还是有界任务队列。
//创建无界任务队列
    PlatformDependent.<Runnable>newMpscQueue()
    //创建有界任务队列
    PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)

    public static <T> Queue<T> newMpscQueue() {
        return Mpsc.newMpscQueue();
    }

    public static <T> Queue<T> newMpscQueue(final int maxCapacity) {
        return Mpsc.newMpscQueue(maxCapacity);
    }
           
Reactor内的异步任务队列的类型为MpscQueue,它是由JCTools提供的一个高性能无锁队列,从命名前缀Mpsc可以看出,它适用于多生产者单消费者的场景,它支持多个生产者线程安全的访问队列,同一时刻只允许一个消费者线程读取队列中的元素。
我们知道Netty中的Reactor可以线程安全的处理注册其上的多个SocketChannel上的IO数据,保证Reactor线程安全的核心原因正是因为这个MpscQueue,它可以支持多个业务线程在处理完业务逻辑后,线程安全的向MpscQueue添加异步写任务,然后由单个Reactor线程来执行这些写任务。既然是单线程执行,那肯定是线程安全的了。

Reactor对应的NioEventLoop类型继承结构

解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下

NioEventLoop的继承结构也是比较复杂,这里我们只关注在Reactor创建过程中涉及的到两个父类SingleThreadEventLoop,SingleThreadEventExecutor。

剩下的继承体系,我们在后边随着Netty源码的深入在慢慢介绍。

前边我们提到,其实Reactor我们可以看作是一个单线程的线程池,只有一个线程用来执行IO就绪事件的监听,IO事件的处理,异步任务的执行。用MpscQueue来存储待执行的异步任务。

命名前缀为SingleThread的父类都是对Reactor这些行为的分层定义。也是本小节要介绍的对象

SingleThreadEventLoop

Reactor负责执行的异步任务分为三类:

  • 普通任务:这是Netty最主要执行的异步任务,存放在普通任务队列taskQueue中。在NioEventLoop构造函数中创建。
  • 定时任务: 存放在优先级队列中。后续我们介绍。
  • 尾部任务: 存放于尾部任务队列tailTasks中,尾部任务一般不常用,在普通任务执行完后 Reactor线程会执行尾部任务。**使用场景:**比如对Netty 的运行状态做一些统计数据,例如任务循环的耗时、占用物理内存的大小等等都可以向尾部队列添加一个收尾任务完成统计数据的实时更新。

SingleThreadEventLoop负责对尾部任务队列tailTasks进行管理。并且提供Channel向Reactor注册的行为。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    //任务队列大小,默认是无界队列
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    
    //尾部任务队列
    private final Queue<Runnable> tailTasks;

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
        //尾部队列 执行一些统计任务 不常用
        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }

    @Override
    public ChannelFuture register(Channel channel) {
        //注册channel到绑定的Reactor上
        return register(new DefaultChannelPromise(channel, this));
    }
}
           

SingleThreadEventExecutor

SingleThreadEventExecutor主要负责对普通任务队列的管理,以及异步任务的执行,Reactor线程的启停。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
        //parent为Reactor所属的NioEventLoopGroup Reactor线程组
        super(parent);
        //向Reactor添加任务时,是否唤醒Selector停止轮询IO就绪事件,马上执行异步任务
        this.addTaskWakesUp = addTaskWakesUp;
        //Reactor异步任务队列的大小
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        //用于启动Reactor线程的executor -> ThreadPerTaskExecutor
        this.executor = ThreadExecutorMap.apply(executor, this);
        //普通任务队列
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        //任务队列满时的拒绝策略
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
}
           

到现在为止,一个完整的Reactor架构就被创建出来了。

解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下

Reactor结构

3. 创建Channel到Reactor的绑定策略

到这一步,Reactor线程组NioEventLoopGroup里边的所有Reactor就已经全部创建完毕。

无论是Netty服务端NioServerSocketChannel关注的OP_ACCEPT事件也好,还是Netty客户端NioSocketChannel关注的OP_READ和OP_WRITE事件也好,都需要先注册到Reactor上,Reactor才能监听Channel上关注的IO事件实现IO多路复用。

NioEventLoopGroup(Reactor线程组)里边有众多的Reactor,那么以上提到的这些Channel究竟应该注册到哪个Reactor上呢?这就需要一个绑定的策略来平均分配。

还记得我们前边介绍MultithreadEventExecutorGroup类的时候提到的构造器参数EventExecutorChooserFactory吗?

这时候它就派上用场了,它主要用来创建Channel到Reactor的绑定策略。默认为DefaultEventExecutorChooserFactory.INSTANCE。

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
   //从Reactor集合中选择一个特定的Reactor的绑定策略 用于channel注册绑定到一个固定的Reactor上
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    chooser = chooserFactory.newChooser(children);
}
           

下面我们来看下具体的绑定策略:

DefaultEventExecutorChooserFactory

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }
    ...................省略.................
}
           

我们看到在newChooser方法绑定策略有两个分支,不同之处在于需要判断Reactor线程组中的Reactor个数是否为2的次幂。

Netty中的绑定策略就是采用round-robin轮询的方式来挨个选择Reactor进行绑定。

采用round-robin的方式进行负载均衡,我们一般会用round % reactor.length取余的方式来挨个平均的定位到对应的Reactor上。

如果Reactor的个数reactor.length恰好是2的次幂,那么就可以用位操作&运算round & reactor.length -1来代替%运算round % reactor.length,因为位运算的性能更高。具体为什么&运算能够代替%运算,笔者会在后面讲述时间轮的时候为大家详细证明,这里大家只需记住这个公式,我们还是聚焦本文的主线。

了解了优化原理,我们在看代码实现就很容易理解了。

利用%运算的方式Math.abs(idx.getAndIncrement() % executors.length)来进行绑定。

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
           

利用&运算的方式idx.getAndIncrement() & executors.length - 1来进行绑定。

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

           
又一次被Netty对性能的极致追求所折服~~~~

4. 向Reactor线程组中所有的Reactor注册terminated回调函数

当Reactor线程组NioEventLoopGroup中所有的Reactor已经创建完毕,Channel到Reactor的绑定策略也创建完毕后,我们就来到了创建NioEventGroup的最后一步。

俗话说的好,有创建就有启动,有启动就有关闭,这里会创建Reactor关闭的回调函数terminationListener,在Reactor关闭时回调。

terminationListener回调的逻辑很简单:

  • 通过AtomicInteger terminatedChildren变量记录已经关闭的Reactor个数,用来判断NioEventLoopGroup中的Reactor是否已经全部关闭。
  • 如果所有Reactor均已关闭,设置NioEventLoopGroup中的terminationFuture为success。表示Reactor线程组关闭成功。
//记录关闭的Reactor个数,当Reactor全部关闭后,才可以认为关闭成功
        private final AtomicInteger terminatedChildren = new AtomicInteger();
        //关闭future
        private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    //当所有Reactor关闭后 才认为是关闭成功
                    terminationFuture.setSuccess(null);
                }
            }
        };
        
        //为所有Reactor添加terminationListener
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
           

我们在回到文章开头Netty服务端代码模板

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        //创建主从Reactor线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ...........省略............
    }
}
           

现在Netty的主从Reactor线程组就已经创建完毕,此时Netty服务端的骨架已经搭建完毕,骨架如下:

解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下

主从Reactor线程组

总结

本文介绍了首先介绍了Netty对各种IO模型的支持以及如何轻松切换各种IO模型。

还花了大量的篇幅介绍Netty服务端的核心引擎主从Reactor线程组的创建过程。在这个过程中,我们还提到了Netty对各种细节进行的优化,展现了Netty对性能极致的追求。

好了,Netty服务端的骨架已经搭好,剩下的事情就该绑定端口地址然后接收连接了.

首页 - 内核技术中文网 - 构建全国最权威的内核技术交流分享论坛

转载地址:解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下 - 圈点 - 内核技术中文网 - 构建全国最权威的内核技术交流分享论坛

解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下

继续阅读