天天看點

Netty的體系結構及使用

《Netty權威指南》

一、異步和事件驅動

1.Java網絡程式設計

阻塞I/O -- socket

非阻塞I/O -- NIO

2.Netty簡介

代碼清單 1-3 展示了一個 Netty所做的是事情和很好的例子。 這裡,connect()方法将會直接傳回,而不會阻塞,該調用将會在背景完成。這究竟什麼時候會發生 則取決于若幹的因素,但這個關注點已經從代碼中抽象出來了。因為線程不用阻塞以等待對應的 操作完成,是以它可以同時做其他的工作,進而更加有效地利用資源。

代碼清單 1-3 異步地建立連接配接

Channel channel = ...; // Does not block ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));

1. 導讀

在這一章中,我們介紹了 Netty 架構的背景知識,包括 Java 網絡程式設計 API 的演變過程,阻塞 和非阻塞網絡操作之間的差別,以及異步 I/O 在高容量、高性能的網絡程式設計中的優勢。

然後,我們概述了 Netty 的特性、設計和優點,其中包括 Netty 異步模型的底層機制,包括 回調、Future 以及它們的結合使用。我們還談到了事件是如何産生的以及如何攔截和處理它們。

3. Netty 的核心元件

Channel;

Channel 是 Java NIO 的一個基本構造。

它代表一個到實體(如一個硬體裝置、一個檔案、一個網絡套接字或者一個能夠執行一個或者多個不同的I/O操作的程式元件)的開放連接配接,如讀操作和寫操作 1。

目前,可以把 Channel 看作是傳入(入站)或者傳出(出站)資料的載體。是以,它可以被打開或者被關閉,連接配接或者斷開連接配接。

回調;

Future;

Future 提供了另一種在操作完成時通知應用程式的方式。這個對象可以看作是一個異步操 作的結果的占位符;它将在未來的某個時刻完成,并提供對其結果的通路。

JDK 預置了 interface java.util.concurrent.Future,但是其所提供的實作,隻 允許手動檢查對應的操作是否已經完成,或者一直阻塞直到它完成。這是非常繁瑣的,是以 Netty 提供了它自己的實作——ChannelFuture,用于在執行異步操作的時候使用。

ChannelFuture提供了幾種額外的方法,這些方法使得我們能夠注冊一個或者多個 ChannelFutureListener執行個體。監聽器的回調方法operationComplete(),将會在對應的 操作完成時被調用 1。然後監聽器可以判斷該操作是成功地完成了還是出錯了。如果是後者,我 們可以檢索産生的Throwable。簡而 言之 ,由ChannelFutureListener提供的通知機制消除 了手動檢查對應的操作是否完成的必要。

每個 Netty 的出站 I/O 操作都将傳回一個 ChannelFuture;也就是說,它們都不會阻塞。 正如我們前面所提到過的一樣,Netty 完全是異步和事件驅動的。

如果你把 ChannelFutureListener 看作是回調的一個更加精細的版本,那麼你是對的。 事實上,回調和 Future 是互相補充的機制;它們互相結合,構成了 Netty 本身的關鍵構件塊之一。

事件和 ChannelHandler。

Netty 使用不同的事件來通知我們狀态的改變或者是操作的狀态。這使得我們能夠基于已經 發生的事件來觸發适當的動作。這些動作可能是:

記錄日志;

資料轉換;

流控制;

應用程式邏輯。

Netty 是一個網絡程式設計架構,是以事件是按照它們與入站或出站資料流的相關性進行分類的。

可能由入站資料或者相關的狀态更改而觸發的事件包括:

連接配接已被激活或者連接配接失活;

資料讀取;

使用者事件;

錯誤事件。

出站事件是未來将會觸發的某個動作的操作結果,這些動作包括:

打開或者關閉到遠端節點的連接配接;

将資料寫到或者沖刷到套接字。

每個事件都可以被分發給 ChannelHandler 類中的某個使用者實作的方法。這是一個很好的 将事件驅動範式直接轉換為應用程式構件塊的例子。圖 1-3 展示了一個事件是如何被一個這樣的 ChannelHandler 鍊處理的。

Netty 的 ChannelHandler 為處理器提供了基本的抽象,如圖 1-3 所示的那些。我們會 在适當的時候對 ChannelHandler 進行更多的說明,但是目前你可以認為每個 Channel- Handler 的執行個體都類似于一種為了響應特定事件而被執行的回調。

Netty 提供了大量預定義的可以開箱即用的 ChannelHandler 實作,包括用于各種協定 (如 HTTP 和 SSL/TLS)的 ChannelHandler。在内部,ChannelHandler 自己也使用了事件

和 Future,使得它們也成為了你的應用程式将使用的相同抽象的消費者。

3. 把他們放在一起

1.Future、回調和 ChannelHandler

Netty 的異步程式設計模型是建立在 Future 和回調的概念之上的,而将事件派發到 ChannelHandler 的方法則發生在更深的層次上。結合在一起,這些元素就提供了一個處理環境,使你的應用程式邏 輯可以獨立于任何網絡操作相關的顧慮而獨立地演變。這也是 Netty 的設計方式的一個關鍵目标。

攔截操作以及高速地轉換入站資料和出站資料,都隻需要你提供回調或者利用操作所傳回的 Future。這使得連結操作變得既簡單又高效,并且促進了可重用的通用代碼的編寫。

2.選擇器、事件和 EventLoop

Netty 通過觸發事件将 Selector 從應用程式中抽象出來,消除了所有本來将需要手動編寫 的派發代碼。在内部,将會為每個 Channel 配置設定一個 EventLoop,用以處理所有事件,包括:

注冊感興趣的事件;

将事件派發給 ChannelHandler;

安排進一步的動作。

EventLoop 本身隻由一個線程驅動,其處理了一個 Channel 的所有 I/O 事件,并且在該

EventLoop 的整個生命周期内都不會改變。

這個簡單而強大的設計消除了你可能有的在ChannelHandler 實作中需要進行同步的任何顧慮,

是以,你可以專注于提供正确的邏輯,用來在有感興趣的資料要處理的時候執行。如同我們在詳

細探讨 Netty 的線程模型時将會看到的,該 API 是簡單而緊湊的。

二、第一款Netty應用程式

1.編寫 Echo 伺服器

所有的 Netty 伺服器都需要以下兩部分。

至少一個ChannelHandler — 伺服器對從用戶端接收的資料的處理,即它的業務邏輯。

引導 — 這是配置伺服器的啟動代碼。eg: 将伺服器綁定到它要監聽連接配接請求的端口上。

在第 1 章中,我們介紹了 Future 和回調,并且闡述了它們在事件驅動設計中的應用。我們 還讨論了 ChannelHandler,它是一個接口族的父接口,它的實作負責接收并響應事件通知。 在 Netty 應用程式中,所有的資料處理邏輯都包含在這些核心抽象的實作中。

因為你的 Echo 伺服器會響應傳入的消息,是以它需要實作 ChannelInboundHandler 接口,用 來定義響應入站事件的方法。這個簡單的應用程式隻需要用到少量的這些方法,是以繼承 Channel- InboundHandlerAdapter 類也就足夠了,它提供了 ChannelInboundHandler 的預設實作。

我們感興趣的方法是:

channelRead()— 對于每個傳入的消息都要調用;

channelReadComplete()— 通知ChannelInboundHandler最後一次對channel-

Read()的調用是目前批量讀取中的最後一條消息;

exceptionCaught()— 在讀取操作期間,有異常抛出時會調用。

該 Echo 伺服器的 ChannelHandler 實作是 EchoServerHandler,如代碼清單 2-1 所示。

/** * 代碼清單 2-1 */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); // 将接收到的消息 寫給發送者,而 不沖刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { // 将未決消息沖刷到 遠端節點,并且關 閉該 Channel,釋放消息 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 列印異常 棧跟蹤并關閉該Channel cause.printStackTrace(); ctx.close(); } }

除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習的 ChannelHandler 的 子類型和實作,我們将在第 6 章和第 7 章中對它們進行詳細的闡述。目前,請記住下面這些關鍵點:

針對不同類型的事件來調用 ChannelHandler; 應用程式通過實作或者擴充 ChannelHandler 來挂鈎到事件的生命周期,并且提供自

定義的應用程式邏輯; 在架構上,ChannelHandler 有助于保持業務邏輯與網絡處理代碼的分離。這簡化了開

發過程,因為代碼必須不斷地演化以響應不斷變化的需求。

2. 引導伺服器

在讨論過由 EchoServerHandler 實作的核心業務邏輯之後,我們現在可以探讨引導服務 器本身的過程了,具體涉及以下内容:

綁定到伺服器端口, 監聽并接受傳入連接配接請求;

配置 Channel,以将有關的入站消息通知給 EchoServerHandler 執行個體。

/** * 代碼清單2-2 EchoServer類 */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println( "Usage: " + EchoServer.class.getSimpleName() + " <port>"); } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); // 使用的是 NIO 傳輸,是以指定 了NioEventLoopGroup來接受和處理新的連接配接 EventLoopGroup group = new NioEventLoopGroup(); try { // 建立 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 指定所使用的 NIO 傳輸 Channel .channel(NioServerSocketChannel.class) //使用指定的 端口設定套 接字位址 .localAddress(new InetSocketAddress(port)) //使用了一個特殊的類——ChannelInitializer。這是關鍵。當一個新的連接配接被接收時, // 一個新的子 Channel 将會被建立,而 ChannelInitializer 将會把一個你的 // EchoServerHandler 的執行個體添加到該 Channel 的 ChannelPipeline 中。 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); //異步地綁定伺服器; 調用 sync()方法阻塞 等待直到綁定完成 ChannelFuture f = b.bind().sync(); //擷取 Channel 的 CloseFuture,并 且阻塞目前線 程直到它完成 f.channel().closeFuture().sync(); } finally { // 關閉 EventLoopGroup, 釋放所有的資源 group.shutdownGracefully().sync(); } } }

在這個時候,伺服器已經初始化,并且已經就緒能被使用了。這個示例使用了 NIO,因為得益于它的可擴充性和徹底的異步性,它是目前使用最廣泛的傳 輸。但是也可以使用一個不同的傳輸實作。如果你想要在自己的伺服器中使用 OIO 傳輸,将需 要指定 OioServerSocketChannel 和 OioEventLoopGroup。

讓我們回顧一下你剛完成的伺服器實作中的重要步驟。下面這些是伺服器的主要代碼元件:

EchoServerHandler 實作了業務邏輯;

main()方法引導了伺服器;

引導過程中所需要的步驟如下:

建立一個 ServerBootstrap 的執行個體以引導和綁定伺服器;

建立并配置設定一個 NioEventLoopGroup 執行個體以進行事件的處理,如接受新連接配接以及讀/寫資料;

指定伺服器綁定的本地的 InetSocketAddress;

使用一個 EchoServerHandler 的執行個體初始化每一個新的 Channel;

調用 ServerBootstrap.bind()方法以綁定伺服器。

3.編寫 Echo 用戶端

Echo 用戶端将會:

(1)連接配接到伺服器;

(2)發送一個或者多個消息; (3)對于每個消息,等待并接收從伺服器發回的相同的消息; (4)關閉連接配接。

編寫用戶端所涉及的兩個主要代碼部分也是業務邏輯和引導,和你在伺服器中看到的一樣。

如同伺服器,用戶端将擁有一個用來處理資料的 ChannelInboundHandler。在這 個場景 下,你将擴充 SimpleChannelInboundHandler 類以處理所有必須的任務,如代碼清單 2-3 所示。這要求重寫下面的方法:

channelActive()——在到伺服器的連接配接已經建立之後将被調用; channelRead0()1——當從伺服器接收到一條消息時被調用; exceptionCaught()——在處理過程中引發異常時被調用。

代碼清單 2-3 用戶端的 ChannelHandler

4 引導用戶端

如同将在代碼清單 2-4 中所看到的,引導用戶端類似于引導伺服器,不同的是,用戶端是使 用主機和端口參數來連接配接遠端位址

/** * 代碼清單 2-4 用戶端的主類 */ public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) // 設定伺服器的InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println( "Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }

讓我們回顧一下這一節中所介紹的要點:

為初始化用戶端,建立了一個 Bootstrap 執行個體;

為進行事件處理配置設定了一個 NioEventLoopGroup 執行個體,其中事件處理包括建立新的 連接配接以及處理入站和出站資料;

為伺服器連接配接建立了一個 InetSocketAddress 執行個體;

當連接配接被建立時,一個 EchoClientHandler 執行個體會被安裝到(該 Channel 的)

ChannelPipeline 中;

在一切都設定完成後,調用 Bootstrap.connect()方法連接配接到遠端節點; 完成了用戶端,你便可以着手建構并測試該系統了。

三、Netty3元件和設計 本章主要内容

 Netty 的技術和體系結構方面的内容

 Channel、EventLoop 和 ChannelFuture

 ChannelHandler 和 ChannelPipeline

 引導

我們将從兩個不同的但卻又密切相 關的視角來探讨 Netty: 類庫的視角以及架構的視角。對于使用 Netty 編寫高效的、可重用的和 可維護的代碼來說,兩者缺一不可。

