天天看點

Netty系列:核心功能——傳輸 (Transport)

Netty核心功能——傳輸 (Transport)

Netty 為它所有的傳輸實作提供了一個通用 API,這使得這種轉換比直接使用 JDK 所能夠達到的簡單得多。所産生的代碼不會被實作的細節所污染,我們也不需要在整個代碼庫上進行廣泛的重構。簡而言之,我們可以将時間花在其他更有成效的事情上。

從一個例子開始

先來比較一下不使用netty以及使用netty所寫出來的nio程式對比:

//隻用 JDK API 來實作 NIO
public class PlainNioServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);                                            //1、綁定伺服器到制定端口
        Selector selector = Selector.open();                         //2、打開 selector 處理 channel
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);    //3、将ServerSocket 注冊到Selector以接受連接配接
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (;;) {
            try {
                selector.select();//4、等待需要處理的新事件;阻塞将一直持續到下一個傳入事件
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();    //5、擷取所有接收事件的SelectionKey執行個體
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {                //6、檢查事件是否是一個新的已經就緒可以被接受的連接配接
                        ServerSocketChannel server =
                                (ServerSocketChannel)key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE |
                                SelectionKey.OP_READ, msg.duplicate());    //7、接受用戶端,并将它注冊到選擇器
                        System.out.println(
                                "Accepted connection from " + client);
                    }
                    if (key.isWritable()) {                //8、檢查套接字是否已經準備好寫資料
                        SocketChannel client =
                                (SocketChannel)key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer)key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) {        //9、将資料寫到已連接配接的用戶端
                                break;
                            }
                        }
                        client.close();                    //10、關閉連接配接
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // 在關閉時忽略
                    }
                }
            }
        }
    }
}      

将其改寫為使用netty實作:

public class NettyNioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new NioEventLoopGroup();//1、為非阻塞模式使用NioEventLoopGroup
        try {
            ServerBootstrap b = new ServerBootstrap();    //2、建立 ServerBootstrap
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
             .channel(NioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() { //3、指定 ChannelInitializer,對于每個已接受的連接配接都調用它
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {//4、添加 ChannelInboundHandlerAdapter以接收和處理事件
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate())//5将消息寫到用戶端,并添加ChannelFutureListener,以便消息寫完就關閉連接配接
                                .addListener(ChannelFutureListener.CLOSE);
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();                    //6、綁定伺服器以接受連接配接
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();                    //7、釋放所有的資源
        }
    }
}      

使用netty的代碼不僅更加的簡潔,還有一個好處是如果想要将nio轉變為oio,使用java原生api的話就需要重寫一個程式,而是用netty的話隻需要改變兩行代碼,分别是将​

​EventLoopGroup​

​類型改為​

​OioEventLoopGroup()​

​以及​

​ServerBootstrap​

​指定​

​channel​

​時将​

​OioServerSocketChannel.class​

​傳入即可。因為 Netty 為每種傳輸的實作都暴露了相同的 API,是以無論選用哪一種傳輸的實作,我們的代碼都幾乎不受影響。

傳輸 API

傳輸 API 的核心是 Channel 接口,它被用于所有的 I/O 操作,它的層級結構圖如下所示:

Netty系列:核心功能——傳輸 (Transport)

如圖所示,每個 Channel 都将會被配置設定一個 ChannelPipeline 和 ChannelConfig。 ChannelConfig 包含了該 Channel 的所有配置設定,并且支援熱更新。由于特定的傳輸可能具有獨特的設定,是以它可能會實作一個 ChannelConfig 的子類型。

ChannelHandler 的典型用途包括:
  • 将資料從一種格式轉換為另一種格式;
  • 提供異常的通知;
  • 提供 Channel 變為活動的或者非活動的通知;
  • 提供當 Channel 注冊到 EventLoop 或者從 EventLoop 登出時的通知;
  • 提供有關使用者自定義事件的通知。

内置的傳輸方式

Netty 内置了一些可開箱即用的傳輸方式。因為這些傳輸方式并不支援每一種協定,是以我們必須選擇一個和自己的應用程式所使用的協定相比對的傳輸方式。Netty 提供的傳輸方式如下表所示:

名稱 描述
NIO io.netty.channel.socket.nio 使用 java.nio.channels 包作為基礎——基于選擇器的方式
Epoll io.netty.channel.epoll 由 JNI 驅動的 epoll()和非阻塞 IO。這個傳輸支援 隻有在Linux上可用的多種特性,如SO_REUSEPORT, 比 NIO 傳輸更快,而且是完全非阻塞的(這個是 Netty 特有的實作,更加适配 Netty 現有的線程模型,具有更高的性能以及更低的垃圾回收壓力)
OIO io.netty.channel.socket.oio 使用 ​​java.net​​ 包作為基礎——使用阻塞流
Local io.netty.channel.local 可以在 VM 内部通過管道進行通信的本地傳輸
Embedded io.netty.channel.embedded Embedded 傳輸,允許使用 ChannelHandler 而又不需要一個真正的基于網絡的傳輸。這在測試你的 ChannelHandler 實作時非常有用

