Netty架構之概述及基本元件介紹
Reactor網絡程式設計模型解析
前言
在上篇部落格介紹完netty架構的基本元件介紹和概述,也跟着代碼看了下NioEventLoopGroup的啟動過程,以及基于Reactor線程模型的解析,它是開發Netty的核心思想,也是整個Netty架構的核心思想;這篇文章分析Netty中責任鍊模式,該模式給netty架構提供了大量一些擴充,使得netty架構更适合在業務場景上使用。
設計模式 - 責任鍊模式
Netty中的責任鍊模式也就是來自設計模式中責任鍊模式。
責任鍊模式( Chain of Responsibility Pattern)為請求建立了一個處理對象的鍊,每一個鍊條處理一件事情; 發起請求和具體處理請求的過程進行解耦: 職責鍊上的處理者負責處理請求,客戶隻需要将 請求發送到職責鍊上即可,無須關心請求的處理細節和請求的傳遞。 每一個環節都會做一個處理,到達最後得到最後結果。 中間點中每個點都解耦,并不依賴
類似于上面圖的情況,每個部分做的事情不同,并不感染,感覺就像是一個個元件進行組裝
責任鍊的好處
- 降低了對象之間的耦合度。該模式使得一個對象無須知道到底是哪一個對象處理其請求以及鍊的結構,發送者和接收者也無須擁有對方的明确資訊。
- 增強了系統的可擴充性。可以根據需要增加新的請求處理類,滿足開閉原則。
- 增強了給對象指派職責的靈活性。當工作流程發生變化,可以動态地改變鍊内的成員或者調動它們的次序,也可動态地新增或者删除責任。
- 責任鍊簡化了對象之間的連接配接。每個對象隻需保持一個指向其後繼者的引用,不需保持其他所有處理者的引用,這避免了使用衆多的 if 或者 if···else 語句。
- 責任分擔。每個類隻需要處理自己該處理的工作,不該處理的傳遞給下一個對象完成,明确各類的責任範圍,符合類的單一職責原則。
缺點
- 不能保證每個請求都被處理,有可能這個處理并不會到末端就抛出掉了某個請求
- 對比較長的職責鍊,對象是非常多的,有可能會出現占用資源過大的問題
- 職責鍊建立的合理性要靠用戶端來保證,是以也可能出現寫出錯的情況
在netty中主要就是利用責任鍊模式,進行業務上處理,達到netty的耦合降低,擴充性好的特點。
實作責任鍊模式
一定有四個要素:
- 處理抽象類 抽象出方法用于鍊條的傳遞 next 方法
- 具體的處理器實作類 用于處理鍊條之間的資料
- 儲存處理器資訊 無論是數組實作 還是鍊條實作 都需要一個容器來存儲這些實作類然後讓鍊條連接配接起來
- 處理執行 開始執行的方法 執行器
代碼實作
- 抽象處理類以及 處理抽象類 Pipeline 來進行管理并持有 抽象處理類
/**
* 主要持有類
*
* @author
*
*/
public class Pipeline {
/**
* handler上下文,主要負責維護鍊 ,和鍊的執行
*/
class HandlerChainContext {
HandlerChainContext next;// 持有下一個節點
AbstractHandler handler;
public HandlerChainContext(AbstractHandler handler) {
this.handler = handler;
}
// 将節點持有下去
void handler(Object arg0) {
this.handler.doHandler(this, arg0);
}
/**
* 繼續執行下一個
*/
void runNext(Object arg0) {
if (this.next != null) {
this.next.handler(arg0);
}
}
}
/**
* 抽象處理類 用于處理鍊條之間的資料
*
* @author
*
*/
static abstract class AbstractHandler {
abstract void doHandler(HandlerChainContext context, Object arg0);
}
}
- 對Pipeline 進行完善
// 持有上下文
public HandlerChainContext context = new HandlerChainContext(new AbstractHandler() {
@Override
void doHandler(HandlerChainContext context, Object arg0) {
System.out.println("這是頭部");
context.runNext(arg0);
}
});
- 增加責任鍊的方法及調用方法
// 添加責任鍊
public void addLast(AbstractHandler handler) {
HandlerChainContext next = context;
while (next.next != null) {
next = next.next;
}
next.next = new HandlerChainContext(handler);
}
// 開始調用
public void requestProcess(Object arg0) {
context.handler(arg0);
}
- 最後建立一個具體實作處理類 和測試代碼
static class Handler1 extends AbstractHandler {
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
arg0 = arg0.toString() + "..handler1的的處理.....";
System.out.println("我是Handler1的執行個體,我在處理:" + arg0);
// 繼續執行下一個
handlerChainContext.runNext(arg0);
}
}
public static void main(String[] args) {
Pipeline p = new Pipeline();
p.addLast(new Handler1());
p.requestProcess("開始了....");
}
得到下面的列印結果
這是頭部
我是Handler1的執行個體,我在處理:開始了........這是頭部......handler1的的處理.....
完整代碼
/**
* 主要持有類
*
* @author
*
*/
public class Pipeline {
// 持有上下文
public HandlerChainContext context = new HandlerChainContext(new AbstractHandler() {
@Override
void doHandler(HandlerChainContext context, Object arg0) {
System.out.println("這是頭部");
context.runNext(arg0 + "....這是頭部....");
}
});
// 添加責任鍊
public void addLast(AbstractHandler handler) {
HandlerChainContext next = context;
while (next.next != null) {
next = next.next;
}
next.next = new HandlerChainContext(handler);
}
// 開始調用
public void requestProcess(Object arg0) {
context.handler(arg0);
}
/**
* handler上下文,主要負責維護鍊 ,和鍊的執行
*/
class HandlerChainContext {
HandlerChainContext next;// 持有下一個節點
AbstractHandler handler;
public HandlerChainContext(AbstractHandler handler) {
this.handler = handler;
}
// 将節點持有下去
void handler(Object arg0) {
this.handler.doHandler(this, arg0);
}
/**
* 繼續執行下一個
*/
void runNext(Object arg0) {
if (this.next != null) {
this.next.handler(arg0);
}
}
}
/**
* 具體的處理器實作類 用于處理鍊條之間的資料
*
* @author
*
*/
static abstract class AbstractHandler {
abstract void doHandler(HandlerChainContext context, Object arg0);
}
static class Handler1 extends AbstractHandler {
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
arg0 = arg0.toString() + "..handler1的的處理.....";
System.out.println("我是Handler1的執行個體,我在處理:" + arg0);
// 繼續執行下一個
handlerChainContext.runNext(arg0);
}
}
public static void main(String[] args) {
Pipeline p = new Pipeline();
p.addLast(new Handler1());
p.requestProcess("開始了....");
}
}
Netty中的ChannelPipeline責任鍊
在netty中實作責任鍊的就是Pipeline管道
Pipeline 管道 儲存了通道所有處理器資訊。 建立新channel 時自動建立一個專有的 pipeline 。 入站 事件 和出站 操作 會調用 pipeline 上的處理器
在源碼中怎麼為每個channel 建立一個pipeline的
- NioEventLoop的run是在初始化時就會啟動run方法,然後監聽是否有事件過來
- 走到 processSelectedKey 方法中的 事件讀取 niomessageunsafe中
- doReadMessages 方法中 建立 一個NioSocketChannel
- 在抽象類AbstractChannel中持有者pipeline
- 會在初始化時就建立 這個pipeline
pipeline中如何将功能代碼和業務代碼進行分離開。如何解決下面的問題 會從源碼中找到對應的代碼
- Pipeline管道中入站事件 ,以及出站事件
- handler處理器是什麼,有什麼作用?
- Pipeline 如何維護 handler 的。
- handler 的執行。
Pipeline
入站事件:
通常指I/O線程生成了入站資料。(通俗了解: 從socket底層自己往上冒上來的事件都是入站)比如EventLoop收到selector的OP_READ事件,入站處理器調用socketChannel.read(ByteBuffer) 接收到資料後,這将導緻通道的ChannelPipeline中包含的下一個中的channelRead方法被調用。
native層往應用寫資料 包括各個事件就是入站事件。
出站事件: 經常是指I/O線程執行實際的輸出操作。 (通俗了解:想主動往socket底層操作的事件的都是出站) 比如bind方法用意是請求server socket綁定到給定的SocketAddress,這将導緻通道的 ChannelPipeline中包含的下一個出站處理器中的 bind 方法被調用。
應用層往native層寫資料 連接配接關閉 寫入資料等等都是出站事件
- 在DefaultChannelPipeline中就有頭和尾 的Context 類似責任鍊中 持有的 具體的處理器實作類
- 初始化時 将連接配接起來 頭和尾連接配接起來
- 當消息過來時就會走AbstractNioByteChannel 中read 方法做包括申請多少記憶體 以及 pipeline.fireChannelRead(byteBuf); 去讀取資料内容 這裡的ByteBuf 是封裝過了ByteBuffer的一個類。
- 就是fire 開頭的都是觸發入站的事件
- 而對于出站事件則是 包括wite read bind等方法來表示的
- 源碼中對于入站和出戰事件,完全按照 入站中頭開始,出戰按尾部開始
- 繼續走下去找到讀取的方法
- 調用到下一個channelpipline方法
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
- 在pipeline 中 add 和remove則是維護連結清單上的資料
- 對于context中也存在許多的讀取方法等等
AbstractChannelHandlerContext
作為有 pipeline 以及context 事件的持有者,并在實作的子類中持有對應的handler 利用handler方法,而context就是為了維護鍊的結構而産生的
是以要對事件進行做處理,是以需要實作 ChannelInboundHandlerAdapter 或者 對應的事件即可
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("收到用戶端資料,還給用戶端:" + msg);
ctx.write(msg);
// ctx.channel().write("");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
在使用時,添加進去就行
ServerBootstrap b = new ServerBootstrap();
// 3、配置啟動器
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelDuplexHandler
這個Handler 是繼承ChannelInboundHandlerAdapter 并且實作了ChannelOutboundHandler的既能入站 也可以出站
在源碼中有很多這種handler 實作 提供給我們可以我們對入站事件和出站事件進行處理。
Netty中事件的定義
Pipeline中的handler
ChannelHandler : 用于處理 I/O 事件或攔截 I/O 操作,并轉發到 ChannelPipeline 中的下一個處理器。 這個頂級接口定義功能很弱,實際使用時會去實作下面兩大子接口: 處理入站 I/O 事件的 ChannelInboundHandler 、處理出站 I/O 操作的 ChannelOutboundHandler
擴充卡類: 為了開發友善,避免所有 handler 去實作一遍接口方法, Netty 提供了簡單的實作類: ChannelInboundHandlerAdapter 處理入站 I/O 事件 ChannelOutboundHandlerAdapter 來處理出站 I/O 操作 ChannelDuplexHandler 來支援同時處理入站和出站事件 ChannelHandlerContext : 實際存儲在 Pipeline 中的對象并非 ChannelHandler ,而是上下文對象。 将handler ,包裹在上下文對象中,通過上下文對象與它所屬的 ChannelPipeline 互動,向上或向下傳遞事件 或者修改 pipeline 都是通過上下文對象。
Pipeline中的handler
ChannelPipeline是線程安全的,ChannelHandler可以在任何時候添加或删除。
例如,你可以在即将交換敏感資訊時插入加密處理程式,并在交換後删除它。 一般操作,初始化的時候增加進去,較少删除。下面是 Pipeline 中管理 handler 的 API
Handler的執行分析
當入站事件時,執行順序是 1 、 2 、 3 、 4 、 5 當出站事件時,執行順序是 5 、 4 、 3 、 2 、 1 在這一原則之上, ChannelPipeline 在執行時會進行選擇 3 和 4 為出站處理器,是以入站事件的實際執行是 :1 、 2 、 5 1 和 2 為入站處理器,是以出站事件的實際執行是 :5 、 4 、 3 不同的入站事件會觸發 handler 不同的方法執行: 上下文對象中 fire** 開頭的方法,代表入站事件傳播和處理 其餘的方法代表出站事件的傳播和處理。
結構架構圖
構成一個channel
構成整體的圖
總結
整篇文章從責任鍊模式的一種實作方式開始,從源碼去深入了解netty怎麼使用責任鍊模式來擴充,以及使用者在管道中有一個或多個channelhandler來接收I/O事件(例如讀取)和請求I/O操作(例如寫入和關閉)。 一個典型的伺服器在每個通道的管道中都有以下處理程式,但是根據協定和業務邏輯的複雜性和特 征,可能會有所不同: 協定解碼器——将二進制資料(例如ByteBuf)轉換為Java對象。 協定編碼器——将Java對象轉換為二進制資料。 業務邏輯處理程式——執行實際的業務邏輯(例如資料庫通路)。