// io.netty.bootstrap.AbstractBootstrap#bind(int)
/**
 * Binds to the given port.
 *
 * <p>The port is wrapped in an {@link InetSocketAddress} created from the port number alone and
 * the call is forwarded to the {@code SocketAddress} overload of {@code bind}.
 *
 * @param inetPort the local port to bind to
 * @return the future returned by the {@code SocketAddress} overload
 */
public ChannelFuture bind(int inetPort) {
    final InetSocketAddress portOnlyAddress = new InetSocketAddress(inetPort);
    return bind(portOnlyAddress);
}
// io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
/**
 * Validates this bootstrap and then binds to {@code localAddress}.
 *
 * @param localAddress the address to bind to; must not be {@code null}
 * @return the future produced by {@code doBind}
 * @throws NullPointerException if {@code localAddress} is {@code null} (checked after validation)
 */
public ChannelFuture bind(SocketAddress localAddress) {
    // Validation runs first, so a misconfigured bootstrap is reported before a null address.
    validate();
    if (localAddress != null) {
        return doBind(localAddress);
    }
    throw new NullPointerException("localAddress");
}
validate() 方法校验 group 和 channelFactory 是否已经设置:
/**
 * Checks that the mandatory parts of this bootstrap have been configured.
 *
 * @return this bootstrap (via {@code self()}) when everything required is present
 * @throws IllegalStateException if {@code group} or {@code channelFactory} is {@code null};
 *         {@code group} is checked first
 */
public B validate() {
    final String missing;
    if (group == null) {
        // The EventLoopGroup is mandatory.
        missing = "group not set";
    } else if (channelFactory == null) {
        // The ChannelFactory is mandatory.
        missing = "channel or channelFactory not set";
    } else {
        return self();
    }
    throw new IllegalStateException(missing);
}
/**
 * Creates, initializes and registers a channel (via {@code initAndRegister()}) and then binds it
 * to {@code localAddress}.
 *
 * @param localAddress the address to bind the channel to
 * @return the registration future itself when it already carries a failure cause; otherwise a
 *         promise that is handed to {@code doBind0} once registration has completed
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Create and initialize the channel and start registering it.
    final ChannelFuture regFuture = initAndRegister();
    // The channel the asynchronous result belongs to.
    final Channel channel = regFuture.channel();

    // A non-null cause means the asynchronous operation has already finished with a failure,
    // so the failed future is handed straight back to the caller.
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // Registration is asynchronous: at this point it may still be in flight.
    if (!regFuture.isDone()) {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise pending = new PendingRegistrationPromise(channel);
        // The listener runs once registration completes, successfully or not.
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                final Throwable failure = future.cause();
                if (failure == null) {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    pending.registered();
                    // Now bind the port.
                    doBind0(regFuture, channel, localAddress, pending);
                    return;
                }
                // Registration on the EventLoop failed so fail the ChannelPromise directly to
                // not cause an IllegalStateException once we try to access the EventLoop of
                // the Channel.
                pending.setFailure(failure);
            }
        });
        return pending;
    }

    // Registration has already completed and no failure cause was reported.
    // Create a writable future and bind; doBind0 itself checks whether regFuture succeeded.
    final ChannelPromise bindPromise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, bindPromise);
    return bindPromise;
}
需要注意的几点:
如果注册这一异步操作已经完成,则会创建一个可修改(可写)的异步结果对象 ChannelPromise:
/**
 * Special {@link ChannelFuture} which is writable.
 * It combines {@link ChannelFuture} with {@code Promise<Void>}.
 * (The interface body is elided in this excerpt.)
 */
public interface ChannelPromise extends ChannelFuture, Promise<Void> {...}
/**
 * Creates a channel through {@code channelFactory}, initializes it and registers it with the
 * configured group.
 *
 * @return the registration future, or an already-failed promise when creating or initializing
 *         the channel threw
 */
