天天看点

Netty源码分析——ServerBootstrap

基于Netty源代码版本:netty-all-4.1.33.Final

前言

BootStrap在netty的应用程序中负责引导服务器和客户端。netty包含了两种不同类型的引导:

  • 1、使用服务器的ServerBootStrap,用于接受客户端的连接以及为已接受的连接创建子通道。
  • 2、用于客户端的BootStrap,不接受新的连接,并且是在父通道类完成一些操作。

Netty服务端示例:

public class MyServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); 
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap(); 
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new SimpleServerHandler())
                    .childHandler(new MyServerInitializer())
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded");
    }
}

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new MyServerHandler());
    }
}
           

上面这段代码展示了服务端的一个基本步骤:

  • (1)、 初始化用于Acceptor的主"线程池"以及用于I/O工作的从"线程池";
  • (2)、 初始化ServerBootstrap实例, 此实例是netty服务端应用开发的入口,也是本篇介绍的重点, 下面我们会深入分析;
  • (3)、 通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池";
  • (4)、 指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel;
  • (5)、 设置ServerSocketChannel的处理器(此处不详述,后面的系列会进行深入分析)
  • (6)、 设置子通道也就是SocketChannel的处理器, 其内部是实际业务开发的"主战场"(此处不详述,后面的系列会进行深入分析)
  • (7)、 配置ServerSocketChannel的选项
  • (8)、 配置子通道也就是SocketChannel的选项
  • (9)、 绑定并侦听某个端口

Netty客户端示例:

public class MyClient {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true);
            .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }

    }
}
           

客户端的开发步骤和服务端都差不多:

  • (1)、 初始化用于连接及I/O工作的"线程池";
  • (2)、 初始化Bootstrap实例, 此实例是netty客户端应用开发的入口,也是本篇介绍的重点, 下面我们会深入分析;
  • (3)、 通过Bootstrap的group方法,设置(1)中初始化的"线程池";
  • (4)、 指定通道channel的类型,由于是客户端,故而是NioSocketChannel;
  • (5)、 设置SocketChannel的选项(此处不详述,后面的系列会进行深入分析);
  • (6)、 设置SocketChannel的处理器, 其内部是实际业务开发的"主战场"(此处不详述,后面的系列会进行深入分析);
  • (7)、 连接指定的服务地址;

通过对上面服务端及客户端代码分析,Bootstrap是Netty应用开发的入口,如果想要理解Netty内部的实现细节,那么有必要先了解一下Bootstrap内部的实现机制。

首先我们先看一下ServerBootstrap及Bootstrap的类继承结构图:

Netty源码分析——ServerBootstrap

通过类图我们知道AbstractBootstrap类是ServerBootstrap及Bootstrap的基类

1、 首先看看服务端的serverBootstrap.group(bossGroup, workerGroup):

调用ServerBootstrap的group方法,设置react模式的主线程池 以及 IO 操作线程池,ServerBootstrap中的group代码如下:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
	super.group(parentGroup);
	if (childGroup == null) {
		throw new NullPointerException("childGroup");
	}
	if (this.childGroup != null) {
		throw new IllegalStateException("childGroup set already");
	}
	this.childGroup = childGroup;
	return this;
}
           

在group方法中,会继续调用父类的group方法,而通过类继承图我们知道,super.group(parentGroup)其实调用的就是AbstractBootstrap的group方法。AbstractBootstrap中group代码如下:

public B group(EventLoopGroup group) {
	if (group == null) {
		throw new NullPointerException("group");
	}
	if (this.group != null) {
		throw new IllegalStateException("group set already");
	}
	this.group = group;
	return self();
}

@SuppressWarnings("unchecked")
private B self() {
	return (B) this;
}
           

通过以上分析,我们知道了AbstractBootstrap中定义了主线程池group的引用,而子线程池childGroup的引用是定义在ServerBootstrap中。

即将workerGroup保存在 ServerBootstrap对象的childGroup属性上。 bossGroup保存在ServerBootstrap对象的group属性上。