從高層次的角度來看,Netty 解決了兩個相應的關注領域,我們可将其大緻标記為技術的和 體系結構的。首先,它的基于 Java NIO 的異步的和事件驅動的實作,保證了高負載下應用程式 性能的最大化和可伸縮性。其次,Netty 也包含了一組設計模式,将應用程式邏輯從網絡層解耦, 簡化了開發過程,同時也最大限度地提高了可測試性、子產品化以及代碼的可重用性。

在我們更加詳細地研究 Netty 的各個元件時,我們将密切關注它們是如何協作來支撐這 些體系結構上的最佳實踐的。通過遵循同樣的原則,我們便可獲得 Netty 所提供的所有益處。

3.1 Channel、EventLoop 和 ChannelFuture

這些類合在一起,可以被認為是 Netty 網絡抽象的代表:

Channel— Socket;

EventLoop— 控制流、多線程處理、并發;

ChannelFuture— 異步通知。

Channel

基本的 I/O 操作(bind()、connect()、read()和 write())依賴于底層網絡傳輸所提 供的原語。

在基于 Java 的網絡程式設計中,其基本的構造是 class Socket。Netty 的 Channel 接 口所提供的 API,大大地降低了直接使用 Socket 類的複雜性。

此外,Channel 也是擁有許多 預定義的、專門化實作的廣泛類層次結構的根

EventLoop

EventLoop 定義了 Netty 的核心抽象,用于處理連接配接的生命周期中所發生的事件。

目前,圖 3-1 在高層次上說明了 Channel、EventLoop、Thread 以及 EventLoopGroup

之間的關系。

一個 EventLoopGroup 包含一個或者多個 EventLoop;

一個 EventLoop 在它的生命周期内隻和一個 Thread 綁定;

所有由 EventLoop 處理的 I/O 事件都将在它專有的 Thread 上被處理;

一個 Channel 在它的生命周期内隻注冊于一個 EventLoop;

一個 EventLoop 可能會被配置設定給一個或多個 Channel。

注意,在這種設計中,一個給定Channel 的 I/O 操作都是由相同的 Thread 執行的,實際

上消除了對于同步的需要。

ChannelFuture

正如我們已經解釋過的那樣,Netty 中所有的 I/O 操作都是異步的。因為一個操作可能不會

立即傳回,是以我們需要一種用于在之後的某個時間點确定其結果的方法。為此,Netty 提供了

ChannelFuture接口,其addListener()方法注冊了一個ChannelFutureListener,以

便在某個操作完成時(無論是否成功)得到通知。

關于 ChannelFuture 的更多讨論 可以将 ChannelFuture 看作是将來要執行的操作的結果的 占位符。它究竟什麼時候被執行則可能取決于若幹的因素,是以不可能準确地預測,但是可以肯

定的是它将會被執行。此外,所有屬于同一個 Channel 的操作都被保證其将以它們被調用的順序

被執行。

3.2 ChannelHandler 和 ChannelPipeline

ChannelHandler 接口

從應用程式開發人員的角度來看,Netty 的主要元件是 ChannelHandler,它充當了所有處理入站和出站資料的應用程式邏輯的容器。

因為 ChannelHandler 的方法是 由網絡事件(其中術語“事件”的使用非常廣泛)觸發的。

事實上,ChannelHandler 可專 門用于幾乎任何類型的動作,例如将資料從一種格式轉換為另外一種格式,或者處理轉換過程 中所抛出的異常。

舉例來說,ChannelInboundHandler 是一個你将會經常實作的子接口。這種類型的 ChannelHandler 接收入站事件和資料,這些資料随後将會被你的應用程式的業務邏輯所處 理。當你要給連接配接的用戶端發送響應時,也可以從 ChannelInboundHandler 沖刷資料。你 的應用程式的業務邏輯通常駐留在一個或者多個 ChannelInboundHandler 中。

ChannelPipeline 接口

使得事件流經 ChannelPipeline 是 ChannelHandler 的工作,它們是在應用程式的初 始化或者引導階段被安裝的。

這些對象接收事件、執行它們所實作的處理邏輯,并将資料傳遞給 鍊中的下一個 ChannelHandler。

它們的執行順序是由它們被添加的順序所決定的。實際上, 被我們稱為 ChannelPipeline 的是這些 ChannelHandler 的編排順序。

圖 3-3 說明了一個 Netty 應用程式中入站和出站資料流之間的差別。從一個用戶端應用程式 的角度來看,如果事件的運動方向是從用戶端到伺服器端,那麼我們稱這些事件為出站的,反之 則稱為入站的。

圖 3-3 也顯示了入站和出站 ChannelHandler 可以被安裝到同一個 ChannelPipeline 中。

如果一個消息或者任何其他的入站事件被讀取,那麼它會從 ChannelPipeline 的頭部 開始流動,并被傳遞給第一個 ChannelInboundHandler。這個 ChannelHandler 不一定 會實際地修改資料,具體取決于它的具體功能,在這之後,資料将會被傳遞給鍊中的下一個 ChannelInboundHandler。最終,資料将會到達 ChannelPipeline 的尾端,屆時,所有 處理就都結束了。

資料的出站運動(即正在被寫的資料)在概念上也是一樣的。

關于入站和出站 ChannelHandler 的更多讨論

通過使用作為參數傳遞到每個方法的 ChannelHandlerContext,事件可以被傳遞給目前

ChannelHandler 鍊中的下一個 ChannelHandler。因為你有時會忽略那些不感興趣的事件,是以 Netty 提供了抽象基類 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。通過調 用 ChannelHandlerContext 上的對應方法,每個都提供了簡單地将事件傳遞給下一個 ChannelHandler 的方法的實作。随後,你可以通過重寫你所感興趣的那些方法來擴充這些類。

在Netty中,有兩種發送消息的方式。

你可以直接寫到Channel中,

也可以 寫到和Channel- Handler 相關聯的 ChannelHandlerContext 對象中。

前一種方式将會導緻消息從 Channel- Pipeline 的尾端開始流動,而後者将導緻消息從 ChannelPipeline 中的下一個 Channel- Handler 開始流動。

3. 更加深入地了解 ChannelHandler

正如我們之前所說的,有許多不同類型的 ChannelHandler,它們各自的功能主要取決于 它們的超類。Netty 以擴充卡類的形式提供了大量預設的 ChannelHandler 實作,其旨在簡化 應用程式處理邏輯的開發過程。你已經看到了,ChannelPipeline 中的每個 ChannelHandler 将負責把事件轉發到鍊中的下一個 ChannelHandler。這些擴充卡類(及它們的子類)将自動 執行這個操作,是以你可以隻重寫那些你想要特殊處理的方法和事件。

為什麼需要擴充卡類

有一些擴充卡類可以将編寫自定義的 ChannelHandler 所需要的努力降到最低限度,因為它們提 供了定義在對應接口中的所有方法的預設實作。

下面這些是編寫自定義 ChannelHandler 時經常會用到的擴充卡類:

 ChannelHandlerAdapter

 ChannelInboundHandlerAdapter

 ChannelOutboundHandlerAdapter

 ChannelDuplexHandler

4.編碼器和解碼器

當你通過 Netty 發送或者接收一個消息的時候,就将會發生一次資料轉換。入站消息會被解 碼;也就是說,從位元組轉換為另一種格式,通常是一個 Java 對象。

如果是出站消息,則會發生 相反方向的轉換:它将從它的目前格式被編碼為位元組。這兩種方向的轉換的原因很簡單:網絡數 據總是一系列的位元組。

對應于特定的需要,Netty為編碼器和解碼器提供了不同類型的抽象類。例如,你的應用程式可能使用了一種中間格式,而不需要立即将消息轉換成位元組。你将仍然需要一個編碼器,但是 它将派生自一個不同的超類。為了确定合适的編碼器類型,你可以應用一個簡單的命名約定。

通常來說,這些基類的名稱将類似于 ByteToMessageDecoder 或 MessageToByte- Encoder。對于特殊的類型,你可能會發現類似于 ProtobufEncoder 和 ProtobufDecoder 這樣的名稱——預置的用來支援 Google 的 Protocol Buffers。

5.抽象類 SimpleChannelInboundHandler

在這種類型的 ChannelHandler 中,最重要的方法是 channelRead0(Channel- HandlerContext,T)。除了要求不要阻塞目前的 I/O 線程之外,其具體實作完全取決于你。

3.3 引導

Netty 的引導類為應用程式的網絡層配置提供了容器,這涉及将一個程序綁定到某個指定的 端口,或者将一個程序連接配接到另一個運作在某個指定主機的指定端口上的程序。

通常來說,我們把前面的用例稱作引導一個伺服器,後面的用例稱作引導一個用戶端。雖然 這個術語簡單友善,但是它略微掩蓋了一個重要的事實,即“伺服器”和“用戶端”實際上表示 了不同的網絡行為; 換句話說,是監聽傳入的連接配接還是建立到一個或者多個程序的連接配接。

是以,有兩種類型的引導: 一種用于用戶端(簡單地稱為 Bootstrap),而另一種 (ServerBootstrap)用于伺服器。

表 3-1 比較了這兩種 類型的引導類。

引導一個用戶端隻需要一個 EventLoopGroup,但是一個 ServerBootstrap 則需要兩個(也可以是同一個執行個體)。為什麼呢?

因為伺服器需要兩組不同的 Channel。

第一組将隻包含一個 ServerChannel,代表服務 器自身的已綁定到某個本地端口的正在監聽的套接字。

而第二組将包含所有已建立的用來處理傳 入用戶端連接配接(對于每個伺服器已經接受的連接配接都有一個)的 Channel。

圖 3-4 說明了這個模 型,并且展示了為何需要兩個不同的 EventLoopGroup。

與 ServerChannel 相關聯的 EventLoopGroup 将負責配置設定一個為連接配接請求建立 Channel 的 EventLoop。一旦連接配接被接受,第二個 EventLoopGroup 就會給它的 Channel 配置設定一個 EventLoop。

四、傳輸

在本章中,我們将研究:

1.Netty傳輸、它們的實作和使用,以及 Netty 是如何将它們呈現給開發者的。

2.深入探讨了 Netty 預置的傳輸,并且解釋了它們的行為。

3.如何比對不同的傳輸和特定用例的需求。

本章主要内容

 OIO——阻塞傳輸

 NIO——異步傳輸

 Local——JVM 内部的異步通信

 Embedded——測試你的ChannelHandler

流經網絡的資料總是具有相同的類型:位元組。這些位元組是如何流動的主要取決于我們所說的 網絡傳輸— 一個幫助我們抽象底層資料傳輸機制的概念。使用者并不關心這些細節;他們隻想确 保他們的位元組被可靠地發送和接收。

如果你有 Java 網絡程式設計的經驗,那麼你可能已經發現,在某些時候,你需要支撐比預期多 很多的并發連接配接。如果你随後嘗試從阻塞傳輸切換到非阻塞傳輸,那麼你可能會因為這兩種網絡 API 的截然不同而遇到問題。

然而,Netty 為它所有的傳輸實作提供了一個通用 API,這使得這種轉換比你直接使用 JDK 所能夠達到的簡單得多。所産生的代碼不會被實作的細節所污染,而你也不需要在你的整個代碼 庫上進行廣泛的重構。簡而言之,你可以将時間花在其他更有成效的事情上。

4.1 案例研究:傳輸遷移

1.不通過 Netty 使用 OIO 和 NIO

/** * 代碼清單 4-1 未使用 Netty 的阻塞網絡程式設計 * * @author xuxh * @date 2021/03/07 11:27 */ public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); try { for (; ; ) { final Socket clientSocket = socket.accept(); System.out.println( "Accepted connection from " + clientSocket); new Thread(new Runnable() { @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes( Charset.forName("UTF-8"))); out.flush(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); } } catch ( IOException e) { e.printStackTrace(); } } }

