一、前言
上篇簡單分析了下future、promise,由于這個在netty源碼中用的實在太多,不得不提前了解下,這篇就開始分析源碼了,先從server端開始~~~
二、源碼分析
首先簡單貼下server端的代碼,這裡隻貼了核心代碼
public void startServer() throws Exception {
// 隻監聽一個端口,是以隻需要設定成線程數為1即可
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker);
// 底層實作1、首先建立ReflectiveChannelFactory2、調用反射機制建立
// NioServerSocketChannel,同時還會建立pipeline,預設建立DefaultChannelPipeline
b.channel(NioServerSocketChannel.class);
b.childHandler(nettyTelnetInitializer);
// 伺服器綁定端口監聽,bind()操作異步的
ChannelFuture f = b.bind(SERVER_PORT);
// 這個函數将會調用ChannelInitializer的initChannel(ChannelHandlerContext)方法
// 隻有在這個方法執行之後,server才算是綁定了端口
f = f.sync();
// 監聽伺服器關閉監聽
f.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
在startServer()方法中,首先建立了兩個NioEventLoopGroup類,由于netty是基于reactor線程模型的,這裡兩個類也分别代表boss和workers。(至于什麼是reactor線程模型,下下篇再介紹吧)
緊接着建立ServerBootstrap調用group()方法,
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
@SuppressWarnings("unchecked")
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}
再然後設定channel,由于是服務的,是以設定NioServerSocketChannel,底層源碼
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
由此可見,預設ChannelFactory是ReflectiveChannelFactory。再看它的構造函數
// 這裡的clazz很明顯就是NioServerSocketChannel,主要為了後面的反射使用
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
設定完一些參數之後之後,接着就是注冊和綁定了.綁定操作在AbstractBootstrap的doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化channel和注冊channel,都是異步操作
final ChannelFuture regFuture = initAndRegister();
// 擷取channel
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 為Future添加一個監聽器,當Future完成之後調用該方法,同時,當該方法執行之後,将會
// 調用sync()方法中的wait()之後的代碼
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
// 當注冊成功之後,設定字段,表示已注冊
promise.registered();
// 開始綁定ip和端口号
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
// 初始化和注冊
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 通過反射建立channel
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 注冊channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
//ServerBootstrap類執行初始化
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 擷取對應channel的管道,在設定channel的時候就初始化了,預設是DefaultChannelPipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// 為該管道添加一個handler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加ServerBootstrapAcceptor,當用戶端開始連接配接的時候,将會調用ServerBootstrapAcceptor的channelRead()方法
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
// 注冊操作
// 在上面的注冊方法最後調用如下
// MultithreadEventLoopGroup類
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
// 關于這裡的next()是如何擷取EventExecutor就不點進去看了,主要是通過EventExecutorChooser來擷取.
// 接着來到這裡了SingleThreadEventLoop類
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
// AbstractUnsafe類
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// .......略
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// .......略
doRegister();
// .......略
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
// AbstractNioChannel類
// OK,到了這裡終于看見了selectKey了,這就是真正執行注冊了
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 擷取SelectionKeyImpl對象
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), , this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
//AbstractSelectableChannel類
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != )
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
// 到了這裡終于到了nio的selector了~~~
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
// 綁定
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 綁定IP/端口号
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
ok,到這裡,channel的初始化和注冊都完成了,DefaultChannelPromise,接着我們看最後一個方法吧.
// 此時的f是:DefaultChannelPromise的子類:PendingRegistrationPromise
f = f.sync()
//DefaultChannelPromise類
@Override
public ChannelPromise sync() throws InterruptedException {
super.sync();
return this;
}
//DefaultPromise<V>類
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
這裡的await方法比較重要,我們看源碼,前面的不重要,最重要的是wait()方法,這個是object的方法,也就是說到了這裡它會
進入”睡眠”狀态,直到future 完成.這個我們可以在
@Override
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
三、總結
本來準備用戶端服務放在一起分析的,不過可能占用篇幅過大,就分成兩篇了.
總的來說,server端過程如下:
1、初始化channel
2、NioEventLoop類的regist()方法,将channel注冊到selector
3、同步,就是調用sync()方法,該方法必須調用
雖然總結的比較簡單,但是源碼還是挺複雜的.
下篇就就分析client端的了~