1、NIO—基于選擇器的IO

NIO 提供了一個所有 I/O 操作的全異步的實作,相對于傳統的IO,其突出的有點就是快速和簡單,在網絡傳輸中優勢明顯。NIO主要有三大核心部分:Channel(通道),Buffer(緩沖區), Selector(選擇器)。傳統IO基于位元組流和字元流進行操作,而NIO基于Channel和Buffer(緩沖區)進行操作,資料總是從通道讀取到緩沖區中,或者從緩沖區寫入到通道中。Selector(選擇器)用于監聽多個通道的事件(比如:連接配接打開,資料到達)。是以,單個線程可以監聽多個資料通道。

Netty系列:核心功能——傳輸 (Transport)

對于所有 Netty 的傳輸實作都共有的使用者級别 API 完全地隐藏了這些 NIO 的内部細節。下圖就展示了netty的處理流程:

Netty系列:核心功能——傳輸 (Transport)

接下來簡單講解一下NIO的相關知識:

首先要明确的第一點就是——java中的NIO(JDK 1.4引入)和IO模型中的NIO并不是一個東西。前者是New-IO,後者是Non-blocking-IO,在這個概念下我們繼續往下看。

先來了解一下什麼是IO,I/O 在程式中指的 Input/Output,也就是資料的輸入和輸出。程式的 I/O 操作依賴于底層的 I/O 操作,基本上都會用上底層的 read/write 操作,但 read/write 操作并不是直接寫入或讀取實體裝置,而是面向緩沖區操作,調用作業系統的 read,是把資料從核心緩沖區複制到程序緩沖區;而 write 系統調用,是把資料從程序緩沖區複制到核心緩沖區。如下圖:

Netty系列:核心功能——傳輸 (Transport)

緩存區的設定主要是為了減少頻繁地與裝置的實體交換。因為外部裝置的直接讀寫,涉及作業系統的中斷。發生系統中斷時,需要儲存之前的程序資料和狀态等資訊,而結束中斷之後,還需要恢複之前的程序資料和狀态等資訊。為了減少這種底層系統的時間損耗、性能損耗,于是出現了記憶體緩沖區。

再來了解一下了解兩組名詞:阻塞和非阻塞、同步和異步:

  • 阻塞:阻塞意味着調用方在結果出現之前,不會做任何其他事情。
  • 非阻塞:非阻塞意味着調用方在結果出現之前,同時在做其他的事情。被調用方會立即傳回一個值,調用方拿到後可以做其他事情也可以選擇不做。
  • 同步:必須等待被調用方處理完請求傳回結果。
  • 異步:被調用方處理請求通過調用方注冊的回調接口傳回結果。
有了這些前置的知識就可以來看看Unix 中的四種 I/O 模型:
同步阻塞 I/O (Blocking IO)
在阻塞式 IO 模型中,應用程式從 IO 系統調用開始,直到系統調用傳回,在這段時間内,調用程序是阻塞的。如下圖:

優點:實作簡單,而且阻塞期間線程挂起,使用者線程基本不會占用資源

缺點:在高并發的場景下,需要大量的線程來維護每一個阻塞的任務,記憶體和線程切換的開銷都非常大。

同步非阻塞 I/O (Non-Blocking IO)
在這種模式下,如果調用時沒有資料,系統會立即傳回一個調用失敗的消息。然後調用方過段時間再查詢,如果有資料,則變成阻塞方式,進行資料複制。如下圖:

優點:每次的調用都可以立即得到回報,調用方不會阻塞,實時性比較好

缺點:調用方需要不斷的輪詢,這個會占用 CPU 資源,并且效率低下。

I/O 多路複用 (IO Multiplexing)
為了避免非阻塞 IO 模型上輪詢的問題,系統設計了 select/epoll,在這個模式下,一個程序可以監控多個檔案描述符,也就是多個資料的就緒狀态。目前支援 IO 多路複用的系統調用有 select,epoll 等,select 基本在所有的系統都支援,epoll 是 Linux 2.6 核心提出的,是 select 的一種增強版本。流程如下:
Netty系列:核心功能——傳輸 (Transport)

IO多路複用模型的基本原理就是select/epoll系統調用,單個線程不斷的輪詢select/epoll系統調用所負責的成百上千的socket連接配接,當某個或者某些socket網絡連接配接有資料到達了,就傳回這些可以讀寫的連接配接。

優點:通過一次select/epoll系統調用,就查詢到到可以讀寫的一個甚至是成百上千的網絡連接配接。

缺點:IO 多路複用本質上還是屬于同步 IO,也就是資料就緒後,還是需要阻塞等待資料複制完成。

異步 I/O (Asynchronous IO)
被調用方完成所有的操作後,再通知調用方。調用方得到通知後可以直接進行後續操作,而不需進行等待,如下圖:
Netty系列:核心功能——傳輸 (Transport)

優點:調用方永遠不會阻塞。

缺點:因為被調用方完成了大部分工作,是以需要被調用方支援。目前 Windows 下有 IOCP 實作了異步 IO,Linux 下在 2.6 引入,但是還不完善,并且底層依舊采用 epoll,和 IO 多路複用相比,性能上沒有明顯的優勢。