/** * 代碼清單 4-2 未使用 Netty 的異步網絡程式設計 * * @author xuxh * @date 2021/03/07 11:32 */ public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ssocket = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ssocket.bind(address); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); for (; ; ) { try { // 等待需要處理的新事 件;阻塞 将一直持續到 下一個傳入事件 selector.select(); } catch (IOException ex) { ex.printStackTrace(); // handle exception break; } // 擷取所有接 收事件的 Selection- Key 執行個體 Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { // 檢查事件是否是一 個新的已經就緒可 以被接受的連接配接 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); // 接受用戶端,并将它注冊到選擇器 client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); System.out.println("Accepted connection from " + client); } // 檢查套接字是否已經準備好寫資料 if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { // 将資料寫到已連接配接的用戶端 if (client.write(buffer) == 0) { break; } } client.close(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // ignore on close } } } } } }

如同你所看到的,雖然這段代碼所做的事情與之前的版本完全相同,但是代碼卻截然不同。 如果為了用于非阻塞 I/O 而重新實作這個簡單的應用程式,都需要一次完全的重寫的話,那麼不 難想象,移植真正複雜的應用程式需要付出什麼樣的努力。

鑒于此,讓我們來看看使用 Netty 實作該應用程式将會是什麼樣子吧。

4.1.2 通過 Netty 使用 OIO 和 NIO

/** * 代碼清單 4-3 使用 Netty 的阻塞網絡處理 * * @author xuxh * @date 2021/03/07 21:15 */ public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); EventLoopGroup group = new OioEventLoopGroup(); try { // 建立 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 OioEventLoopGroup 以允許阻塞模式 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,對于 每個已接受的 連接配接都調用它 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一個 Channel- InboundHandler- Adapter 以攔截和 處理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息寫到用戶端,并添 加 ChannelFutureListener, 以便消息一被寫完就關閉 連接配接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //綁定伺服器 以接受連接配接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 釋放所有的資源 group.shutdownGracefully().sync(); } } }

/** * 代碼清單 4-4 使用 Netty 的異步網絡處理 * * @author xuxh * @date 2021/03/07 21:40 */ public class NettyNioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); // 使用的是 NIO 傳輸,是以指定 了NioEventLoopGroup來接受和處理新的連接配接 EventLoopGroup group = new NioEventLoopGroup(); try { // 建立 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 NioEventLoopGroup 非阻塞模式 .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,對于 每個已接受的 連接配接都調用它 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一個 Channel- InboundHandler- Adapter 以攔截和 處理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息寫到用戶端,并添 加 ChannelFutureListener, 以便消息一被寫完就關閉 連接配接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //綁定伺服器 以接受連接配接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 釋放所有的資源 group.shutdownGracefully().sync(); } } }

因為 Netty 為每種傳輸的實作都暴露了相同的 API,是以無論選用哪一種傳輸的實作,你的 代碼都仍然幾乎不受影響。在所有的情況下,傳

4.2 傳輸 API

傳輸 API 的核心是 interface Channel,它被用于所有的 I/O 操作。Channel 類的層次結構如圖 4-1 所示。

如圖所示,每個 Channel 都将會被配置設定一個 ChannelPipeline 和 ChannelConfig。

ChannelConfig 包含了該 Channel 的所有配置設定,并且支援熱更新。由于特定的傳輸可能具有獨特的設定,是以它可能會實作一個 ChannelConfig 的子類型。(請參考 ChannelConfig 實作對應的 Javadoc。)

ChannelPipeline 持有所有應用于入站資料和出站資料以及事件的 ChannelHandler 實 例,這些 ChannelHandler 實作了應用程式處理狀态變化以及資料處理的邏輯。

由于 Channel 是獨一無二的,是以為了保證順序将 Channel 聲明為 java.lang. Comparable 的一個子接口。是以,如果兩個不同的 Channel 執行個體都傳回了相同的散列碼,那 麼 AbstractChannel 中的 compareTo()方法的實作将會抛出一個 Error。

ChannelHandler 的典型用途包括:

将資料從一種格式轉換為另一種格式;

提供異常的通知;

提供 Channel 變為活動的或者非活動的通知;

提供當 Channel 注冊到 EventLoop 或者從 EventLoop 登出時的通知;

提供有關使用者自定義事件的通知。

你也可以根據需要通過添加或者移除ChannelHandler執行個體來修改ChannelPipeline。通過利用Netty的這項能力可以建構出高度靈活的應用程式。

除了通路所配置設定的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel 的其他方法,其中最重要的列舉在表 4-1 中。

稍後我們将進一步深入地讨論所有這些特性的應用。目前,請記住,Netty 所提供的廣泛功 能隻依賴于少量的接口。這意味着,你可以對你的應用程式邏輯進行重大的修改,而又無需大規 模地重構你的代碼庫。

Netty 的 Channel 實作是線程安全的,是以你可以存儲一個到 Channel 的引用,并且每當 你需要向遠端節點寫資料時,都可以使用它,即使當時許多線程都在使用它。

4.3 内置的傳輸

Netty 内置了一些可開箱即用的傳輸。因為并不是它們所有的傳輸都支援每一種協定,是以 你必須選擇一個和你的應用程式所使用的協定相容的傳輸。

在本節中我們将讨論這些關系。表 4-2 顯示了所有 Netty 提供的傳輸。

4.3.1 NIO——非阻塞 I/O

NIO 提供了一個所有 I/O 操作的全異步的實作。它利用了自 NIO 子系統被引入 JDK 1.4 時便 可用的基于選擇器的 API。

選擇器背後的基本概念是充當一個系統資料庫,,當 Channel 的狀态發生變化時, 在選擇器可以得到通知。

Channel可能的狀态變化有:

新的 Channel 已被接受并且就緒;

Channel 連接配接已經完成;

Channel 有已經就緒的可供讀取的資料;

Channel 可用于寫資料。

選擇器運作在一個檢查狀态變化并對其做出相應響應的線程上,在應用程式對狀态的變化做出響應之後,選擇器将會被重置,并将重複這個過程。

表 4-3 中的常量值代表了由 class java.nio.channels.SelectionKey 定義的位模式。這些位模式可以組合起來定義一組應用程式正在請求通知的狀态變化集。

對于所有傳輸都共有的使用者級别 API ,Netty完全地隐藏了這些 NIO 的内部細節。 圖 4-2 展示了該處理流程。

零拷貝

零拷貝(zero-copy)是一種目前隻有在使用 NIO 和 Epoll 傳輸時才可使用的特性。它使你可以快速 高效地将資料從檔案系統移動到網絡接口,而不需要将其從核心空間複制到使用者空間,在像 FTP 或者 HTTP 這樣的協定中可以顯著地提升性能。但是,并不是所有的作業系統都支援這一特性。特别地,它對于實作了資料加密或者壓縮的檔案系統是不可用的——隻能傳輸檔案的原始内容。

4.3.2 Epoll — 用于 Linux 的本地非阻塞傳輸4.3.2 Epoll— 用于 Linux 的本地非阻塞傳輸

正如我們之前所說的,Netty 的 NIO 傳輸基于 Java 提供的異步/非阻塞網絡程式設計的通用抽象。 雖然這保證了 Netty 的非阻塞 API 可以在任何平台上使用,但它也包含了相應的限制,因為 JDK 為了在所有系統上提供相同的功能,必須做出妥協。

Linux作為高性能網絡程式設計的平台,其重要性與日俱增,這催生了大量先進特性的開發,其中包括epoll——一個高度可擴充的I/O事件通知特性。這個API自Linux核心版本 2.5.44(2002)被 引入,提供了比舊的POSIX select和poll系統調用 1更好的性能,同時現在也是Linux上非阻 塞網絡程式設計的事實标準。Linux JDK NIO API使用了這些epoll調用。

Netty為Linux提供了一組NIO API,它以一種和它本身的設計更加一緻的方式使用epoll,并且以一種更加輕量的方式使用中斷。如果你的應用程式隻運作于Linux系統,那麼請考慮利用 這個版本的傳輸; 你将發現在高負載下它的性能要優于JDK的NIO實作。

這個傳輸的語義與在圖 4-2 所示的完全相同,而且它的用法也是簡單直接的。相關示例參照 代碼清單 4-4。如果要在那個代碼清單中使用 epoll 替代 NIO,隻需要将 NioEventLoopGroup 替換為 EpollEventLoopGroup,并且将 NioServerSocketChannel.class 替換為 EpollServerSocketChannel.class 即可。

4.3.3 OIO — 舊的阻塞 I/O

Netty 的 OIO 傳輸實作代表了一種折中: 它可以通過正常的傳輸 API 使用,但是由于它是建立在 java.net 包的阻塞實作之上的,是以它不是異步的。但是,它仍然非常适合于某些用途。

有了這個背景,你可能會想,Netty是如何能夠使用和用于異步傳輸相同的API來支援OIO的呢。 答案就是,Netty利用了SO_TIMEOUT這個Socket标志,它指定了等待一個I/O操作完成的最大毫秒數。如果操作在指定的時間間隔内沒有完成,則将會抛出一個SocketTimeout Exception。Netty 将捕獲這個異常并繼續處理循環。在EventLoop下一次運作時,它将再次嘗試。這實際上也是類似于Netty這樣的異步架構能夠支援OIO的唯一方式。

這種方式的一個問題是,當一個SocketTimeoutException被抛出時填充棧跟蹤所需要的時間,其對于性能來說代價很大。

圖 4-3 說明了這個邏輯。

4.3.4 用于 JVM 内部通信的 Local 傳輸

Netty 提供了一個 Local 傳輸,用于在同一個 JVM 中運作的用戶端和伺服器程式之間的異步 通信。同樣,這個傳輸也支援其他傳輸共同的API。

在這個傳輸中,和伺服器 Channel 相關聯的 SocketAddress 并沒有綁定實體網絡位址; 相反,隻要伺服器還在運作,它就會被存儲在系統資料庫裡,并在 Channel 關閉時登出。因為這個 傳輸并不接受真正的網絡流量,是以它并不能夠和其他傳輸實作進行互操作。是以,在同一個 JVM 中,用戶端希望 連接配接到使用了這個傳輸的伺服器端時必須使用它。除了這個限制,它的使用方式和其他的傳輸一模一樣。

4.3.5 Embedded 傳輸

Netty 提供了一種額外的傳輸,使得你可以将一組 ChannelHandler 作為幫助器類嵌入到 其他的 ChannelHandler 内部。通過這種方式,你将可以擴充一個 ChannelHandler 的功能, 而又不需要修改其内部代碼。

不足為奇的是,Embedded 傳輸的關鍵是一個具體的 Channel 的實作:EmbeddedChannel。在第 9 章中,我們将詳細地讨論如何使用這個類來為 ChannelHandler 的實作建立單元 測試用例。

4.4 傳輸的用例

既然我們已經詳細地了解了所有的傳輸,那麼讓我們考慮一下選用一個适用于特定用途的協定的因素吧。正如前面所提到的,并不是所有的傳輸都支援所有的核心協定。

表 4-4 展示了截止出版時的傳輸和其所支援的協定。

雖然隻有 SCTP 傳輸有這些特殊要求,但是其他傳輸可能也有它們自己的配置選項需要考慮。 此外,如果隻是為了支援更高的并發連接配接數,伺服器平台可能需要配置得和用戶端不一樣。

這裡是一些你很可能會遇到的用例。

非阻塞代碼庫——如果你的代碼庫中沒有阻塞調用(或者你能夠限制它們的範圍),那麼在 Linux 上使用 NIO 或者 epoll 始終是個好主意。雖然 NIO/epoll 旨在處理大量的并發連 接,但是在處理較小數目的并發連接配接時,它也能很好地工作,尤其是考慮到它在連接配接之 間共享線程的方式。

阻塞代碼庫——正如我們已經指出的,如果你的代碼庫嚴重地依賴于阻塞 I/O,而且你的應 用程式也有一個相應的設計,那麼在你嘗試将其直接轉換為 Netty 的 NIO 傳輸時,你将可 能會遇到和阻塞操作相關的問題。不要為此而重寫你的代碼,可以考慮分階段遷移:先從 OIO 開始,等你的代碼修改好之後,再遷移到 NIO(或者使用 epoll,如果你在使用 Linux)。

在同一個 JVM 内部的通信——在同一個 JVM 内部的通信,不需要通過網絡暴露服務,是 Local 傳輸的完美用例。這将消除所有真實網絡操作的開銷,同時仍然使用你的 Netty 代碼 庫。如果随後需要通過網絡暴露服務,那麼你将隻需要把傳輸改為 NIO 或者 OIO 即可。

測試你的 ChannelHandler 實作——如果你想要為自己的 ChannelHandler 實作編 寫單元測試,那麼請考慮使用 Embedded 傳輸。這既便于測試你的代碼,而又不需要建立大 量的模拟(mock)對象。你的類将仍然符合正常的 API 事件流,保證該 ChannelHandler 在和真實的傳輸一起使用時能夠正确地工作。

表 4-5 總結了我們探讨過的用例。

5. ByteBuf

本章專門探讨了 Netty 的基于 ByteBuf 的資料容器。我們讨論過的要點有:

使用不同的讀索引和寫索引來控制資料通路;

使用記憶體的不同方式——基于位元組數組和直接緩沖區;

通過 CompositeByteBuf 生成多個 ByteBuf 的聚合視圖;

資料通路方法——搜尋、切片以及複制;

讀、寫、擷取和設定 API;

記憶體配置設定:ByteBufAllocator 池化和引用計數。

網絡資料的基本機關總是位元組。Java NIO 提供了 ByteBuffer 作為它 的位元組容器,但是這個類使用起來過于複雜,而且也有些繁瑣。

Netty 的 ByteBuffer 替代品是 ByteBuf,一個強大的實作,既解決了 JDK API 的局限性, 又為網絡應用程式的開發者提供了更好的 API。

5.1 ByteBuf 的 API

Netty 的資料處理 API 通過兩個元件暴露——abstract class ByteBuf 和 interface ByteBufHolder。

下面是一些 ByteBuf API 的優點:

它可以被使用者自定義的緩沖區類型擴充;

通過内置的複合緩沖區類型實作了透明的零拷貝;

容量可以按需增長(類似于 JDK 的 StringBuilder);

在讀和寫這兩種模式之間切換不需要調用 ByteBuffer 的 flip()方法;

讀和寫使用了不同的索引;

支援方法的鍊式調用;

支援引用計數;

支援池化。

5.2 ByteBuf 類——Netty 的資料容器

5.2.1 它是如何工作的

ByteBuf 維護了兩個不同的索引:一個用于讀取,一個用于寫入。當你從 ByteBuf 讀取時, 它的 readerIndex 将會被遞增已經被讀取的位元組數。同樣地,當你寫入 ByteBuf 時,它的 writerIndex 也會被遞增。圖 5-1 展示了一個空 ByteBuf 的布局結構和狀态。

要了解這些索引兩兩之間的關系,請考慮一下,如當 readerIndex 達到 和 writerIndex 同樣的值時會發生什麼。在那時,你将會到達“可以讀取的”資料的末尾。就 如同試圖讀取超出數組末尾的資料一樣,試圖讀取超出該點的資料将會觸發一個 IndexOutOf- BoundsException。

名稱以 read 或者 write 開頭的 ByteBuf 方法,将會推進其對應的索引,而名稱以 set 或 者 get 開頭的操作則不會。可以指定 ByteBuf 的最大容量。試圖移動寫索引(即 writerIndex)超過這個值将會觸發一個異常1。(預設的限制是 Integer.MAX_VALUE。)

5.2.2 ByteBuf 的使用模式

1.堆緩沖區

最常用的 ByteBuf 模式是将資料存儲在 JVM 的堆空間中。這種模式被稱為支撐數組 (backing array),它能在沒有使用池化的情況下提供快速的配置設定和釋放。這種方式非常适合于有遺留的資料需要處理的情況,如代碼清單5-1 所示。

2.直接緩沖區

直接緩沖區是另外一種 ByteBuf 模式。NIO 在 JDK 1.4 中引入的 ByteBuffer 類允許 JVM 實作通過本地調用來配置設定記憶體。

這主要是為了避免在每次調用本地 I/O 操作之前(或者之後)将緩沖區的内容複制到一個中間緩沖區,或者從中間緩沖區把内容複制到緩沖區。

ByteBuffer的Javadoc1明确指出:“直接緩沖區的内容将駐留在正常的會被垃圾回收的堆之外。”這也就解釋了為何直接緩沖區對于網絡資料傳輸是理想的選擇。

如果你的資料包含在一 個在堆上配置設定的緩沖區中,事實上,在通過套接字發送它之前,JVM将會在内部把你的緩沖 區複制到一個直接緩沖區中。

直接緩沖區的主要缺點是,相對于基于堆的緩沖區,它們的配置設定和釋放都較為昂貴。

如果你 正在處理遺留代碼,你也可能會遇到另外一個缺點:因為資料不是在堆上,是以你不得不進行一 次複制,如代碼清單 5-2 所示。

顯然,與使用支撐數組相比,這涉及的工作更多。是以,如果事先知道容器中的資料将會被作為數組來通路,使用堆記憶體更合适。

3.複合緩沖區

第三種也是最後一種模式使用的是複合緩沖區,它為多個 ByteBuf 提供一個聚合視圖。在 這裡可以根據需要添加或者删除 ByteBuf 執行個體,這是一個 JDK 的 ByteBuffer 實作完全缺失的特性。

Netty 通過一個 ByteBuf 子類——CompositeByteBuf——實作了這個模式,它提供了一個将多個緩沖區表示為單個合并緩沖區的虛拟表示。

為了舉例說明,讓我們考慮一下一個由兩部分——頭部和主體——組成的将通過 HTTP 協定 傳輸的消息。這兩部分由應用程式的不同子產品産生,将會在消息被發送的時候組裝。該應用程式 可以選擇為多個消息重用相同的消息主體。當這種情況發生時,對于每個消息都将會建立一個新 的頭部。

因為我們不想為每個消息都重新配置設定這兩個緩沖區,是以使用 CompositeByteBuf 是一個 完美的選擇。它在消除了沒必要的複制的同時,暴露了通用的 ByteBuf API。

圖 5-2 展示了生成 的消息布局。

警告

CompositeByteBuf中的ByteBuf執行個體可能同時包含直接記憶體配置設定和非直接記憶體配置設定。 如果其中隻有一個執行個體,那麼對 CompositeByteBuf 上的 hasArray()方法的調用将傳回該組 件上的 hasArray()方法的值;否則它将傳回 false。

需要注意的是,Netty使用了CompositeByteBuf來優化套接字的I/O操作,盡可能地消除了 由JDK的緩沖區實作所導緻的性能以及記憶體使用率的懲罰。1這種優化發生在Netty的核心代碼中, 是以不會被暴露出來,但是你應該知道它所帶來的影響。

5.3 位元組級操作

ByteBuf 提供了許多超出基本讀、寫操作的方法用于修改它的資料。在接下來我們将會讨論這些中最重要的部分。

5.3.1 随機通路索引

如同在普通的 Java 位元組數組中一樣,ByteBuf 的索引是從零開始的:第一個位元組的索引是 0,最後一個位元組的索引總是 capacity() - 1。

代碼清單 5-6 表明,對存儲機制的封裝使得遍 曆 ByteBuf 的内容非常簡單。

代碼清單 5-6 通路資料

ByteBuf buffer = ...; for (int i = 0; i < buffer.capacity(); i++) { byte b = buffer.getByte(i); System.out.println((char)b); }

需要注意的是,使用那些需要一個索引值參數的方法(的其中)之一來通路資料既不會改變 readerIndex 也不會改變 writerIndex。

5.3.2 順序通路索引

雖然 ByteBuf 同時具有讀索引和寫索引,但是 JDK 的 ByteBuffer 卻隻有一個索引,這 也就是為什麼必須調用 flip()方法來在讀模式和寫模式之間進行切換的原因。圖 5-3 展示了 ByteBuf 是如何被它的兩個索引劃分成 3 個區域的。

5.3.3 可丢棄位元組

在圖 5-3 中标記為可丢棄位元組的分段包含了已經被讀過的位元組。通過調用 discardRead- Bytes()方法,可以丢棄它們并回收空間。這個分段的初始大小為 0,存儲在 readerIndex 中, 會随着 read 操作的執行而增加(get*操作不會移動 readerIndex)。

圖 5-4 展示了圖 5-3 中所展示的緩沖區上調用discardReadBytes()方法後的結果。可以看 到,可丢棄位元組分段中的空間已經變為可寫的了。注意,在調用discardReadBytes()之後,對 可寫分段的内容并沒有任何的保證。因為隻是移動了可以讀取的位元組以及writerIndex,而沒有對所有可寫入的位元組進行擦除寫。

雖然你可能會傾向于頻繁地調用 discardReadBytes()方法以確定可寫分段的最大化,但是 請注意,這将極有可能會導緻記憶體複制,因為可讀位元組(圖中标記為 CONTENT 的部分)必須被移 動到緩沖區的開始位置。我們建議隻在有真正需要的時候才這樣做,例如,當記憶體非常寶貴的時候。

5.3.4 可讀位元組

ByteBuf 的可讀位元組分段存儲了實際資料。

新配置設定的、包裝的或者複制的緩沖區的預設的 readerIndex 值為 0。

任何名稱以 read 或者 skip 開頭的操作都将檢索或者跳過位于目前 readerIndex 的資料,并且将它增加已讀位元組數。

如果被調用的方法需要一個 ByteBuf 參數作為寫入的目标,并且沒有指定目标索引參數, 那麼該目标緩沖區的 writerIndex 也将被增加,例如:

readBytes(ByteBuf dest);

如果嘗試在緩沖區的可讀位元組數已經耗盡時從中讀取資料,那麼将會引發一個 IndexOutOf- BoundsException。

5.3.5 可寫位元組

可寫位元組分段是指一個擁有未定義内容的、寫入就緒的記憶體區域。

新配置設定的緩沖區的 writerIndex 的預設值為 0。

任何名稱以 write 開頭的操作都将從目前的 writerIndex 處 開始寫資料,并将它增加已經寫入的位元組數。

如果寫操作的目标也是 ByteBuf,并且沒有指定 源索引的值,則源緩沖區的 readerIndex 也同樣會被增加相同的大小。這個調用如下所示:

writeBytes(ByteBuf dest);

如果嘗試往目标寫入超過目标容量的資料,将會引發一個IndexOutOfBoundException1。

5.3.6 索引管理

JDK 的 InputStream 定義了 mark(int readlimit)和 reset()方法,這些方法分别 被用來将流中的目前位置标記為指定的值,以及将流重置到該位置。

同樣,可以通過調用 markReaderIndex()、markWriterIndex()、resetWriterIndex() 和 resetReaderIndex()來标記和重置 ByteBuf 的 readerIndex 和 writerIndex。這些和 InputStream 上的調用類似,隻是沒有 readlimit 參數來指定标記什麼時候失效。

也可以通過調用 readerIndex(int)或者 writerIndex(int)來将索引移動到指定位置。試 圖将任何一個索引設定到一個無效的位置都将導緻一個 IndexOutOfBoundsException。

可以通過調用 clear()方法來将 readerIndex 和 writerIndex 都設定為 0。注意,這并不會清除記憶體中的内容。圖 5-5(重複上面的圖 5-3)展示了它是如何工作的。

和之前一樣,ByteBuf 包含 3 個分段。圖 5-6 展示了在 clear()方法被調用之後 ByteBuf 的狀态。

5.3.7 查找操作

在ByteBuf中有多種可以用來确定指定值的索引的方法。最簡單的是使用indexOf()方法。 較複雜的查找可以通過那些需要一個ByteBufProcessor1作為參數的方法達成。這個接口隻定 義了一個方法:

boolean process(byte value)

它将檢查輸入值是否是正在查找的值。 ByteBufProcessor針對一些常見的值定義了許多便利的方法。假設你的應用程式需要和

所謂的包含有以NULL結尾的内容的Flash套接字 2內建。調用 forEachByte(ByteBufProcessor.FIND_NUL)

将簡單高效地消費該 Flash 資料,因為在處理期間隻會執行較少的邊界檢查。

5.3.8 派生緩沖區

派生緩沖區為 ByteBuf 提供了以專門的方式來呈現其内容的視圖。這類視圖是通過以下方 法被建立的:

duplicate();

slice();

slice(int, int);

Unpooled.unmodifiableBuffer(...);

order(ByteOrder);

readSlice(int)。

每個這些方法都将傳回一個新的 ByteBuf 執行個體,它具有自己的讀索引、寫索引和标記

索引。其内部存儲和 JDK 的 ByteBuffer 一樣也是共享的。這使得派生緩沖區的建立成本 是很低廉的,但是這也意味着,如果你修改了它的内容,也同時修改了其對應的源執行個體,所 以要小心。

ByteBuf複制

如果需要一個現有緩沖區的真實副本,請使用 copy()或者 copy(int, int)方 法。不同于派生緩沖區,由這個調用所傳回的 ByteBuf 擁有獨立的資料副本。

5.3.9 讀/寫操作

正如我們所提到過的,有兩種類别的讀/寫操作:

get()和 set()操作,從給定的索引開始,并且保持索引不變;

read()和 write()操作,從給定的索引開始,并且會根據已經通路過的位元組數對索引進行調整。

表 5-1 列舉了最常用的 get()方法。完整清單請參考對應的 API 文檔。

大多數的這些操作都有一個對應的 set()方法。這些方法在表 5-2 中列出。

代碼清單 5-12 說明了 get()和 set()方法的用法,表明了它們不會改變讀索引和寫索引。

現在,讓我們研究一下 read()操作,其作用于目前的 readerIndex 或 writerIndex。 這些方法将用于從 ByteBuf 中讀取資料,如同它是一個流。表 5-3 展示了最常用的方法。

幾乎每個 read()方法都有對應的 write()方法,用于将資料追加到 ByteBuf 中。注意,表 5-4 中所列出的這些方法的參數是需要寫入的值,而不是索引值。

5.3.10 更多的操作

表5-5 列舉了由ByteBuf提供的其他有用操作。

表 5-5 其他有用的操作

名稱 描述

isReadable() 如果至少有一個位元組可供讀取,則傳回 true

isWritable() 如果至少有一個位元組可被寫入,則傳回 true

readableBytes() 傳回可被讀取的位元組數

writableBytes() 傳回可被寫入的位元組數

capacity() 傳回 ByteBuf 可容納的位元組數。在此之後,它會嘗試再次擴充直 到達到 maxCapacity()

maxCapacity() 傳回 ByteBuf 可以容納的最大位元組數

hasArray() 如果 ByteBuf 由一個位元組數組支撐,則傳回 true

array() 如果 ByteBuf 由一個位元組數組支撐則傳回該數組;否則,它将抛出一個 UnsupportedOperationException 異常

5.4 ByteBufHolder 接口

我們經常發現,除了實際的資料負載之外,我們還需要存儲各種屬性值。HTTP 響應便是一 個很好的例子,除了表示為位元組的内容,還包括狀态碼、cookie 等。

為了處理這種常見的用例,Netty 提供了 ByteBufHolder。ByteBufHolder 也為 Netty 的 進階特性提供了支援,如緩沖區池化,其中可以從池中借用 ByteBuf,并且在需要時自動釋放。

ByteBufHolder 隻有幾種用于通路底層資料和引用計數的方法 。表 5-6 列出了它們(這裡不包括它繼承自 ReferenceCounted 的那些方法)。

表5-6 ByteBufHolder的操作

content() 傳回由這個 ByteBufHolder 所持有的 ByteBuf

copy() 傳回這個 ByteBufHolder 的一個深拷貝,包括一個其所包含的 ByteBuf 的非共享拷貝

duplicate() 傳回這個ByteBufHolder的一個淺拷貝,包括一個其所包含的ByteBuf的共享拷貝

5.5 ByteBuf 配置設定

在這一節中,我們将描述管理 ByteBuf 執行個體的不同方式。

5.5.1 按需配置設定:ByteBufAllocator 接口

為了降低配置設定和釋放記憶體的開銷,Netty 通過 interface ByteBufAllocator 實作了 (ByteBuf 的)池化,它可以用來配置設定我們所描述過的任意類型的 ByteBuf 執行個體。

使用池化是特定于應用程式的決定,其并不會以任何方式改變 ByteBuf API(的語義)。

表5-7 列出了ByteBufAllocator提供的一些操作。

可以通過 Channel(每個都可以有一個不同的 ByteBufAllocator 執行個體)或者綁定到 ChannelHandler 的 ChannelHandlerContext 擷取一個到 ByteBufAllocator 的引用。

代碼清單 5-14 說明了這兩種方法。

Channel channel = ...; ByteBufAllocator allocator = channel.alloc(); .... ChannelHandlerContext ctx = ...; ByteBufAllocator allocator2 = ctx.alloc();

Netty提供了兩種ByteBufAllocator的實作: PooledByteBufAllocator和UnpooledByteBufAllocator。

前者池化了ByteBuf的執行個體以提高性能并最大限度地減少記憶體碎片。

後者的實作不池化ByteBuf執行個體,并且在每次它被調用時都會傳回一個新的執行個體。

雖然Netty4.1.x預設使用了PooledByteBufAllocator,但這可以通過ChannelConfig API或者在引導應用程式時指定不同的配置設定器來更改。

5.5.2 Unpooled 緩沖區

可能某些情況下,你未能擷取一個到 ByteBufAllocator 的引用。對于這種情況,Netty 提 供了一個簡單的稱為 Unpooled 的工具類,它提供了靜态的輔助方法來建立未池化的 ByteBuf 執行個體。表 5-8 列舉了這些中最重要的方法。

5.5.3 ByteBufUtil 類

ByteBufUtil 提供了用于操作 ByteBuf 的靜态的輔助方法。

這些靜态方法中最有價值的可能就是 hexdump()方法,它以十六進制的表示形式列印 ByteBuf 的内容。這在各種情況下都很有用,例如,列印調試ByteBuf 的内容。

另一個有用的方法是 boolean equals(ByteBuf, ByteBuf),它被用來判斷兩個 ByteBuf 執行個體的相等性。

5.6 引用計數

引用計數是一種通過在某個對象所持有的資源不再被其他對象引用時釋放該對象所持有的資源來優化記憶體使用和性能的技術。

引用計數對于池化實作(如 PooledByteBufAllocator)來說是至關重要的,它降低了 記憶體配置設定的開銷。

試圖通路一個已經被釋放的引用計數的對象,将會導緻一個 IllegalReferenceCount- Exception。

注意,一個特定的(ReferenceCounted 的實作)類,可以用它自己的獨特方式來定義它 的引用計數規則。例如,我們可以設想一個類,其 release()方法的實作總是将引用計數設為 零,而不用關心它的目前值,進而一次性地使所有的活動引用都失效。

誰負責釋放

一般來說,是由最後通路(引用計數)對象的那一方來負責将它釋放。在第 6 章中, 我們将會解釋這個概念和 ChannelHandler 以及 ChannelPipeline 的相關性。

第 6 章 ChannelHandler 和ChannelPipeline

本章中,我們将專注于 ChannelHandler,它為你的資料處理邏輯提供了載體。

因為ChannelHandler 大量地使用了 ByteBuf,你将開始看到 Netty 的整體架構的各個重要部分最 終走到了一起。

 ChannelHandler API 和 ChannelPipeline API

 檢測資源洩漏

 異常處理

6.1 ChannelHandler 家族

6.1.1 Channel 的生命周期

Interface Channel 定義了一組和 ChannelInboundHandler API 密切相關的簡單但 功能強大的狀态模型,表 6-1 列出了 Channel 的這 4 個狀态。

表6-1 Channel的生命周期狀态

狀态 描述

ChannelUnregistered Channel 已經被建立,但還未注冊到 EventLoop

ChannelRegistered Channel 已經被注冊到了 EventLoop

ChannelActive Channel 處于活動狀态(已經連接配接到它的遠端節點)。它現在可以接收和發送資料了

ChannelInactive Channel 沒有連接配接到遠端節點

Channel 的正常生命周期如圖 6-1 所示。當這些狀态發生改變時,将會生成對應的事件。 這些事件将會被轉發給 ChannelPipeline 中的 ChannelHandler,其可以随後對它們做出響應。

6.1.2 ChannelHandler 的生命周期

表 6-2 中列出了 interface ChannelHandler 定義的生命周期操作,在 ChannelHandler 被添加到 ChannelPipeline 中或者被從 ChannelPipeline 中移除時會調用這些操作。這些 方法中的每一個都接受一個 ChannelHandlerContext 參數。表6-2 ChannelHandler的生命周期方法

類型 描述

handlerAdded 當把 ChannelHandler 添加到 ChannelPipeline 中時被調用

handlerRemoved 當從 ChannelPipeline 中移除 ChannelHandler 時被調用

exceptionCaught 當處理過程中在 ChannelPipeline 中有錯誤産生時被調用

Netty 定義了下面兩個重要的 ChannelHandler 子接口:

ChannelInboundHandler——處理入站資料以及各種狀态變化;

ChannelOutboundHandler——處理出站資料并且允許攔截所有的操作。

6.1.3 ChannelInboundHandler 接口

表 6-3 列出了 interface ChannelInboundHandler 的生命周期方法。這些方法将會在資料被接收時或者與其對應的 Channel 狀态發生改變時被調用。正如我們前面所提到的,這些 方法和 Channel 的生命周期密切相關。

表6-3 ChannelInboundHandler的方法

channelRegistered 當 Channel已經注冊到它的 EventLoop 并且能夠處理 I/O 時被調用

channelUnregistered 當 Channel從它的 EventLoop 登出并且無法處理任何 I/O 時被調用

channelActive 當 Channel處于活動狀态時被調用;Channel 已經連接配接/綁定并且已經就緒

channelInactive 當 Channel離開活動狀态并且不再連接配接它的遠端節點時被調用

channelReadComplete 當Channel上的一個讀操作完成時被調用 1

channelRead 當從 Channel 讀取資料時被調用

ChannelWritability- Changed 當Channel的可寫狀态發生改變時被調用。使用者可以確定寫操作不會完成得太快(以避免發生 OutOfMemoryError)或者可以在 Channel 變為再次可寫時恢複寫入。可以通過調用Channel的isWritable()方法來檢測 Channel 的可寫性。與可寫性相關的門檻值可以通過 Channel.config(). setWriteHighWaterMark()和 Channel.config().setWriteLowWater- Mark()方法來設定

userEventTriggered 當 ChannelnboundHandler.fireUserEventTriggered()方法被調 用時被調用,因為一個 POJO 被傳經了 ChannelPipeline

當某個 ChannelInboundHandler 的實作重寫 channelRead()方法時,它将負責顯式地 釋放與池化的 ByteBuf 執行個體相關的記憶體。Netty 為此提供了一個實用方法 ReferenceCount- Util.release(),如代碼清單 6-1 所示。

/** * 代碼清單 6-1 釋放消息資源 **/ @Sharable // 擴充了 Channel-InboundHandler- Adapter public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 丢棄已接 收的消息 ReferenceCountUtil.release(msg); } }

Netty 将使用 WARN 級别的日志消息記錄未釋放的資源,使得可以非常簡單地在代碼中發現違規的執行個體。但是以這種方式管理資源可能很繁瑣。一個更加簡單的方式是使用 SimpleChannelInboundHandler。代碼清單 6-2 是代碼清單 6-1 的一個變體,說明了這一點。

代碼清單 6-2 使用 SimpleChannelInboundHandler

@Sharable public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // No need to do anything special // 不需要任何顯式的資源釋放 } }

由于 SimpleChannelInboundHandler 會自動釋放資源,是以你不應該存儲指向任何消 息的引用供将來使用,因為這些引用都将會失效。

6.1.4 ChannelOutboundHandler 接口

出站操作和資料将由 ChannelOutboundHandler 處理。它的方法将被 Channel、ChannelPipeline 以及 ChannelHandlerContext 調用。ChannelOutboundHandler的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些複雜的方法來處理請求。例如,如果到遠端節點的寫入被暫停了,那麼你可以推遲沖 刷操作并在稍後繼續。

表 6-4 顯示了所有由 ChannelOutboundHandler 本身所定義的方法(忽略了那些從 Channel- Handler 繼承的方法)。

表6-4 ChannelOutboundHandler的方法

bind(ChannelHandlerContext,SocketAddress,ChannelPromise) 當請求将 Channel 綁定到本地位址時被調用

connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) 當請求将 Channel 連接配接到遠端節點時被調用