final ChannelFuture initAndRegister() {
    Channel created = null;
    try {
        // Create the channel, then initialize it.
        created = channelFactory.newChannel();
        init(created);
    } catch (Throwable failure) {
        final Channel owner;
        if (created == null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            owner = new FailedChannel();
        } else {
            created.unsafe().closeForcibly();
            owner = created;
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(owner, GlobalEventExecutor.INSTANCE).setFailure(failure);
    }

    // Register the channel with the configured group.
    final ChannelFuture registration = config().group().register(created);
    if (registration.cause() == null) {
        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
        return registration;
    }

    // Registration failed: close normally if the channel is registered, otherwise force-close it.
    if (created.isRegistered()) {
        created.close();
    } else {
        created.unsafe().closeForcibly();
    }
    return registration;
}
/**
 * Netty server channel that wraps a JDK NIO {@link ServerSocketChannel}.
 * This is an excerpt: the lines shown as {@code ...} are parts of the class that were left out.
 */
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
...
/**
 * Creates an instance backed by a JDK channel opened through {@link #DEFAULT_SELECTOR_PROVIDER}.
 */
public NioServerSocketChannel() {
// A Netty channel is really a wrapper around a native NIO channel.
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
...
// Provider obtained from SelectorProvider.provider(); used here to open the JDK channel.
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
/**
 * Opens a JDK {@link ServerSocketChannel} through the given provider.
 *
 * @param provider the provider used to open the channel
 * @return the newly opened JDK server socket channel
 * @throws ChannelException wrapping the {@link IOException} if the channel cannot be opened
 */
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
// Open the native NIO channel through the provider.
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
/**
 * Creates an instance that wraps the given JDK channel.
 *
 * @param channel the JDK server socket channel to wrap
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
// Wrap the JDK channel: it is handed to the superclass together with SelectionKey.OP_ACCEPT
// (presumably the interest op used once reading begins - confirm in the superclass).
super(null, channel, SelectionKey.OP_ACCEPT);
// Create the configuration object for this channel from the JDK channel's socket.
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor
/**
 * Inbound handler that takes each {@link Channel} arriving as a read message, applies the child
 * handler, options and attributes to it, and registers it with the child event loop group.
 */
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
/**
 * @param channel      the channel whose auto-read is switched back on by {@code enableAutoReadTask}
 * @param childGroup   the group that accepted child channels are registered with
 * @param childHandler the handler appended to every child channel's pipeline
 * @param childOptions the options applied to every child channel
 * @param childAttrs   the attributes set on every child channel
 */
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
// Keep everything that was passed in as fields.
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
/**
 * Configures and registers the child channel carried by {@code msg}.
 * Per the original walkthrough, this is triggered when a client sends a connection request.
 *
 * @param ctx the handler context
 * @param msg the message; it is cast to {@link Channel} without a type check
 */
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// The message is the child channel created for the connecting client.
final Channel child = (Channel) msg;
// Initialize the child channel: set the handler, options and attributes that will be used
// when serving this client's reads and writes.
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// Register the child channel. Note that this uses childGroup.
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// Registration failed: force-close the child channel.
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
// register(...) itself threw: force-close the child channel as well.
forceClose(child, t);
}
}
/**
 * Forcibly closes {@code child} and logs the registration failure as a warning.
 *
 * @param child the accepted channel that could not be registered
 * @param t     the failure that is logged
 */
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}
/**
 * Turns auto-read off and schedules {@code enableAutoReadTask} one second later (only when
 * auto-read is currently on), then forwards the exception through the pipeline.
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
}
/**
 * Builds a new channel with {@code channelFactory}, runs {@code init} on it and registers it
 * with the configured group.
 *
 * @return the future of the registration, or a promise that has already been failed with the
 *         exception thrown while creating or initializing the channel
 */
