天天看點

【Netty學習總結】三. 核心元件

1. Channel

與NIO中的SocketChannel類似,Netty中的Channel也用于處理基本的I/O操作(連接配接,讀寫,綁定端口),但在Netty中對Channel進行了重新實作,主要實作類有:

NioSocketChannel,NioServerSocketChannel,NioDatagramChannel

通過channel類的API,可以擷取網絡的連接配接通道狀态,配置參數等

2. EventLoop和EventLoopGroup

  • EventLoop是Netty的核心抽象,用于處理連接配接的生命周期中發生的事件
  • 一個EventLoopGroup包含多個EventLoop,可以将EventLoop看作線程池的某一個線程
  • 每個EventLoop有自己的Selector,是以可能會有多個Channel被注冊到一個EventLoop上
  • 當Channel發生讀寫事件時,EventLoop可以進行相應地處理
EventLoop可通過group()方法擷取其所屬的EventLoopGroup

任務隊列:

  • EventLoop中有一任務隊列(和定時任務隊列),隊列中的任務會被另一線程異步執行,可通過execute或schedule方法向隊列中送出任務或定時任務

3. ChannelFuture與異步模型

  • Netty中所有I/O操作都是異步的,在執行I/O操作後會立即傳回一個ChannelFuture
  • 可以為ChannelFuture設定一個監聽器,當事件執行完畢後主動回調通知監聽者
ChannelFuture cf = bootstrap.connect("127.0.0.1",6666);
cf.addListener(new ChannelFutureListener(){
		@Override
		public void operationComplete(ChannelFuture channelFuture) throws Exception{
			if(channelFuture.isSuccess){
				sout("success!");
			}else{
				channelFuture.cause().printStackTrace();
			}
		}
}
           
  • 也可以使用sync()方法等待直到異步操作完成

4. ChannelHandler和ChannelPipeline

ChannelHandler和ChannelPipeline是Netty中最核心的元件,當Channel發生I/O事件需要被處理時,ChannelPipeline可以看作是對該事件進行處理的流水線,而ChannelHandler可以看作是流水線上的一道道工序。當各個工序都完成後,該事件也就被成功處理。

是以,當需要處理各種網絡事件時,隻需實作Channel的Handler,規定業務處理流程,而無需關心網絡程式設計的細節,進而大大簡化開發。

根據資料的流入流出方向,可以将ChannelHandler分為入站Handler和出站Handler:

  • 當進行入站事件處理時(如資料讀取),ChannelHandler的執行順序是從頭執行到尾。通過繼承ChannelInboundHandlerAdapter自定義入站Handler。
  • 當進行出站事件處理時(如資料發送),ChannelHandler的執行順序是從尾執行到頭。通過繼承ChannelOutboundHandlerAdapter自定義入站Handler。
常用方法(當有對應事件發生時,就會調用相關方法):
  • ChannelUnRegistered(),ChannelRegistered()
  • ChannelActive(),ChannelInActive()
  • channelRead(),channelReadComplete()
  • handlerAdded(),handlerRemoved()
  • exceptionCaught()

使用示例:

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 讀取資料方法
     * @param ctx 上下文對象,含有pipeline, channel, 位址
     * @param msg 用戶端發送的資料
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //将msg轉為ByteBuf
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("用戶端發送消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 讀取事件完畢
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将資料寫入到緩沖并重新整理,且對資料進行編碼
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, client~", CharsetUtil.UTF_8));
    }

    /**
     * 處理異常,一般需要關閉通道
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
           
注意事項
  1. 對于每一個Channel,都會為其重新生成ChannelPipeline和一組ChannelHandler,是以ChannelPipeline和ChannelHandler都是每個Channel私有的,不會發生線程安全問題
  2. ChannelInboundHandler和ChannelOutboundHandler是獨立運作的,不會互相幹擾,在所有ChannelInboundHandler執行完畢後才會去執行ChannelOutboundHandler。是以在向ChannelPipeline添加ChannelHandler時不用特意區分兩者的順序。

5. ChannelHandlerContext

ChannelHandlerContext可以看作是ChannelPipeline中的上下文資訊,其綁定了Pipeline和Channel資訊,且負責各ChannelHandler之間的互動

常用方法:

  • writeAndFlush():通過該執行個體寫入并沖刷消息并傳遞給下一個Handler進行處理(如果是Channel.wirte()則是從Pipeline中的第一Handler開始處理)
  • close():關閉該Channel