disconnect(ChannelHandlerContext,ChannelPromise) 當請求将 Channel 從遠端節點斷開時被調用

close(ChannelHandlerContext,ChannelPromise) 當請求關閉 Channel 時被調用

deregister(ChannelHandlerContext,ChannelPromise) 當請求将 Channel 從它的 EventLoop 登出 時被調用

read(ChannelHandlerContext) 當請求從 Channel 讀取更多的資料時被調用

flush(ChannelHandlerContext) 當請求通過 Channel 将入隊資料沖刷到遠端節點時被調用

write(ChannelHandlerContext,Object,ChannelPromise) 當請求通過 Channel 将資料寫到遠端節點時 被調用

ChannelPromise與ChannelFuture ChannelOutboundHandler中的大部分方法都需要一個ChannelPromise參數,以便在操作完成時得到通知。ChannelPromise是ChannelFuture的一個 子類,其定義了一些可寫的方法,如setSuccess()和setFailure(),進而使ChannelFuture不 可變 (這裡借鑒的是 Scala 的 Promise 和 Future 的設計,當一個 Promise 被完成之後,其對應的 Future 的值便 不能再進行任何修改了。)。

6.1.5 ChannelHandler 擴充卡

你可以使用 ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter類作為自己的 ChannelHandler的起始點。這兩個擴充卡分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本實作。通過擴充抽象類ChannelHandlerAdapter,它們獲得了它們共同的超接口ChannelHandler 的方法。生成的類的層次結構如圖 6-2 所示。