final ChannelFuture initAndRegister() {
    Channel newChannel = null;
    try {
        // Create the channel.
        newChannel = channelFactory.newChannel();
        // Initialize the channel.
        init(newChannel);
    } catch (Throwable cause) {
        if (newChannel == null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(cause);
        }
        newChannel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(newChannel, GlobalEventExecutor.INSTANCE).setFailure(cause);
    }

    // Register the channel with the configured group.
    final ChannelFuture regFuture = config().group().register(newChannel);
    final boolean registrationFailed = regFuture.cause() != null;
    if (registrationFailed) {
        if (!newChannel.isRegistered()) {
            // Not registered: force the close.
            newChannel.unsafe().closeForcibly();
        } else {
            // Failed but registered: a regular close is enough.
            newChannel.close();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;
}
/**
 * Unsafe implementation whose {@code prepareToClose()} deregisters the channel and moves the
 * close onto {@link GlobalEventExecutor} when the channel is open and SO_LINGER is positive.
 */
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
    /**
     * @return {@link GlobalEventExecutor#INSTANCE} when the channel is open with a positive
     *         SO_LINGER value and was deregistered; {@code null} otherwise, including when any
     *         step throws
     */
    @Override
    protected Executor prepareToClose() {
        Executor closeExecutor = null;
        try {
            // getSoLinger() is only consulted while the JDK channel is still open.
            final boolean lingering = javaChannel().isOpen() && config().getSoLinger() > 0;
            if (lingering) {
                // We need to cancel this key of the channel so we may not end up in a eventloop spin
                // because we try to read or write until the actual close happens which may be later due
                // SO_LINGER handling.
                // See https://github.com/netty/netty/issues/4449
                doDeregister();
                closeExecutor = GlobalEventExecutor.INSTANCE;
            }
        } catch (Throwable ignore) {
            // Ignore the error as the underlying channel may be closed in the meantime and so
            // getSoLinger() may produce an exception. In this case we just return null.
            // See https://github.com/netty/netty/issues/4449
        }
        return closeExecutor;
    }
}
/**
 * {@link Executor} that runs every submitted task on a new thread obtained from a
 * {@link ThreadFactory}.
 */
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    /**
     * @param threadFactory the factory asked for one new thread per executed task
     * @throws NullPointerException if {@code threadFactory} is {@code null}
     */
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    /**
     * Starts a new thread from the factory that runs {@code command}.
     *
     * @param command the task to run
     * @throws NullPointerException if {@code command} is {@code null}, as the {@link Executor}
     *         contract requires; the factory is not consulted in that case
     * @throws java.util.concurrent.RejectedExecutionException if the factory returns
     *         {@code null}, i.e. refuses to create a thread
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }
        final Thread thread = threadFactory.newThread(command);
        if (thread == null) {
            // ThreadFactory.newThread may return null when the request is rejected; report that
            // as a rejection instead of failing with a message-less NullPointerException.
            throw new java.util.concurrent.RejectedExecutionException(
                    "threadFactory did not create a thread for the command");
        }
        thread.start();
    }
}
// io.netty.channel.AbstractChannel.AbstractUnsafe#register0
/**
 * Performs the registration of this channel and fires the resulting pipeline events.
 * On any failure the channel is force-closed and the promise is failed.
 *
 * @param promise the promise that is completed with the outcome of the registration
 */
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// Remember whether this channel has never been registered before, i.e. whether this is
// its first registration.
boolean firstRegistration = neverRegistered;
// Do the actual registration.
doRegister();
// Update the state flags.
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// This triggers the pending handlerAdded() callbacks.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// Fire channelRegistered() through the pipeline.
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// If the channel is active and this is its first registration, fire channelActive().
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
// By the time we get here the channel has been registered successfully.
// Per the original walkthrough, the SelectionKey's interest set is still 0 at this point
// and beginRead() sets it to the value configured in the channel's constructor
// (NOTE(review): beginRead() is not shown here - confirm).
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
// io.netty.channel.nio.AbstractNioChannel#doRegister
/**
 * Registers the underlying JDK channel with the event loop's selector, retrying once after a
 * forced {@code selectNow()} if the first attempt hits a {@link CancelledKeyException}.
 *
 * @throws Exception the {@link CancelledKeyException} of the second attempt, or whatever else
 *         the registration throws
 */
protected void doRegister() throws Exception {
    boolean forcedSelect = false;
    while (true) {
        try {
            // Registering a Netty channel boils down to registering the native NIO channel.
            // The second argument (ops) is 0: no event is of interest for now.
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (forcedSelect) {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
            // Force the Selector to select now as the "canceled" SelectionKey may still be
            // cached and not removed because no Select.select(..) operation was called yet.
            eventLoop().selectNow();
            forcedSelect = true;
        }
    }
}
/**
 * Runs {@code initAndRegister()} and binds the resulting channel to {@code localAddress} as soon
 * as its registration has completed.
 *
 * @param localAddress the address to bind to
 * @return the failed registration future when it already has a cause; otherwise the promise
 *         passed on to {@code doBind0}
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Create and initialize the channel and start registering it.
    final ChannelFuture registration = initAndRegister();
    // Channel associated with the asynchronous result.
    final Channel boundChannel = registration.channel();

    // An exception was recorded for the asynchronous operation: return it unchanged.
    if (registration.cause() != null) {
        return registration;
    }

    // Done without a failure cause, or still running?
    final boolean registrationDone = registration.isDone();
    if (registrationDone) {
        // At this point we know that the registration was complete and successful.
        // Create a writable future for the bind and bind the port.
        final ChannelPromise result = boundChannel.newPromise();
        doBind0(registration, boundChannel, localAddress, result);
        return result;
    }

    // Registration future is almost always fulfilled already, but just in case it's not.
    final PendingRegistrationPromise deferred = new PendingRegistrationPromise(boundChannel);
    // Invoked when the registration completes, whether it succeeded or failed.
    final ChannelFutureListener bindOnceRegistered = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            final Throwable failure = future.cause();
            if (failure != null) {
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                deferred.setFailure(failure);
                return;
            }
            // Registration was successful, so set the correct executor to use.
            // See https://github.com/netty/netty/issues/2586
            deferred.registered();
            // Bind the port.
            doBind0(registration, boundChannel, localAddress, deferred);
        }
    };
    registration.addListener(bindOnceRegistered);
    return deferred;
}
之前我们分析过,一旦注册操作完成,就会触发 doBind0 进行端口号绑定:
// io.netty.bootstrap.AbstractBootstrap#doBind0
/**
 * Submits a task to the channel's event loop that binds the channel if registration succeeded
 * and fails {@code promise} otherwise.
 *
 * @param regFuture    the registration future that is inspected when the task runs
 * @param channel      the channel to bind
 * @param localAddress the address to bind to
 * @param promise      the promise completed by the bind, or failed with the registration's cause
 */
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Runnable bindTask = new Runnable() {
        @Override
        public void run() {
            // The asynchronous operation in question is the registration; the port is only
            // bound when it succeeded.
            if (!regFuture.isSuccess()) {
                promise.setFailure(regFuture.cause());
                return;
            }
            channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    };
    channel.eventLoop().execute(bindTask);
}
/**
 * Pipeline context that acts as both outbound and inbound handler and hands {@code bind}
 * requests to the channel's {@code Unsafe}.
 * This is an excerpt: the line shown as {@code ...} stands for members that were left out.
 */
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
// The Unsafe of the pipeline's channel, captured once at construction time.
private final Unsafe unsafe;
/**
 * @param pipeline the pipeline this context belongs to; its channel's Unsafe is stored
 */
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
// io.netty.channel.DefaultChannelPipeline.HeadContext#bind
// Delegates the bind request directly to the channel's Unsafe.
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
...
}
可以看到这里又调用了 unsafe,也就是进入底层代码了。
继续往下跟:
// io.netty.channel.AbstractChannel.AbstractUnsafe#bind
/**
 * Binds the channel to {@code localAddress} and completes {@code promise} with the outcome.
 * If the bind moves the channel from inactive to active, a {@code channelActive} event is
 * scheduled through {@code invokeLater}.
 *
 * @param localAddress the address to bind to
 * @param promise      the promise to complete; nothing is done if it cannot be marked
 *                     uncancellable or {@code ensureOpen(promise)} returns false
 */
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// Record the state before binding; this is false while the channel has not been activated.
boolean wasActive = isActive();
try {
// Perform the bind.
// Per the original walkthrough, the channel becomes active once the port is bound.
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// Only when this bind made the channel go from inactive to active is channelActive fired.
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// Fire channelActive() on the handlers in the pipeline.
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}