我们以一个入门级别的服务端代码进行分析。netty其实就是对原生的java nio编程进行了封装,所以分析中会看到最后调用的都是java原生的API。因为整个过程代码比较多,这里会把一些不是核心流程的代码省略。
先整体看下服务端的代码。
/*线程组*/
EventLoopGroup group = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();//2
b.group(group,work)//3
/*指明使用NIO进行网络通讯*/
.channel(NioServerSocketChannel.class)//4
//.channel(EpollServerSocketChannel.class)
/*指明服务器监听端口*/
.localAddress(new InetSocketAddress(port))
/*接收到连接请求,新启一个socket通信,也就是channel,每个channel
* 有自己的事件的handler*/
.childOption(ChannelOption.TCP_NODELAY,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
//ch.pipeline().addLast(new EchoServerHandler2());
}
});
ChannelFuture f = b.bind().sync();//5
/*阻塞,直到channel关闭*/
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
1 创建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup();
这里的创建了两个NioEventLoopGroup,只创建一个也可以,不明白的可以看下前面文章讲的Reactor模式。NioEventLoopGroup可以认为是个线程池,它会给每个连接的channel分配EventLoop,EventLoop就是个线程。
这里创建的是NioEventLoopGroup,分析下创建它的过程做了什么。
1.1 NioEventLoopGroup
先看下构造函数
public NioEventLoopGroup() {
this(0);
}
整个过程中构造函数调用的比较多,我们只看几个关键点。
public NioEventLoopGroup(int nThreads, Executor executor) {
//这里nThreads就是上面的0,executor为null,SelectorProvider.provider()这个java原生api,得到一个SelectorProvider
this(nThreads, executor, SelectorProvider.provider());
}
接着调用了父类MultithreadEventLoopGroup的构造方法。
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//这里判定如果没有指定线程数,设置默认线程数为系统处理器数量*2
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
接着再次调用另外一个父类MultithreadEventExecutorGroup的构造方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//创建执行器
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());//步骤1
}
//创建EventLoop数组
children = new EventExecutor[nThreads];//步骤2
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//初始化数组的对象
children[i] = newChild(executor, args);//步骤3
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
...//省略代码
}
}
}
//创建选择器,选择器会根据算法每次从上述数组选择一个EventLoop返回
chooser = chooserFactory.newChooser(children);//步骤4
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);//步骤5
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);//步骤6
}
(1)步骤1
前面我们没有指定执行器,所以步骤1创建了默认执行器,看下这个执行器是什么。
public final class ThreadPerTaskExecutor implements Executor {
//这里ThreadFactory 就是用来创建线程的
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
//创建线程,执行任务
threadFactory.newThread(command).start();
}
}
执行器构造函数参数是newDefaultThreadFactory方法返回的。
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
DefaultThreadFactory继承了JDK的ThreadFactory,就是用来创建线程的。
(2)步骤2
步骤2就是创建了一个EventExecutor数组,数组的大小就是我们之前指定的线程大小。
(3)步骤3
newChild创建数据对象,这里newChild调用的是NioEventLoopGroup方法,创建NioEventLoop对象。
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
看下创建NioEventLoop的过程。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
...//省略代码
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
//JDK的选择器
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
构造器里面创建了我们熟悉的JDK的选择器selector。
这里也调用了父类的构造方法,这里的父类是SingleThreadEventLoop,看名字就知道这是个单线程的。
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
//创建了线程等待队列的大小,这里是2147483647
tailTasks = newTaskQueue(maxPendingTasks);
}
(4)步骤4
创建选择器,这个选择器决定以什么算法从EventLoop数组中选择一个EventLoop执行任务。
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
这里使用的是PowerOfTwoEventExecutorChooser。
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
(5)步骤5和步骤6
给每一个EventLoop添加了一个线程终止时的监听器。将上述EventLoop数组添加到了一个只读集合。
2 创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
ServerBootstrap构造函数什么都没做。
3 设置ServerBootstrap参数
b.group(group,work)//3
/*指明使用NIO进行网络通讯*/
.channel(NioServerSocketChannel.class)//4
//.channel(EpollServerSocketChannel.class)
/*指明服务器监听端口*/
.localAddress(new InetSocketAddress(port))
/*接收到连接请求,新启一个socket通信,也就是channel,每个channel
* 有自己的事件的handler*/
.childOption(ChannelOption.TCP_NODELAY,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
//ch.pipeline().addLast(new EchoServerHandler2());
}
});
3.1 group方法
首先调用了ServerBootstrap的group方法。
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
...//省略代码
this.childGroup = childGroup;
return this;
}
这里将parentGroup赋值给了父类AbstractBootstrap的group对象,childGroup赋值给了ServerBootstrap的childGroup对象。
这里如果group对象只传入了一个EventLoopGroup,上述两个对象是同一个。
3.2 channel方法
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
这里创建了一个channel工厂,内部持有了一个反射工厂。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
//构造器反射创建Channel
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
我们传入的是NioServerSocketChannel的class对象。
1.5 childOption和childHandler
这两个方法就是给ServerBootstrap对象赋值。
4. bind
ChannelFuture f = b.bind().sync();
看下ServerBootstrap对象的bind方法。
public ChannelFuture bind() {
//校验参数
validate();
//端口参数
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化channel并注册到selector选择器
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
//绑定端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
...//省略代码
}
}
4.1 initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//用前面将的channel工厂,利用反射创建channel对象
//这里创建的是NioServerSocketChannel
channel = channelFactory.newChannel();
//初始化
init(channel);
} catch (Throwable t) {
...//省略代码
}
//注册
ChannelFuture regFuture = config().group().register(channel);
...//省略代码
return regFuture;
}
(1)NioServerSocketChannel
这里先创建了NioServerSocketChannel对象,看下它的构造函数。
public NioServerSocketChannel() {
这里的DEFAULT_SELECTOR_PROVIDER是JDK的SelectorProvider对象。
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
newSocket创建的是ServerSocketChannelImpl对象。
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
...//省略代码
}
}
this调用了本类的构造方法。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
继续调用了父类的构造方法。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
//这里是SelectionKey.OP_ACCEPT,SelectionKey.OP_ACCEPT对应的整数是16,
this.readInterestOp = readInterestOp;
try {
//设置Channel非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
...//省略代码
}
}
再看下super调用的父类构造方法。
protected AbstractChannel(Channel parent) {
this.parent = parent;
//创建id
id = newId();
unsafe = newUnsafe();
创建Pipeline对象
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
这里创建DefaultChannelPipeline对象。
(2)DefaultChannelPipeline
看下DefaultChannelPipeline的构造方法。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
这里创建了两个Handler对象,一个是HeadContext,HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler接口,可以处理入站和出站事件。另外一个是TailContext,TailContext实现了ChannelInboundHandler,可以处理入站事件。
4.1.1 init方法
创建Channel对象后,调用init方法初始化该对象。
void init(Channel channel) throws Exception {
//将上述options赋值给该channel
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//将上述attrs赋值给该channel
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//Channel的Pipeline对象
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
//给Pipeline对象增加一个Handler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
(1)addLast方法
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//创建DefaultChannelHandlerContext对象
newCtx = newContext(group, filterName(name, handler), handler);
//将这个Handler加入到TailContext前一个
addLast0(newCtx);
if (!registered) {
//channel还未注册到selector上时,走如下逻辑
newCtx.setAddPending();
//这里创建了一个PendingHandlerAddedTask,作用是调用ChannelInitializer的initChannel方法,并将自己从pipeline移除。
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
addLast0方法:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
callHandlerCallbackLater方法
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
//这里的added为true
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
//将PendingHandlerAddedTask对象赋值给pendingHandlerCallbackHead对象,作用在后面会讲到
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
4.1.2 register方法
这里将调用MultithreadEventLoopGroup的register方法。
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public EventExecutor next() {
//选择器从EventLoopGroup中选取一个EventLoop
return chooser.next();
}
这里next方法就是用上面讲的选择器从EventLoopGroup中选取一个EventLoop,调用它的register方法。
这里调用的是SingleThreadEventLoop的register方法。
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//这里调用的是AbstractChannel内部类AbstractUnsafe的register方法
promise.channel().unsafe().register(this, promise);
return promise;
}
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...//省略代码
AbstractChannel.this.eventLoop = eventLoop;
//当前线程是否是eventLoop的线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//如果不是放入eventLoop的等待队列,等待执行
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...//省略代码
}
}
}
这里就是netty中的多线程安全解决方案,在前边netty介绍我们讲过所有channel的IO事件都会和eventLoop绑定,一个eventLoop会和一个线程绑定。这里eventLoop.inEventLoop()会判断当前线程是不是EventLoop指定的线程,
如果不是加入eventLoop的等待队列。这样channel的IO事件都会在一个线程执行。
(1)inEventLoop方法
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
就是将当前线程和eventLoop线程进行对比,这里由于EventLoop还未指定线程,目前为null,当前线程为主线程,所以执行else代码块。
(2)execute
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
//加入线程等待队列
addTask(task);
if (!inEventLoop) {
//启动线程
startThread();
if (isShutdown() && removeTask(task)) {
//如果线程池已经关闭,移除任务
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
(3)doStartThread
private void doStartThread() {
assert thread == null;
//这里调用的就是前面将的用线程工厂创建一个线程,然后执行下面的run方法。
executor.execute(new Runnable() {
@Override
public void run() {
//将当前线程赋值给NioEventLoop的thread
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//这里执行的是NioEventLoop的run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...//省略代码
);
}
(4)run
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
//处理注册时间
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
//执行队列中的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
...//省略代码
}
}
这里还没有注册事件,所以会直接执行队列中的任务。
(5)runAllTasks
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
//取出任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
//执行任务
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
这里safeExecute方法就是调用队列任务的run方法。这里将调用是AbstractChannel的register0方法。
(6)register0
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//向selector注册channel
doRegister();
neverRegistered = false;
registered = true;
//调用我们之前创建的PendingHandlerAddedTask中的方法
//作用在下面分析
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//这里把Register事件传播,基本没做什么
pipeline.fireChannelRegistered();
//这时候channel还没激活,因为端口还没绑定,isActive是false。
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
...//省略代码
}
}
(7)doRegister
这里调用的是AbstractNioChannel类的doRegister方法。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//向Selector选择器注册前面创建的ServerSocketChannel对象,并且感兴趣的事件为0,注意这里注册的还不是accept事件,因为端口还没绑定
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...//省略代码
}
}
}
(8)invokeHandlerAddedIfNeeded
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
//registered设为true,之后再向Pipeline中加入Handler和第一次加入不一样了
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
这里调用的是之前创建的PendingHandlerAddedTask对象,调用它的execute方法。
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
...//省略代码
}
}
}
callHandlerAdded0方法
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.setAddComplete();
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
...//省略代码
}
handlerAdded方法:
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
上述方法最后调用了ChannelInitializer类的initChannel。
(9)initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
//这里调用的就是之前定义ChannelInitializer的initChannel,注意这里还不是我们定义的ChannelInitializer。
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
//将ChannelInitializer从Pipeline中移除
remove(ctx);
}
return true;
}
return false;
}
我们看下之前的ChannelInitializer.
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//将ServerBootstrapAcceptor加入到Pipeline
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
现在Pipeline有三个Handler,顺序为HeadContext、ServerBootstrapAcceptor、TailContext。到这里initAndRegister方法分析完了,回到doBind方法继续分析。
4.2 doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
这里执行AbstractChannel的bind方法。
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
这里调用了pipeline的bind方法。先说明netty中所有出站事件都TailContext传播,入站事件从HeadContext传播。
(1)bind
这里的bind方法是从TailContext节点传播,是个出站事件
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
这里调用的是父类AbstractChannelHandlerContext的bind方法。
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
...//省略代码
//找到下一个可以处理出站事件的handler
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
(2)findContextOutbound
findContextOutbound方法从当前节点向前找OutboundHandler:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
目前我们的handler只有三个,能处理出站事件只有HeadContext。看下它的bind方法。
(3)bind
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
调用了AbstractChannel内部类AbstractUnsafe的bind方法。
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...//省略代码
boolean wasActive = isActive();
try {
/绑定端口
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//这时候channel已经激活
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//传播Active事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
(4)doBind
protected void doBind(SocketAddress localAddress) throws Exception {
//java原生api绑定端口,并且考虑了版本兼容
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
端口绑定后,channel已激活,这时候会调用fireChannelActive方法。这是个入站事件,从HeadContext开始传播。看下头结点的channelActive方法。
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//先把Active向后传播,这里ServerBootstrapAcceptor、TailContext都没做什么
ctx.fireChannelActive();
readIfIsAutoRead();
}
(5)readIfIsAutoRead
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
public Channel read() {
//read是个出站事件
pipeline.read();
return this;
}
read是个出站事件,所以从TailContext开始传播,但最终调用的是HeadContext的方法。
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
(6)beginRead
public final void beginRead() {
...//省略代码
try {
doBeginRead();
} catch (final Exception e) {
...//省略代码
}
}
这里最终调用了AbstractNioChannel的doBeginRead方法。
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//将感兴趣的事件注册成了OP_ACCEPT
selectionKey.interestOps(interestOps | readInterestOp);
}
}
到这里服务器启动过程已经分析完毕。
5 服务端启动过程总结
- 首先创建EventLoopGroup和ServerBootstrap对象,给EventLoopGroup和ServerBootstrap对象设置了相关属性,比如EventLoop数组。
- 创建NioServerSocketChannel对象,设置相关属性,比如初始化它的Pipeline,创建了JDK的ServerSocketChannelImpl对象,设置通道为非阻塞。
- 从EventLoop数组选取一个EventLoop,并且创建一个线程和EventLoop绑定,启动线程执行任务,在任务中将上述channel注册到selector上,但暂时不阻塞OP_ACCEPT事件,最后将ServerBootstrapAcceptor加入Pipeline。
- 将上述channel绑定监听端口,在selector上注册OP_ACCEPT事件等待客户端连接。
6 服务端处理OP_ACCEPT事件
上面我们分析了服务端启动的过程,接下来分析如果有客户端发起连接后,服务端怎么处理。
我们之前分析启动的过程中分析了NioEventLoop的run方法。这个方法会不断自旋处理队列中的任务以及各种选择器事件,其实处理选择器事件的是其中的processSelectedKeys方法。
private void processSelectedKeys() {
if (selectedKeys != null) {
//执行的是这个方法
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
6.1 processSelectedKeysOptimized
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//我们使用的是NIO,所以走这段代码
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
这里循环所有事件,调用processSelectedKey方法处理
(1)processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//检查事件是否有效
if (!k.isValid()) {
final EventLoop eventLoop;
try {
//获得channel的eventLoop
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// 处理连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 处理写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// 处理读事件或者ACCEPT事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
我们这里是ACCEPT事件,调用的是NioMessageUnsafe的read方法。
6.2 read
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//创建客户端连接
int localRead = doReadMessages(readBuf);
.//省略代码
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//传播read事件
pipeline.fireChannelRead(readBuf.get(i));
}
//清除readBuf
readBuf.clear();
allocHandle.readComplete();
//传播ReadComplete事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
(1)doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
//获取客户端连接,这里的SocketChannel是通过java原生的api获得
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
//将上述SocketChannel 封装成NioSocketChannel对象,将入list对象
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
.//省略代码
return 0;
}
这里的list对象是NioMessageUnsafe类的成员变量,这里保存的所有连接上服务端的SocketChannel 。这里创建的NioSocketChannel对象,服务端启动时候创建的是NioServerSocketChannel对象。看下两个的类图。
两个很相似,前面讲过NioServerSocketChannel对象创建时候调用了父类AbstractNioMessageChannel的构造函数,AbstractNioMessageChannel又调用了父类AbstractNioChannel。NioSocketChannel创建的时候先调用的是AbstractNioByteChannel的构造函数,然后也调用了AbstractNioChannel。所以同样NioSocketChannel创建的和NioServerSocketChannel创建过程差不多,内部同样创建了Pipeline等对象。不同的是NioServerSocketChannel默认的InterestOp是OP_ACCEPT,而NioSocketChannel是OP_READ。
(2)fireChannelRead
注意现在有两个Pipeline,一个服务端NioServerSocketChannel的Pipeline,这里叫主Pipeline,一个是客户端连接后创建的NioSocketChannel的Pipeline,这个叫副Pipeline。这里的传播事件Pipeline是主Pipeline。
这里的read是个入站事件,主Pipeline里面现在有三个handle,HeadContext、ServerBootstrapAcceptor、TailContext。所以这里从HeadContext开始向下传播,HeadContext只是向下传播,TailContext也没做什么,主要看下ServerBootstrapAcceptor的channelRead方法。
(3)channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//这里是NioSocketChannel
final Channel child = (Channel) msg;
//这里的pipeline是副pipeline
child.pipeline().addLast(childHandler);
//设置属性
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//我们在服务端设置了两个EventLoopGroup,这里的childGroup是子 EventLoopGroup
//调用的还是MultithreadEventLoopGroup的register方法
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
(4)register
这个register方法在之前已经分析过了,不再重复了。就是给NioSocketChannel分配一个EventLoop,并且启动一个线程和EventLoop绑定,最终执行register0。
(5)register0
register0方法我们之前也分析过,就是将NioSocketChannel注册到selector上,并且将我们自己定义的handler加入pipeline。和之前不同的是这里的channel已经激活了。由于是第一次注册,所以会执行pipeline.fireChannelActive()方法。而之前启动时候是在绑定端口之后才执行的pipeline.fireChannelActive()。
这里的子pipeline里面的handler也有三个,HeadContext、EchoServerHandler(我们自定义的)、TailContext。所以和启动的时候一样,也是由HeadContext执行逻辑。
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
最终和之前一样调用到了AbstractNioChannel的doBeginRead方法。
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//如果这里除了读事件还有其他事件,都重新注册了
selectionKey.interestOps(interestOps | readInterestOp);
}
}
这里默认的是read事件,所以最后在NioSocketChannel注册的selector上注册了read事件。注册完后回到最开始的read方法继续执行。
(6)readBuf.clear()
在将所有的NioSocketChannel创建并且注册了read事件后,把readBuf清除。
(7)fireChannelReadComplete
最后在主channel上传播ReadComplete事件。ReadComplete事件也是个入站事件,从HeadContext开始传播。
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
//重新再注册accept事件
readIfIsAutoRead();
}
ReadComplete事件在ServerBootstrapAcceptor、TailContext中都没做什么。readIfIsAutoRead和之前一样,将NioServerSocketChannel在selector上重新注册了accept事件。
7 服务端处理READ事件
服务端处理read事件和ACCEPT事件流程差不多,只不过ACCEPT事件是由NioMessageUnsafe的read方法处理,read事件是由NioByteUnsafe的read方法处理。看下NioByteUnsafe的read方法。
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
//下面几行决定用创建什么样的ByteBuf,这里以池化、直接内存方式创建ByteBuf
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
//doReadBytes(byteBuf),读取数据放入byteBuf中
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//没有读取到数据
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
//释放byteBuf
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
//传播ChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
//传播ReadComplete事件,这里再次在selector上注册了read事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
这里流程和上面差不多,具体不再分析了。