当我们查看客户端Bootstrap的group方法时,我们发现,其是直接调用的父类AbstractBoostrap的group方法。

2、示例代码中的 channel()方法

无论是服务端还是客户端,channel调用的都是基类的channel方法,其实现细节如下:

public B channel(Class<? extends C> channelClass) {
	if (channelClass == null) {
		throw new NullPointerException("channelClass");
	}
	return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

public ReflectiveChannelFactory(Class<? extends T> clazz) {
	ObjectUtil.checkNotNull(clazz, "clazz");
	try {
		this.constructor = clazz.getConstructor();
	} catch (NoSuchMethodException e) {
		throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
				" does not have a public non-arg constructor", e);
	}
}

public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
	return channelFactory((ChannelFactory<C>) channelFactory);
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
	if (channelFactory == null) {
		throw new NullPointerException("channelFactory");
	}
	if (this.channelFactory != null) {
		throw new IllegalStateException("channelFactory set already");
	}

	this.channelFactory = channelFactory;
	return self();
}
           

我们发现,其实channel方法内部,只是初始化了一个用于生产指定channel类型的工厂实例。

函数功能:设置父类属性channelFactory 为: ReflectiveChannelFactory类的对象。其中这里ReflectiveChannelFactory对象中包括一个clazz属性为:NioServerSocketChannel.class。

/**
 * 接受新连接的基于NIO选择器的实现
 */
public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
	......
}
           

并且ReflectiveChannelFactory中提供 newChannel()方法,我们可以看到 clazz.newInstance(),主要是通过反射来实例化NioServerSocketChannel.class

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
                '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
    }
}
           

ServerBootstrap.childHandler(new MyServerInitializer())

/**
 * 设置ChannelHandler,用于为Channel的请求提供服务。
 */
public ServerBootstrap childHandler(ChannelHandler childHandler) {
	if (childHandler == null) {
		throw new NullPointerException("childHandler");
	}
	this.childHandler = childHandler;
	return this;
}
           

ServerBootstrap.handler(new SimpleServerHandler())

final ChannelHandler handler() {
	return handler;
}
           

bootstrap.handler(new MyClientInitializer())

/**
 * 用于服务请求的ChannelHandler
 */
public B handler(ChannelHandler handler) {
	if (handler == null) {
		throw new NullPointerException("handler");
	}
	this.handler = handler;
	return self();
}
           

ServerBootstrap的option

这里调用的是父类的AbstractBootstrap的option()方法,源码如下:

public <T> B option(ChannelOption<T> option, T value) {
	if (option == null) {
		throw new NullPointerException("option");
	}
	if (value == null) {
		synchronized (options) {
			options.remove(option);
		}
	} else {
		synchronized (options) {
			options.put(option, value);
		}
	}
	return self();
}
           

其中最重要的一行代码就是:

options.put(option, value);

这里用到了options这个参数,在AbstractBootstrap的定义如下:

private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

可知是私有变量,而且是一个Map集合。这个变量主要是设置TCP连接中的一些可选项,而且这些属性是作用于每一个连接到服务器被创建的channel。

ServerBootstrap的childOption

这里调用的是父类的ServerBootstrap的childOption()方法,源码如下:

public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
	if (childOption == null) {
		throw new NullPointerException("childOption");
	}
	if (value == null) {
		synchronized (childOptions) {
			childOptions.remove(childOption);
		}
	} else {
		synchronized (childOptions) {
			childOptions.put(childOption, value);
		}
	}
	return this;
}
           

这个函数功能与option()函数几乎一样,唯一的区别是该属性设定只作用于被acceptor(也就是boss EventLoopGroup)接收之后的channel。

总结:

  • 1、group:workerGroup保存在 ServerBootstrap对象的childGroup属性上。 bossGroup保存在ServerBootstrap对象的group属性上
  • 2、channelFactory:BootstrapChannelFactory类的对象(clazz属性为:NioServerSocketChannel.class)
  • 3、handler:SimpleServerHandler
  • 4、childHandler
  • 5、option
  • 6、childOption

继续阅读