在 ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法體調用了其相關聯的ChannelHandlerContext上的等效方法,進而将事件轉發到 了 ChannelPipeline 中的下一個 ChannelHandler 中。你要想在自己的 ChannelHandler 中使用這些擴充卡類,隻需要簡單地擴充它們,并且重 寫那些你想要自定義的方法。

6.1.6 資源管理

每當通過調用 ChannelInboundHandler.channelRead()或者 ChannelOutbound- Handler.write()方法來處理資料時,你都需要確定沒有任何的資源洩漏。你可能還記得在前面的章節中所提到的,Netty 使用引用計數來處理池化的 ByteBuf。是以在完全使用完某個 ByteBuf 後,調整其引用計數是很重要的。

為了幫助你診斷潛在的(資源洩漏)問題,Netty提供了class ResourceLeakDetector1, 它将對你應用程式的緩沖區配置設定做大約 1%的采樣來檢測記憶體洩露。相關的開銷是非常小的。

如果檢測到了記憶體洩露,将會産生類似于下面的日志消息:

LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=ADVANCED' or call ResourceLeakDetector.setLevel().

Netty 目前定義了 4 種洩漏檢測級别,如表 6-5 所示。

表 6-5 洩漏檢測級别

級别 描述

DISABLED 禁用洩漏檢測。隻有在詳盡的測試之後才應設定為這個值

SIMPLE 使用 1%的預設采樣率檢測并報告任何發現的洩露。這是預設級别,适合絕大部分的情況

ADVANCED 使用預設的采樣率,報告所發現的任何的洩露以及對應的消息被通路的位置

PARANOID 類似于ADVANCED,但是其将會對每次(對消息的)通路都進行采樣。這對性能将會有很 大的影響,應該隻在調試階段使用

洩露檢測級别可以通過将下面的 Java 系統屬性設定為表中的一個值來定義:

java -Dio.netty.leakDetectionLevel=ADVANCED

實作 ChannelInboundHandler.channelRead()和 ChannelOutboundHandler.write() 方法時,應該如何使用這個診斷工具來防止洩露呢?讓我們看看你的 channelRead()操作直接消費 入站消息的情況;也就是說,它不會通過調用 ChannelHandlerContext.fireChannelRead() 方法将入站消息轉發給下一個 ChannelInboundHandler。代碼清單 6-3 展示了如何釋放消息。