現在再來看java中的NIO,其實他主要是基于 IO 多路複用這個 IO 模型。

最後再提一下零拷貝這個概念: 零拷貝(zero-copy)是一種目前隻有在使用 NIO 和 Epoll 傳輸時才可使用的特性。它使你可以快速高效地将資料從檔案系統移動到網絡接口,而不需要将其從核心空間複制到使用者空間,其在像 FTP 或者HTTP 這樣的協定中可以顯著地提升性能。但是,并不是所有的作業系統都支援這一特性。特别地,它對于實作了資料加密或者壓縮的檔案系統是不可用的——隻能傳輸檔案的原始内容。反過來說,傳輸已被加密的檔案則不是問題。

2、Epoll—用于 Linux 的本地非阻塞傳輸

之前所說的Netty 的 NIO 傳輸基于 Java 提供的異步/非阻塞網絡程式設計的通用抽象。雖然這保證了 Netty 的非阻塞 API 可以在任何平台上使用,但它也包含了相應的限制,因為 JDK為了在所有系統上提供相同的功能,必須做出妥協。

Linux作為高性能網絡程式設計的平台,其重要性與日俱增,這催生了大量先進特性的開發,其中包括epoll——一個高度可擴充的I/O事件通知特性。這個API自Linux核心版本 2.5.44 被引入,提供了比舊的POSIX ​

​select​

​和​

​poll​

​系統調用 更好的性能。

Netty為Linux提供了一組NIO API,使用一種和它本身的設計更加一緻的方式使用epoll,并且以一種更加輕量的方式使用中斷。如果我們的應用程式是運作在Linux系統上的,那麼有必要考慮利用這個版本的傳輸方式;因為在高負載下它的性能要優于JDK的NIO實作。

來看看IO多路複用中select、poll、epoll之間的差別:

先來了解一個概念——檔案描述符

檔案描述符(File descriptor)是一個用于表述指向檔案的引用的抽象化概念。在形式上是一個非負整數。實際上可以把它了解為一個索引值,指向核心為每一個程序所維護的該程序打開檔案的記錄表。當程式打開一個現有檔案或者建立一個新檔案時,核心向程序傳回一個檔案描述符。在程式設計中,一些涉及底層的程式編寫往往會圍繞着檔案描述符展開。

I/O 多路複用機制指核心一旦發現程序指定的一個或者多個IO條件準備讀取,它就通知該程序,就是說通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或寫就緒),能夠通知程式進行相應的讀寫操作。這種機制的使用需要 select 、 poll 、 epoll 來配合。

select ==> 時間複雜度O(n)
select是第一個實作IO多路複用 ,是三者當中最底層的,它的事件的輪訓機制是基于比特位的。每次查詢都要周遊整個事件清單,是以select具有O(n)的無差别輪詢複雜度,同時處理的流越多,無差别輪詢時間就越長。
poll ==> 時間複雜度O(n)
poll本質上和select沒有差別,改進了select最大數量限制。它将使用者傳入的數組拷貝到核心空間,然後查詢每個fd對應的裝置狀态,如果裝置就緒則在裝置等待隊列中加入一項并繼續周遊,如果周遊完所有fd後沒有發現就緒裝置,則挂起目前程序,直到裝置就緒或者主動逾時,被喚醒後它又要再次周遊fd。這個過程經曆了多次無謂的周遊。
epoll ==> 時間複雜度O(1)
epoll使用一個檔案描述符管理多個描述符,将使用者關系的檔案描述符的事件存放到核心的一個事件表中,這樣在使用者空間和核心空間的拷貝隻需要一次。

3、OIO—舊的阻塞 I/O

一般不怎麼會用到,但是你可以不用,不過他不能沒有。例如,你可能需要移植使用了一些進行阻塞調用的庫(如JDBC )的遺留代碼,而将邏輯轉 換為非阻塞的可能也是不切實際的,這時候OIO就派上用場了,可以在短期内使用Netty的OIO傳輸進行過度,然後再将代碼移植到純粹的異步傳輸上。

這時候你可能會想,Netty是如何能夠使用和用于異步傳輸相同的API來支援OIO的呢。 答案就是,Netty利用了SO_TIMEOUT這個Socket标志,它指定了等待一個I/O操作完成的最大毫秒 數。如果操作在指定的時間間隔内沒有完成,則将會抛出一個SocketTimeout Exception。Netty 将捕獲這個異常并繼續處理循環。在EventLoop下一次運作時,它将再次嘗試。這實際上也是類似于Netty這樣的異步架構能夠支援OIO的唯一方式。

Netty系列:核心功能——傳輸 (Transport)

4、用于 JVM 内部通信的 Local 傳輸

Netty 提供了一個 Local 傳輸,用于在同一個 JVM 中運作的用戶端和伺服器程式之間的異步 通信。同樣,這個傳輸也支援對于所有 Netty 傳輸實作都共同的 API。

5、Embedded 傳輸

總結