netty是一個nio客戶機-伺服器架構,它簡化了tcp和udp網絡程式設計,相對于java傳統nio,netty還屏蔽了作業系統的差異性,并且兼顧了性能。
Channel
channel封裝了對socket的原子操作,實質是對socket的封裝和擴充。
netty架構自己定義的通道接口,是對java nio channel的封裝和擴充,用戶端NIO套接字通道是NioSocketChannel,提供的伺服器端NIO套接字通道是NioServerSocketChannel。
NioSocketChannel内部管理了一個Java NIO的SocketChanne執行個體,用來建立SocketChannel執行個體和設定該執行個體的屬性,并調用Connect 方法向服務端發起 TCP 連結等。
NioServerSocketChannel内部管理了Java NIO的ServerSocketChannel執行個體,用來建立ServerSocketChannel執行個體和設定該執行個體屬性,并調用執行個體的bind方法在指定端口監聽用戶端的連結。
EventLoopGroup
netty使用了主從多線程的Reactor模型。在netty中每個EventLoopGroup本身是一個線程池,其中包含了自定義個數的 NioEventLoop,每個NioEventLoop是一個線程,并且每個NioEventLoop都會關聯一個selector選擇器。
用戶端一般隻用一個EventLoopGroup來處理網絡IO操作,伺服器端一般使用兩個,boss-group用來接收用戶端發來的TCP連結請求,worker-group用來具體處理網絡請求。
當Channel是用戶端通道NioSocketChannel時,會注冊NioSocketChannel管理的SocketChannel執行個體到自己關聯的NioEventLoop的selector選擇器上,然後NioEventLoop對應的線程會通過select指令監控感興趣的網絡讀寫事件。
當 Channel 是服務端通道NioServerSocketChannel時,NioServerSocketChannel本身會被注冊到boss EventLoopGroup裡面的某一個NioEventLoop管理的selector選擇器,而完成三次握手的連結套接字是被注冊到了worker EventLoopGroup裡面的某一個NioEventLoop管理的selector選擇器上。
多個Channel可以注冊到同一個NioEventLoop管理的selector選擇器上,這時NioEventLoop對應的單個線程就可以處理多個Channel的就緒事件;但是每個Channel隻能注冊到一個固定的NioEventLoop管理的selector上。
ChannelPipeline
ChannelPipeline持有一個ChannelHandler的雙向鍊結構。每個Channel都有屬于自己的ChannelPipeline,對從Channel 中讀取或者要寫入 Channel 中的資料進行依次處理。
多ChannelPipeline裡面可以複用一個ChannelHandler。
Netty 用戶端底層與 Java NIO 對應關系
NioSocketChannel是對Java NIO的SocketChannel的封裝,從NioSocketChannel的構造函數可以看出:
public class NioSocketChannel extends AbstractNioByteChannel implements SocketChannel {
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a socket.", var2);
}
}
}
SocketChannel的父類是AbstractNioChannel,在AbstractNioChannel中定義了java nio的SocketChannel類型的成員,并将其模式設定為非阻塞:
public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
...
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
}
}
是以說NioSocketChannel内部是對java nio的SocketChannel的封裝擴充,建立NioSocketChannel執行個體對象時候相當于執行了Java NIO中:
SocketChannel socketChannel = SocketChannel.open();
NioSocketChannel執行個體的建立和注冊是在Bootstrap的connect階段。Bootstrap的connect代碼:
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
...
public ChannelFuture connect(InetAddress inetHost, int inetPort) {
return this.connect(new InetSocketAddress(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
...
return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
}
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
ChannelFuture regFuture = this.initAndRegister();
...
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {}
ChannelFuture regFuture = this.config().group().register(channel);
...
}
channelFactory.newChannel()建立了NIOSocketChannel執行個體。
繼續跟蹤this.config().group().register(channel)到:
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
最終會跟蹤到AbstractNioChannel的doRegister():
public abstract class AbstractNioChannel extends AbstractChannel {
...
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
}
其中:
this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this)
将NioSocketChannel執行個體注冊到了目前NioEventLoop的選擇器中。
從選擇器擷取就緒的事件是在該用戶端套接關聯的NioEventLoop裡面的做的,每個NioEventLoop裡面有一個線程用來循環從選擇器裡面擷取就緒的事件:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
...
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {}
}
}
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
while(true) {
...
int selectedKeys = selector.select(timeoutMillis);
++selectCnt;
...
} catch (CancelledKeyException var13) {}
}
從選擇器選取就緒的事件後,會最終調用processSelectedKey具體處理每個事件:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {}
}
netty和reactor模型
netty實作單線程reactor:
private EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(group)
.childHandler(new HeartbeatInitializer());
netty實作多線程reactor:
// 預設線程數為CPU核心數 * 2,*2是因為考慮到超線程CPU包括兩個邏輯線程
private EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
.group(boss)
.childHandler(new HeartbeatInitializer());
netty實作主從多線程reactor:
private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
.group(boss, work)
.childHandler(new HeartbeatInitializer());