天天看点

netty启动流程源码1

public static void main(String[] args) throws InterruptedException {
    //就是一个死循环,不停地检测IO事件,处理IO事件,执行任务
    //创建一个线程组:接受客户端连接   主线程
    EventLoopGroup bossGroup=new NioEventLoopGroup(1);//cpu核心数*2
    //创建一个线程组:接受网络操作   工作线程
    EventLoopGroup workerGroup=new NioEventLoopGroup();  //cpu核心数*2

    //是服务端的一个启动辅助类,通过给他设置一系列参数来绑定端口启动服务
    ServerBootstrap serverBootstrap=new ServerBootstrap();

    // 我们需要两种类型的人干活,一个是老板,一个是工人,老板负责从外面接活,
    // 接到的活分配给工人干,放到这里,bossGroup的作用就是不断地accept到新的连接,将新的连接丢给workerGroup来处理
    serverBootstrap.group(bossGroup,workerGroup)
    //设置使用NioServerSocketChannel作为服务器通道的实现
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG,128) //设置线程队列中等待连接的个数
    .childOption(ChannelOption.SO_KEEPALIVE,true)//保持活动连接状态
    //表示服务器启动过程中,需要经过哪些流程,这里NettyTestHendler最终的顶层接口为ChannelHander,
    // 是netty的一大核心概念,表示数据流经过的处理器
    .handler(new NettyTestHendler())
    //表示一条新的连接进来之后,该怎么处理,也就是上面所说的,老板如何给工人配活
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
            nioSocketChannel.pipeline().addLast(new StringDecoder(),new NettyServerHendler());
        }
    });
    System.out.println(".........server  init..........");
    // 这里就是真正的启动过程了,绑定9090端口,等待服务器启动完毕,才会进入下行代码
    ChannelFuture future = serverBootstrap.bind(9090).sync();
    System.out.println(".........server start..........");
    //等待服务端关闭socket
    future.channel().closeFuture().sync();

    // 关闭两组死循环
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}      

如上当我们创建一个:

EventLoopGroup bossGroup=new NioEventLoopGroup(1);

跟踪源码发现如下:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    //nThreads如果不传默认是0  如果是0的话  就获取CPU核数的两倍  DEFAULT_EVENT_LOOP_THREADS==CPU核数的两倍
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}      

如注释里写的:nThreads如果不传默认是0 如果是0的话 就获取CPU核数的两倍 DEFAULT_EVENT_LOOP_THREADS==CPU核数的两倍;这里这个值为1

继续跟踪到:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        //newDefaultThreadFactory()=线程工厂  专门创建线程的
        //newDefaultThreadFactory()调用的是 MultithreadEventLoopGroup.newDefaultThreadFactory()
        //最终创建的是DefaultThreadFactory,他实现了继承自jdk的ThreadFactory
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    //nThreads如果不传默认是0  如果是0的话  就获取CPU核数的两倍  DEFAULT_EVENT_LOOP_THREADS==CPU核数的两倍
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        //出现异常标识
        boolean success = false;
        try {
            //创建nThreads个nioEventLoop保存到children数组中
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            //出现异常处理
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
      

可以看出创建了一个数组 children = new EventExecutor[nThreads];

然后for循环赋值:这样

MultithreadEventExecutorGroup这个类的属性private final EventExecutor[] children 里面的值就是EventExecutor的实现类      

继续跟踪发现存的是NioEventLoop这个对象;且调用的构造方法如下:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    //NioEventLoopGroup.this
    //executor=new ThreadPerTaskExecutor(newDefaultThreadFactory());
    //SelectorProvider     ServerSocketChannel就是通过ServerSocketChannel.open()==》         SelectorProvider.provider().openServerSocketChannel()创建的
    //strategy=DefaultSelectStrategyFactory.INSTANCE===》new DefaultSelectStrategyFactory()
    //rejectedExecutionHandler=RejectedExecutionHandlers.reject() ===》 new RejectedExecutionHandler()
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    //provider=SelectorProvider.provider()
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    //替换了数据结构selectedKeys   publicSelectedKeys的原生selector
    selector = selectorTuple.selector;
    //子类包装的selector  底层数据结构也是被替换了的
    unwrappedSelector = selectorTuple.unwrappedSelector;
    //selectStrategy=new DefaultSelectStrategyFactory()
    selectStrategy = strategy;
}      

这里主要做的功能是netty做了优化:

把原生jdk中selector类中的selectedKeys   publicSelectedKeys  的属性类型从set集合变成用数组替代      

总结:在创建NioEventLoopGroup的时候主要的是根据参数去创建多少个selector