代碼清單 6-3 消費并釋放入站消息

@Sharable public class DiscardInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } }

消費入站消息的簡單方式

由于消費入站資料是一項正常任務,是以 Netty 提供了一個特殊的被 稱為 SimpleChannelInboundHandler 的 ChannelInboundHandler 實作。這個實作會在消 息被 channelRead0()方法消費之後自動釋放消息。

在出站方向這邊,如果你處理了 write()操作并丢棄了一個消息,那麼你也應該負責釋放 它。代碼清單 6-4 展示了一個丢棄所有的寫入資料的實作。

代碼清單 6-4 丢棄并釋放出站消息

@Sharable public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ReferenceCountUtil.release(msg); promise.setSuccess(); } }

重要的是,不僅要釋放資源,還要通知 ChannelPromise。否則可能會出現 Channel- FutureListener 收不到某個消息已經被處理了的通知的情況。

總之,如果一個消息被消費或者丢棄了,并且沒有傳遞給 ChannelPipeline 中的下一個 ChannelOutboundHandler,那麼使用者就有責任調用 ReferenceCountUtil.release()。 如果消息到達了實際的傳輸層,那麼當它被寫入時或者 Channel 關閉時,都将被自動釋放。

6.2 ChannelPipeline 接口

每一個新建立的 Channel 都将會被配置設定一個新的 ChannelPipeline。這項關聯是永久性 的;Channel 既不能附加另外一個 ChannelPipeline,也不能分離其目前的。在 Netty 元件 的生命周期中,這是一項固定的操作,不需要開發人員的任何幹預。

圖 6-3 展示了一個典型的同時具有入站和出站 ChannelHandler 的 ChannelPipeline 的布 局,并且印證了我們之前的關于 ChannelPipeline 主要由一系列的 ChannelHandler 所組成的 說 法 。C h a n n e l P i p e l i n e 還 提 供 了 通 過 C h a n n e l P i p e l i n e 本 身 傳 播 事 件 的 方 法 。如 果 一 個 入 站 事件被觸發,它将被從 ChannelPipeline 的頭部開始一直被傳播到 Channel Pipeline 的尾端。

ChannelPipeline 相對論

你可能會說,從事件途經 ChannelPipeline 的角度來看,ChannelPipeline 的頭部和尾端取決于該事件是入站的還是出站的。然而 Netty 總是将 ChannelPipeline 的入站口(圖 6-3 中的左側) 作為頭部,而将出站口(該圖的右側)作為尾端。

在 ChannelPipeline 傳播事件時,它會測試 ChannelPipeline 中的下一個 Channel- Handler 的類型是否和事件的運動方向相比對。如果不比對,ChannelPipeline 将跳過該 ChannelHandler 并前進到下一個,直到它找到和該事件所期望的方向相比對的為止。(當然,ChannelHandler 也可以同時實作 ChannelInboundHandler 接口和 ChannelOutbound-

Handler 接口。)

6.2.1 修改 ChannelPipeline

ChannelHandler 可以通過添加、删除或者替換其他的 ChannelHandler 來實時地修改 ChannelPipeline 的布局。

表6-6 ChannelHandler的用于修改ChannelPipeline的方法

AddFirstaddBefore 将一個ChannelHandler添加到ChannelPipeline中

addAfteraddLast

remove 将一個 ChannelHandler 從 ChannelPipeline 中移除

replace 将 ChannelPipeline 中的一個 ChannelHandler 替換為另一個 ChannelHandler

除了這些操作,還有别的通過類型或者名稱來通路 ChannelHandler 的方法。這些方法都 列在了表 6-7 中。

表6-7 ChannelPipeline的用于通路ChannelHandler的操作

get 通過類型或者名稱傳回 ChannelHandler

context 傳回和 ChannelHandler 綁定的 ChannelHandlerContext

names 傳回 ChannelPipeline 中所有 ChannelHandler 的名稱

6.2.2 觸發事件

ChannelPipeline 的 API 公開了用于調用入站和出站操作的附加方法。表 6-8 列出了入 站操作,用于通知 ChannelInboundHandler 在 ChannelPipeline 中所發生的事件。

在出站這邊,處理事件将會導緻底層的套接字上發生一系列的動作。表 6-9 列出了 ChannelPipeline API 的出站操作。

總結一下:

ChannelPipeline 儲存了與 Channel 相關聯的 ChannelHandler;

ChannelPipeline 可以根據需要,通過添加或者删除 ChannelHandler 來動态地修改;

ChannelPipeline 有着豐富的 API 用以被調用,以響應入站和出站事件。

6.3 ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關 聯,每當有 ChannelHandler 添加到 ChannelPipeline 中時,都會建立 ChannelHandler- Context。ChannelHandlerContext 的主要功能是管理它所關聯的 ChannelHandler 和在 同一個 ChannelPipeline 中的其他 ChannelHandler 之間的互動。

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一點重要的不同。如果調用 Channel 或者 ChannelPipeline 上的這 些方法,它們将沿着整個 ChannelPipeline 進行傳播。而調用位于 ChannelHandlerContext 上的相同方法,則将從目前所關聯的 ChannelHandler 開始,并且隻會傳播給位于該 ChannelPipeline 中的下一個能夠處理該事件的 ChannelHandler。

表 6-10 對 ChannelHandlerContext API 進行了總結。

當使用 ChannelHandlerContext 的 API 的時候,請牢記以下兩點:

ChannelHandlerContext 和 ChannelHandler 之間的關聯(綁定)是永遠不會改變的,是以緩存對它的引用是安全的;

如同我們在本節開頭所解釋的一樣,相對于其他類的同名方法,ChannelHandler Context的方法将産生更短的事件流,應該盡可能地利用這個特性來獲得最大的性能。

6.3.1 使用 ChannelHandlerContext

在這一節中我們将讨論 ChannelHandlerContext 的用法,以及存在于 ChannelHandler- Context、Channel 和 ChannelPipeline 上的方法的行為。圖 6-4 展示了它們之間的關系。

如同在圖 6-5 中所能夠看到的一樣,代碼清單 6-6 和代碼清單 6-7 中的事件流是一樣的。重要的 是要注意到,雖然被調用的 Channel 或 ChannelPipeline 上的 write()方法将一直傳播事件通 過整個 ChannelPipeline,但是在 ChannelHandler 的級别上,事件從一個 ChannelHandler 到下一個 ChannelHandler 的移動是由 ChannelHandlerContext 上的調用完成的。

為什麼會想要從 ChannelPipeline 中的某個特定點開始傳播事件呢?

為了減少将事件傳經對它不感興趣的 ChannelHandler 所帶來的開銷。

為了避免将事件傳經那些可能會對它感興趣的 ChannelHandler。 要想調用從某個特定的 ChannelHandler 開始的處理過程,必須擷取到在(Channel-

Pipeline)該 ChannelHandler 之前的 ChannelHandler 所關聯的 ChannelHandler- Context。這個 ChannelHandlerContext 将調用和它所關聯的 ChannelHandler 之後的 ChannelHandler。

如圖 6-6 所示,消息将從下一個 ChannelHandler 開始流經 ChannelPipeline,繞過了 所有前面的 ChannelHandler。我們剛才所描述的用例是常見的,對于調用特定的 ChannelHandler 執行個體上的操作尤其有用。

6.3.2 ChannelHandler 和 ChannelHandlerContext 的進階用法

正如我們在代碼清單 6-6 中所看到的,你可以通過調用 ChannelHandlerContext 上的 pipeline()方法來獲得被封閉的 ChannelPipeline 的引用。這使得運作時得以操作 ChannelPipeline 的 ChannelHandler,我們可以利用這一點來實作一些複雜的設計。例如, 你可以通過将 ChannelHandler 添加到 ChannelPipeline 中來實作動态的協定切換。

另一種進階的用法是緩存到 ChannelHandlerContext 的引用以供稍後使用,這可能會發 生在任何的 ChannelHandler 方法之外,甚至來自于不同的線程。代碼清單 6-9 展示了用這種 模式來觸發事件。

代碼清單 6-9 緩存到 ChannelHandlerContext 的引用

public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) { this.ctx = ctx; } public void send(String msg) { ctx.writeAndFlush(msg); } }

6.4 異常處理

異常處理是任何真實應用程式的重要組成部分,它也可以通過多種方式來實作。是以,Netty 提供了幾種方式用于處理入站或者出站處理過程中所抛出的異常。

6.4.1 處理入站異常

如果在處理入站事件的過程中有異常被抛出,那麼它将從它在 ChannelInboundHandler 裡被觸發的那一點開始流經 ChannelPipeline。要想處理這種類型的入站異常,你需要在你 的 ChannelInboundHandler 實作中重寫下面的方法。

public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause) throws Exception

代碼清單 6-12 展示了一個簡單的示例,其關閉了 Channel 并列印了異常的棧跟蹤資訊。 代碼清單 6-12 基本的入站異常處理

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

因為異常将會繼續按照入站方向流動(就像所有的入站事件一樣),是以實作了前面所示邏 輯的 ChannelInboundHandler 通常位于 ChannelPipeline 的最後。這確定了所有的入站 異常都總是會被處理,無論它們可能會發生在 ChannelPipeline 中的什麼位置。

你應該如何響應異常,可能很大程度上取決于你的應用程式。你可能想要關閉Channel(和 連接配接),也可 能會嘗試進行恢複。如果你不實作任何處理入站異常的邏輯(或者沒有消費該異常), 那麼Netty将會記錄該異常沒有被處理的事實 1。

ChannelHandler.exceptionCaught()的預設實作是簡單地将目前異常轉發給ChannelPipeline 中的下一個 ChannelHandler;

如果異常到達了 ChannelPipeline 的尾端,它将會被記錄為未被處理;

要想定義自定義的處理邏輯,你需要重寫 exceptionCaught()方法。然後你需要決定

是否需要将該異常傳播出去。

6.4.2 處理出站異常

用于處理出站操作中的正常完成以及異常的選項,都基于以下的通知機制。

每個出站操作都将傳回一個ChannelFuture。注冊到ChannelFuture的ChannelFutureListener 将在操作完成時被通知該操作是成功了還是出錯了。

幾乎所有的 ChannelOutboundHandler 上的方法都會傳入一個 ChannelPromise 的執行個體。作為 ChannelFuture 的子類,ChannelPromise 也可以被配置設定用于異步通知的監聽器。但是,ChannelPromise 還具有提供立即通知的可寫方法:

ChannelPromise setSuccess(); ChannelPromise setFailure(Throwable cause);

ChannelPromise 的可寫方法

通過調用 ChannelPromise 上的 setSuccess()和 setFailure()方法,可以使一個操作的狀态在 ChannelHandler 的方法傳回給其調用者時便即刻被感覺到。

添加 ChannelFutureListener 隻需要調用 ChannelFuture 執行個體上的 addListener方法,并且有兩種不同的方式可以做到這一點。其中最常用的方式是

/** * 代碼清單 6-13 添加 ChannelFutureListener 到 ChannelFuture */ ChannelFuture future = channel.write(someMessage); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } }); }

第二種方式是将 ChannelFutureListener 添加到即将作為參數傳遞給 ChannelOutboundHandler 的方法的 ChannelPromise。代碼清單 6-14 中所展示的代碼和代碼清單 6-13 中所展示的具有相同的效果。

/** * 代碼清單 6-14 添加 ChannelFutureListener 到 ChannelPromise */ public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } }); } } }

第 7 章 EventLoop和線程模型

Netty 的 EventLoop 和并發模型,對于了解 Netty 是如何實作異步的、事件驅動的網絡程式設計模型來說至關重要。

 線程模型概述

 事件循環的概念和實作

 任務排程

 實作細節

如果你對 Java 的并發 API(java.util.concurrent)有比較好的了解,那麼你應該會發 現在本章中的讨論都是直截了當的。如果這些概念對你來說還比較陌生,或者你需要更新自己的 相關知識,那麼由 Brian Goetz 等編寫的《Java 并發程式設計實戰》 (Addison-Wesley Professional, 2006)這本書将是極好的資源。

7.1 線程模型概述

在這一節中,我們将介紹常見的線程模型,随後将繼續讨論 Netty 過去以及目前的線程模型, 并評審它們各自的優點以及局限性。

因為具有多核心或多個 CPU 的計算機現在已經司空見慣,大多數的現代應用程式都利用了 複雜的多線程處理技術以有效地利用系統資源。

相比之下,在早期的 Java 語言中,我們使用多 線程處理的主要方式無非是按需建立和啟動新的 Thread 來執行并發的任務單元——一種在高 負載下工作得很差的原始方式。Java 5 随後引入了 Executor API,其線程池通過緩存和重用 Thread 極大地提高了性能。

基本的線程池化模式可以描述為:

從池的空閑線程清單中選擇一個 Thread,并且指派它去運作一個已送出的任務(一個Runnable 的實作);

