天天看點

netty和nioChannelEventLoopGroupChannelPipelineNetty 用戶端底層與 Java NIO 對應關系netty和reactor模型

netty是一個nio客戶機-伺服器架構,它簡化了tcp和udp網絡程式設計,相對于java傳統nio,netty還屏蔽了作業系統的差異性,并且兼顧了性能。

Channel

channel封裝了對socket的原子操作,實質是對socket的封裝和擴充。

netty架構自己定義的通道接口,是對java nio channel的封裝和擴充,用戶端NIO套接字通道是NioSocketChannel,提供的伺服器端NIO套接字通道是NioServerSocketChannel。

NioSocketChannel内部管理了一個Java NIO的SocketChanne執行個體,用來建立SocketChannel執行個體和設定該執行個體的屬性,并調用Connect 方法向服務端發起 TCP 連結等。

NioServerSocketChannel内部管理了Java NIO的ServerSocketChannel執行個體,用來建立ServerSocketChannel執行個體和設定該執行個體屬性,并調用執行個體的bind方法在指定端口監聽用戶端的連結。

EventLoopGroup

netty使用了主從多線程的Reactor模型。在netty中每個EventLoopGroup本身是一個線程池,其中包含了自定義個數的 NioEventLoop,每個NioEventLoop是一個線程,并且每個NioEventLoop都會關聯一個selector選擇器。

用戶端一般隻用一個EventLoopGroup來處理網絡IO操作,伺服器端一般使用兩個,boss-group用來接收用戶端發來的TCP連結請求,worker-group用來具體處理網絡請求。

當Channel是用戶端通道NioSocketChannel時,會注冊NioSocketChannel管理的SocketChannel執行個體到自己關聯的NioEventLoop的selector選擇器上,然後NioEventLoop對應的線程會通過select指令監控感興趣的網絡讀寫事件。

當 Channel 是服務端通道NioServerSocketChannel時,NioServerSocketChannel本身會被注冊到boss EventLoopGroup裡面的某一個NioEventLoop管理的selector選擇器,而完成三次握手的連結套接字是被注冊到了worker EventLoopGroup裡面的某一個NioEventLoop管理的selector選擇器上。

多個Channel可以注冊到同一個NioEventLoop管理的selector選擇器上,這時NioEventLoop對應的單個線程就可以處理多個Channel的就緒事件;但是每個Channel隻能注冊到一個固定的NioEventLoop管理的selector上。

ChannelPipeline

ChannelPipeline持有一個ChannelHandler的雙向鍊結構。每個Channel都有屬于自己的ChannelPipeline,對從Channel 中讀取或者要寫入 Channel 中的資料進行依次處理。

多ChannelPipeline裡面可以複用一個ChannelHandler。

Netty 用戶端底層與 Java NIO 對應關系

NioSocketChannel是對Java NIO的SocketChannel的封裝,從NioSocketChannel的構造函數可以看出:

public class NioSocketChannel extends AbstractNioByteChannel implements SocketChannel {
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }
    
    private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openSocketChannel();
        } catch (IOException var2) {
            throw new ChannelException("Failed to open a socket.", var2);
        }
    }
}           

SocketChannel的父類是AbstractNioChannel,在AbstractNioChannel中定義了java nio的SocketChannel類型的成員,并将其模式設定為非阻塞:

public abstract class AbstractNioChannel extends AbstractChannel {
    private final SelectableChannel ch;
    
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        ...
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        ch.configureBlocking(false);
    }
}           

是以說NioSocketChannel内部是對java nio的SocketChannel的封裝擴充,建立NioSocketChannel執行個體對象時候相當于執行了Java NIO中:

SocketChannel socketChannel = SocketChannel.open();           

NioSocketChannel執行個體的建立和注冊是在Bootstrap的connect階段。Bootstrap的connect代碼:

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    ...
    public ChannelFuture connect(InetAddress inetHost, int inetPort) {
        return this.connect(new InetSocketAddress(inetHost, inetPort));
    }
 
    public ChannelFuture connect(SocketAddress remoteAddress) {
        ...
        return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
    }
 
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        ChannelFuture regFuture = this.initAndRegister();
        ...
    }
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = this.channelFactory.newChannel();
            this.init(channel);
        } catch (Throwable var3) {}

        ChannelFuture regFuture = this.config().group().register(channel);
        ...
    }           

channelFactory.newChannel()建立了NIOSocketChannel執行個體。

繼續跟蹤this.config().group().register(channel)到:

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    ...
    public ChannelFuture register(Channel channel) {
        return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
    }
    
    public ChannelFuture register(ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }           

最終會跟蹤到AbstractNioChannel的doRegister():

public abstract class AbstractNioChannel extends AbstractChannel {
    ...
    protected void doRegister() throws Exception {
        boolean selected = false;

        while(true) {
            try {
                this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException var3) {
                if (selected) {
                    throw var3;
                }

                this.eventLoop().selectNow();
                selected = true;
            }
        }
    }
}               

其中:

this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this)           

将NioSocketChannel執行個體注冊到了目前NioEventLoop的選擇器中。

從選擇器擷取就緒的事件是在該用戶端套接關聯的NioEventLoop裡面的做的,每個NioEventLoop裡面有一個線程用來循環從選擇器裡面擷取就緒的事件:

protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
                ...
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {}
        }
    }

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;

        try {
            while(true) {
                ...
                int selectedKeys = selector.select(timeoutMillis);
                ++selectCnt;
                ...
        } catch (CancelledKeyException var13) {}

    }           

從選擇器選取就緒的事件後,會最終調用processSelectedKey具體處理每個事件:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {}
    }           

netty和reactor模型

netty實作單線程reactor:

private EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(group)
    .childHandler(new HeartbeatInitializer());           

netty實作多線程reactor:

// 預設線程數為CPU核心數 * 2,*2是因為考慮到超線程CPU包括兩個邏輯線程
private EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(boss)
    .childHandler(new HeartbeatInitializer());           

netty實作主從多線程reactor:

private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(boss, work)
    .childHandler(new HeartbeatInitializer());