當任務完成時,将該 Thread 傳回給該清單,使其可被重用。 圖 7-1 說明了這個模式。

雖然池化和重用線程相對于簡單地為每個任務都建立和銷毀線程是一種進步,但是它并不能 消除由上下文切換所帶來的開銷,其将随着線程數量的增加很快變得明顯,并且在高負載下愈演 愈烈。此外,僅僅由于應用程式的整體複雜性或者并發需求,在項目的生命周期内也可能會出現 其他和線程相關的問題。

簡而言之,多線程處理是很複雜的,我們來看看 Netty 是如何幫助簡 化它的。

7.2 EventLoop 接口

EventLoop通常被稱為事件循環,代碼清單 7-1 中說明了事件循環的基本思想,其中每個任務都是一個 Runnable 的執行個體(如圖 7-1 所示)。

/** * 代碼清單 7-1 在事件循環中執行任務 */ while (!terminated) { // 阻塞,直到有事件已經就緒可被運作 List<Runnable> readyEvents = blockUntilEventsReady(); for (Runnable ev: readyEvents) { // 循環周遊,并處理所有的事件 ev.run(); } }

Netty 的 EventLoop 是協同設計的一部分,它采用了兩個基本的 API:并發和網絡程式設計。

首先,io.netty.util.concurrent 包建構在 JDK 的 java.util.concurrent 包上,用 來提供線程執行器。

其次,io.netty.channel 包中的類,為了與 Channel 的事件進行互動, 擴充了這些接口/類。圖 7-2 展示了生成的類層次結構。

在這個模型中,一個 EventLoop 将由一個永遠都不會改變的 Thread 驅動,同時任務 (Runnable 或者 Callable)可以直接送出給 EventLoop 實作,以立即執行或者排程執行。 根據配置和可用核心的不同,可能會建立多個 EventLoop 執行個體用以優化資源的使用,并且單個 EventLoop 可能會被指派用于服務多個 Channel。

事件/任務的執行順序

事件和任務是以先進先出(FIFO)的順序執行的。這樣可以通過保證字 節内容總是按正确的順序被處理,消除潛在的資料損壞的可能性。

7.2.1 Netty 4 中的 I/O 和事件處理

正如我們在第 6 章中所較長的描述的,由 I/O 操作觸發的事件将流經安裝了一個或者多個 ChannelHandler 的 ChannelPipeline。

事件的性質通常決定了它将被如何處理;它可能将資料從網絡棧中傳遞到你的應用程式中, 或者進行逆向操作,或者 執行一些截然不同的操作。但是事件的處理邏輯必須足夠的通用和靈活, 以處理所有可能的用例。是以,在Netty 4 中,所有的I/O操作和事件都由已經被配置設定給了 EventLoop的那個Thread來處理。

這不同于 Netty 3 中所使用的模型。在下一節中,我們将讨論這個早期的模型以及它被替換 的原因。

7.2.2 Netty 3 中的 I/O 操作

在以前的版本中所使用的線程模型隻保證了入站(之前稱為上遊)事件會在所謂的 I/O 線程中執行。所有的出站(下遊)事件都由調用線程處理,其可 能是 I/O 線程也可能是别的線程。開始看起來這似乎是個好主意,但是已經被發現是有問題的, 因為需要在 ChannelHandler 中對出站事件進行仔細的同步。簡而言之,不可能保證多個線程 不會在同一時刻嘗試通路出站事件。例如,如果你通過在不同的線程中調用 Channel.write()

方法,針對同一個 Channel 同時觸發出站的事件,就會發生這種情況。

Netty 4 中所采用的線程模型,通過在同一個線程中處理某個給定的 EventLoop 中所産生的所有事件,解決了這個問題。這提供了一個更加簡單的執行體系架構,并且消除了在多個 ChannelHandler 中進行同步的需要(除了任何可能需要在多個 Channel 中共享的)。

7.3 任務排程

7.3.1 JDK 的任務排程 API

在 Java 5 之前,任務排程是建立在 java.util.Timer 類之上的,其使用了一個背景 Thread, 并且具有與标準線程相同的限制。随後,JDK 提供了 java.util.concurrent 包,它定義了 interface ScheduledExecutorService。表 7-1 展示了 java.util.concurrent.Executors 的相關工廠方法。

表7-1 java.util.concurrent.Executors類的工廠方法

<col>

方法

描述

newScheduledThreadPool(int corePoolSize)

建立一個 ScheduledThreadExecutorService, 用于排程指令在指定延遲之後運作或者周期性地執 行。它使用 corePoolSize 參數來計算線程數

newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

newSingleThreadScheduledExecutor()

建立一個 ScheduledThreadExecutorService, 用于排程指令在指定延遲之後運作或者周期性地執 行。它使用一個線程來執行被排程的任務

newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

雖然選擇不是很多,但是這些預置的實作已經足以應對大多數的用例。雖然 ScheduledExecutorService API 是直截了當的,但是在高負載下它将帶來性能上 的負擔。

7.3.2 使用 EventLoop 排程任務

ScheduledExecutorService 的實作具有局限性,例如,事實上作為線程池管理的一部 分,将會有額外的線程建立。如果有大量任務被緊湊地排程,那麼這将成為一個瓶頸。Netty 通 過 Channel 的 EventLoop 實作任務排程解決了這一問題,如代碼清單 7-3 所示。

代碼清單 7-3 使用 EventLoop 排程任務

Channel ch = ... ScheduledFuture&lt;?&gt; future = ch.eventLoop().schedule( new Runnable() { @Override public void run() { System.out.println("60 seconds later"); } }, 60, TimeUnit.SECONDS);

經過 60 秒之後,Runnable 執行個體将由配置設定給 Channel 的 EventLoop 執行。如果要排程任務以每隔 60 秒執行一次,請使用 scheduleAtFixedRate()方法。

7.4 實作細節

7.4.1 線程管理

Netty線程模型的卓越性能取決于對于目前執行的Thread的身份的确定 2,也就是說,确定它是支撐 EventLoop 的那一個線程。(回想一下EventLoop将負責處理一個Channel的整個生命周期内的所有事件。)

如果是,那麼所送出的代碼塊将會被(直接)執行。否則,EventLoop 将排程該任務放入到内部隊列中,以便稍後執行。當 EventLoop 處理事件時,它會執行隊列中的那些任務/事件。這也就解釋了任何的 Thread 是如何 與 Channel 直接互動而無需在 ChannelHandler 中進行額外同步的。

注意,每個 EventLoop 都有它自已的任務隊列,獨立于任何其他的 EventLoop。圖 7-3 展示了 EventLoop 用于排程任務的執行邏輯。這是 Netty 線程模型的關鍵組成部分。

“永遠不要将一個長時間運作的任務放入到執行隊列中,因為它将阻塞需要在同一線程上執行的任何其他任務。” 如果必須要進行阻塞調用或者執行長時間運作的任務,我們建議使用一個專門的 EventExecutor。(見 6.2.1 節的“ChannelHandler 的執行和阻塞”)。

7.4.2 EventLoop/線程的配置設定

服務于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中。根據不同的 傳輸實作,EventLoop 的建立和配置設定方式也不同。

1.異步傳輸

異步傳輸實作隻使用了少量的 EventLoop(以及和它們相關聯的 Thread),在異步傳輸模型中,它們可能會被多個 Channel 所共享。這使得可以通過盡可能少量的 Thread 來支撐大量的 Channel,而不是每個 Channel 配置設定一個 Thread。

圖 7-4 顯示了一個 EventLoopGroup,它具有 3 個固定大小的 EventLoop,每個 EventLoop

都由一個 Thread 支撐。在建立 EventLoopGroup 時就直接配置設定了 EventLoop,以及支撐它們 的 Thread,以確定在需要時它們是可用的。

EventLoopGroup 負責為每個新建立的 Channel 配置設定一個 EventLoop。在目前實作中, 使用順序循環(round-robin)的方式進行配置設定以擷取一個均衡的分布,并且相同的 EventLoop 可能會被配置設定給多個 Channel。(這一點在将來的版本中可能會改變。)

一旦一個 Channel 被配置設定給一個 EventLoop,它将在它的整個生命周期中都使用這個 EventLoop(以及相關聯的 Thread)。請牢記這一點,因為它可以使你從擔憂你的 ChannelHandler 實作中的線程安全和同步問題中解脫出來。

另外,需要注意的是,EventLoop 的配置設定方式對 ThreadLocal 使用的影響。因為一個 EventLoop 通常會被用于支撐多個 Channel,是以對于所有相關聯的 Channel 來說, ThreadLocal 都将是一樣的。這使得它對于實作狀态追蹤等功能來說是個糟糕的選擇。然而, 在一些無狀态的上下文中,它仍然可以被用于在多個 Channel 之間共享一些重度的或者代價昂貴的對象,甚至是事件。

2.阻塞傳輸

用于像 OIO(舊的阻塞 I/O)這樣的其他傳輸的設計略有不同。這裡每一個 Channel 都将被配置設定給一個 EventLoop,以及它的 Thread。如果你開發的 應用程式使用過 java.io 包中的阻塞 I/O 實作,你可能就遇到過這種模型,如圖 7-5 所示。

但是,正如同之前一樣,得到的保證是每個 Channel 的 I/O 事件都将隻會被一個 Thread 處理。這也是另一個 Netty 設計一緻性的例子,這種設計對 Netty 的可靠性和易用性做出了巨大貢獻。

第8章 引導

在深入地學習了 ChannelPipeline、ChannelHandler 和 EventLoop 之後,你接下來 的問題可能是:“如何将這些部分組織起來,成為一個可實際運作的應用程式呢?”

答案是?“引導”(Bootstrapping)。簡單來說,引導一個應用程式是指對它進行配置,并使它運 行起來的過程。

 引導用戶端和伺服器

 從Channel内引導用戶端

 添加ChannelHandler

 使用ChannelOption和屬性

8.1 Bootstrap 類

引導類的層次結構包括一個抽象的父類和兩個具體的引導子類,如圖 8-1 所示。

我們分别來看作用于伺服器和用戶端的引導的目标。

伺服器緻力于使用一個父 Channel 來接受 來自用戶端的連接配接,并建立子 Channel 以用于它們之間的通信;

而用戶端将最可能隻需要一個 單獨的、沒有父 Channel 的 Channel 來用于所有的網絡互動。(正如同我們将要看到的,這也 适用于無連接配接的傳輸協定,如 UDP,因為它們并不是每個連接配接都需要一個單獨的 Channel。)

兩種應用程式類型之間通用的引導步驟由 AbstractBootstrap 處理,而特定于用戶端或者伺服器的引導步驟則分别由 Bootstrap 或 ServerBootstrap 處理。

AbstractBootstrap 類的完整聲明是:

public abstract class AbstractBootstrap

, C extends Channel&gt;

其子類的聲明如下:

public class Bootstrap

extends AbstractBootstrap&lt;bootstrap, channel=""&gt;

public class ServerBootstrap

extends AbstractBootstrap&lt;serverbootstrap, serverchannel=""&gt;

在AbstractBootstrap簽名中,子類型 B 是其父類型的一個類型參數,是以可以傳回到運作時執行個體的引用以

支援方法的鍊式調用(也就是所謂的流式文法)。

為什麼引導類是 Cloneable 的

你有時可能會需要建立多個具有類似配置或者完全相同配置的Channel。為了支援這種模式而又不

需要為每個Channel都建立并配置一個新的引導類執行個體,AbstractBootstrap被标記為了 Cloneable1。在一個已經配置完成的引導類執行個體上調用clone()方法将傳回另一個可以立即使用的引 導類執行個體。

注意,這種方式隻會建立引導類執行個體的EventLoopGroup的一個淺拷貝,是以,被淺拷貝的EventLoopGroup 将在所有克 隆的Channel執行個體之間共享。這是可以接受的,因為通常這些克隆的Channel的生命周期都很短暫,一 個典型的場景是——建立一個Channel以進行一次HTTP請求。

8.2 引導用戶端和無連接配接協定

Bootstrap 類被用于用戶端或者使用了無連接配接協定的應用程式中。表 8-1 提供了該類的一個概覽,其中許多方法都繼承自 AbstractBootstrap 類。

8.2.1 引導用戶端

Bootstrap 類負責為用戶端和使用無連接配接協定的應用程式建立 Channel,如圖 8-2 所示。

代碼清單 8-1 中的代碼引導了一個使用 NIO TCP 傳輸的用戶端。

EventLoopGroup group = new NioEventLoopGroup(); // 建立一個Bootstrap類的執行個體 以建立和 連接配接新的 用戶端 Channe Bootstrap bootstrap = new Bootstrap(); // 設定 EventLoopGroup, 提供用于處理 Channel 事件的 EventLoop bootstrap.group(group) // 指定要使用的 Channel 實作 .channel(NioSocketChannel.class) // 設定用于 Channel 事件和數 據的 ChannelInboundHandler .handler(new SimpleChannelInboundHandler&lt;ByteBuf&gt;() { @Override protected void channeRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); // 連接配接到遠端 主機 ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Connection established"); } else { System.err.println("Connection attempt failed"); } } } );

這個示例使用了前面提到的流式文法;這些方法(除了 connect()方法以外)将通過每次 方法調用所傳回的對 Bootstrap 執行個體的引用連結在一起。

8.2.2 Channel 和 EventLoopGroup 的相容性

代碼清單 8-2 所示的目錄清單來自 io.netty.channel 包。你可以從包名以及與其相對應 的類名的字首看到,對于 NIO 以及 OIO 傳輸兩者來說,都有相關的 EventLoopGroup 和 Channel 實作。

必須保持這種相容性,不能混用具有不同字首的元件,如 NioEventLoopGroup 和 OioSocketChannel。否則将會導緻 IllegalStateException,因為它混用了不相容的傳輸。

Exception in thread "main" java.lang.IllegalStateException: incompatible event loop type: io.netty.channel.nio.NioEventLoop at io.netty.channel.AbstractChannel$AbstractUnsafe.register( AbstractChannel.java:571)

關于 IllegalStateException 的更多讨論

在引導的過程中,在調用 bind()或者 connect()方法之前,必須調用以下方法來設定所需的元件:

 group();

 channel()或者 channelFactory();

 handler()。

如果不這樣做,則将會導緻 IllegalStateException。對 handler()方法的調用尤其重要,因為它需要配置好 ChannelPipeline。

8.3 引導伺服器

我們将從 ServerBootstrap API 的概要視圖開始我們對伺服器引導過程的概述。然後, 我們将會探讨引導伺服器過程中所涉及的幾個步驟,以及幾個相關的主題,包含從一個 ServerChannel 的子 Channel 中引導一個用戶端這樣的特殊情況。

8.3.1 ServerBootstrap 類

表 8-2 列出了 ServerBootstrap 類的方法。

8.3.2 引導伺服器

你可能已經注意到了,表 8-2 中列出了一些在表 8-1 中不存在的方法:childHandler()、 childAttr()和 childOption()。這些調用支援特别用于伺服器應用程式的操作。具體來說, ServerChannel 的實作負責建立子 Channel,這些子 Channel 代表了已被接受的連接配接。

是以,負責引導 ServerChannel 的 ServerBootstrap 提供了這些方法,以簡化将設定應用到 已被接受的子 Channel 的 ChannelConfig 的任務。

圖 8-3 展示了 ServerBootstrap 在 bind()方法被調用時建立了一個 ServerChannel, 并且該 ServerChannel 管理了多個子 Channel。

代碼清單 8-4 中的代碼實作了圖 8-3 中所展示的伺服器的引導過程。

/** * 代碼清單 8-4 引導伺服器 建立 ServerBootstrap */ NioEventLoopGroup group = new NioEventLoopGroup(); // 建立 ServerBootstrap ServerBootstrap bootstrap = new ServerBootstrap(); // 設定 EventLoopGroup,其提供了用 于處理 Channel 事件的 EventLoop bootstrap.group(group) // 指定要使用的 Channel 實作 .channel(NioServerSocketChannel.class) // 設定用于處理已被接受 的子 Channel 的 I/O 及資料的 ChannelInbound- Handler .childHandler(new SimpleChannelInboundHandler&lt;ByteBuf&gt;() { @Override protected void channelRead0(ChannelHandlerContext ctx,ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); // 通過配置好的ServerBootstrap的執行個體綁定該Channel ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Server bound"); } else { System.err.println("Bound attempt failed"); channelFuture.cause().printStackTrace(); } } } );

8.4 從 Channel 引導用戶端

假設你的伺服器正在處理一個用戶端的請求,這個請求需要它充當第三方系統的用戶端。在這種情況下,将需要從已經被接受的子 Channel 中引導一個客戶 端 Channel。

你可以按照 8.2.1 節中所描述的方式建立新的 Bootstrap 執行個體,但是這并不是最高效的解 決方案,因為它将要求你為每個新建立的用戶端 Channel 定義另一個 EventLoop。這會産生 額外的線程,以及在已被接受的子 Channel 和用戶端 Channel 之間交換資料時不可避免的上 下文切換。

一個更好的解決方案是:通過将已被接受的子 Channel 的 EventLoop 傳遞給 Bootstrap 的 group()方法來共享該 EventLoop。因為配置設定給 EventLoop 的所有 Channel 都使用同一 個線程,是以這避免了額外的線程建立,以及前面所提到的相關的上下文切換。這個共享的解決 方案如圖 8-4 所示。

實作 EventLoop 共享涉及通過調用 group()方法來設定 EventLoop,如代碼清單 8-5 所示。

/** * 代碼清單 8-5 引導伺服器 */ // 建立 ServerBootstrap 以建立 ServerSocketChannel,并綁定它 ServerBootstrap bootstrap = new ServerBootstrap(); // 設定 EventLoopGroup,其将提供用 以處理 Channel 事件的 EventLoop bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) // 指定要使用的 Channel 實作 .channel(NioServerSocketChannel.class) // 設定用于處理已被接受的 子 Channel 的 I/O 和資料的 ChannelInboundHandler .childHandler(new SimpleChannelInboundHandler&lt;ByteBuf&gt;() { ChannelFuture connectFuture; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 建立一個 Bootstrap 類的執行個體以連接配接 到遠端主機 Bootstrap bootstrap = new Bootstrap(); // 指定 Channel 的實作 bootstrap.channel(NioSocketChannel.class).handler( // 為入站 I/O 設定 ChannelInboundHandler new SimpleChannelInboundHandler&lt;ByteBuf&gt;() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { System.out.println("Received data"); } }); // 使用與配置設定給 已被接受的子 Channel 相同的 EventLoop bootstrap.group(ctx.channel().eventLoop()); // 連接配接到 遠端節點 connectFuture = bootstrap.connect( new InetSocketAddress("www.manning.com", 80)); } @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { if (connectFuture.isDone()) { // 當連接配接完成時,執行一 些資料操作(如代理) // do something with the data } } }); // 通過配置好的ServerBootstrap 綁定該 Server- SocketChannel ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Server bound"); } else { System.err.println("Bind attempt failed"); channelFuture.cause().printStackTrace(); } } });

我們在這一節中所讨論的主題以及所提出的解決方案都反映了編寫 Netty 應用程式的一個一 般準則:盡可能地重用 EventLoop,以減少線程建立所帶來的開銷。

8.5 在引導過程中添加多個 ChannelHandler

在所有我們展示過的代碼示例中,我們都在引導的過程中調用了 handler()或者 child-Handler()方法來添加單個的 ChannelHandler。

這對于簡單的應用程式來說可能已經足夠了,但是它不能滿足更加複雜的需求。例如,一個必須要支援多種協定的應用程式将會有很多的ChannelHandler,而不會是一個龐大而又笨重的類。

正如你經常所看到的一樣,你可以根據需要,通過在 ChannelPipeline 中将它們連結在一起來 部署盡可能多的ChannelHandler。但是,如果在引導的過程中你隻能設定一個 ChannelHandler, 那麼你應該怎麼做到這一點呢?

正是針對于這個用例,Netty 提供了一個特殊的 ChannelInboundHandlerAdapter 子類:

public abstract class ChannelInitializer&lt;C extends Channel&gt; extends ChannelInboundHandlerAdapter

它定義了下面的方法:

protected abstract void initChannel(C ch) throws Exception;

這個方法提供了一種将多個 ChannelHandler 添加到一個 ChannelPipeline 中的簡便 方法。你隻需要簡單地向 Bootstrap 或 ServerBootstrap 的執行個體提供你的 Channel- Initializer 實作即可,并且一旦 Channel 被注冊到了它的 EventLoop 之後,就會調用你的 initChannel()版本。在該方法傳回之後,ChannelInitializer 的執行個體将會從 Channel- Pipeline 中移除它自己。

代 碼 清 單 8-6 定 義 了 ChannelInitializerImpl 類 , 并 通 過 ServerBootstrap 的 childHandler()方法注冊它 1。你可以看到,這個看似複雜的操作實際上是相當簡單直接的。

/** * 引導和使用 ChannelInitializer * @throws Exception */ public void start2() throws Exception { // 建立 ServerBootstrap 以創 建和綁定新的 Channel ServerBootstrap bootstrap = new ServerBootstrap(); // 設定 EventLoopGroup,其将提供用 以處理 Channel 事件的 EventLoop bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) // 指定 Channel 的 實作 .channel(NioServerSocketChannel.class) // 注冊一個 ChannelInitializerImpl 的 執行個體來設定 ChannelPipeline .childHandler(new ChannelInitializerImpl()); ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)); future.sync(); } // 用以設定 ChannelPipeline 的自 定義 ChannelInitializerImpl 實作 final class ChannelInitializerImpl extends ChannelInitializer&lt;Channel&gt; { // 将所需的 ChannelHandler 添加到 ChannelPipeline @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); } }

如果你的應用程式使用了多個 ChannelHandler,請定義你自己的 ChannelInitializer 實作來将它們安裝到 ChannelPipeline 中。

8.6 使用 Netty 的 ChannelOption 和屬性

在每個 Channel 建立時都手動配置它可能會變得相當乏味。幸運的是,你不必這樣做。相 反,你可以使用 option()方法來将 ChannelOption 應用到引導。你所提供的值将會被自動 應用到引導所建立的所有 Channel。可用的 ChannelOption 包括了底層連接配接的詳細資訊,如 keep-alive 或者逾時屬性以及緩沖區設定。

Netty 應用程式通常與組織的專有軟體內建在一起,而像 Channel 這樣的元件可能甚至會在 正常的 Netty 生命周期之外被使用。在某些常用的屬性和資料不可用時,Netty 提供了 AttributeMap 抽象(一個由 Channel 和引導類提供的集合)以及 AttributeKey(一 個用于插入和擷取屬性值的泛型類)。使用這些工具,便可以安全地将任何類型的資料項與客戶 端和伺服器 Channel(包含 ServerChannel 的子 Channel)相關聯了。

代碼清單 8-7 展示了可以如何使用 ChannelOption 來配置 Channel,以及如果使用屬性 來存儲整型值。

// 建立一個 AttributeKey 以辨別該屬性 final AttributeKey&lt;Integer&gt; id = new AttributeKey&lt;Integer&gt;("ID"); Bootstrap bootstrap = new Bootstrap(); // 設定 EventLoopGroup,其 提供了用以處理 Channel 事件的 EventLoop bootstrap.group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler( new SimpleChannelInboundHandler&lt;ByteBuf&gt;() { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // 使用 AttributeKey 檢索 屬性以及它的值 Integer idValue = ctx.channel().attr(id).get(); // do something with the idValue } @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); // 設定 ChannelOption, 其将在 connect()或者 bind()方法被調用時 被設定到已經建立的 Channel 上 bootstrap.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); // 存儲該 id 屬性 bootstrap.attr(id, 123456); ChannelFuture future = bootstrap.connect( new InetSocketAddress("www.manning.com", 80)); future.syncUninterruptibly();

8.7 引導 DatagramChannel

前面的引導代碼示例使用的都是基于 TCP 協定的 SocketChannel,但是 Bootstrap 類 也可以被用于無連接配接的協定。為此,Netty 提供了各種 DatagramChannel 的實作。唯一差別就 是,不再調用 connect()方法,而是隻調用 bind()方法,如代碼清單 8-8 所示。

Bootstrap bootstrap = new Bootstrap(); bootstrap.group(new OioEventLoopGroup()) // 指定Channel 的實作 .channel(OioDatagramChannel.class) .handler(new SimpleChannelInboundHandler&lt;DatagramPacket&gt;() { @Override public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { // Do something with the packet } } ); // 調用 bind()方 法,因為該協 議是無連接配接的 ChannelFuture future = bootstrap.bind(new InetSocketAddress(0)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Channel bound"); } else { System.err.println("Bind attempt failed"); channelFuture.cause().printStackTrace(); } } });

8.8 關閉

引導使你的應用程式啟動并且運作起來,但是遲早你都需要優雅地将它關閉。當然,你也可 以讓 JVM 在退出時處理好一切,但是這不符合優雅的定義,優雅是指幹淨地釋放資源。

最重要的是,你需要關閉 EventLoopGroup,它将處理任何挂起的事件和任務,并且随後 釋放所有活動的線程。這就是調用 EventLoopGroup.shutdownGracefully()方法的作用。 這個方法調用将會傳回一個 Future,這個 Future 将在關閉完成時接收到通知

需要注意的是, shutdownGracefully()方法也是一個異步的操作,是以你需要阻塞等待直到它完成,或者向 所傳回的 Future 注冊一個監聽器以在關閉完成時獲得通知。代碼清單 8-9 符合優雅關閉的定義。

/** * 代碼清單 8-9 優雅關閉 */ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class); ... // shutdownGracefully()方法将釋放 所有的資源,并且關閉所有的當 前正在使用中的 Channel Future&lt;?&gt; future = group.shutdownGracefully(); // block until the group has shutdown future.syncUninterruptibly();

或者,你也可以在調用 EventLoopGroup.shutdownGracefully()方法之前,顯式地 在所有活動的 Channel 上調用 Channel.close()方法。但是在任何情況下,都請記得關閉 EventLoopGroup 本身。