Netty進階
Netty快速入門
什麼是Netty
Netty 是一個基于 JAVA NIO 類庫的異步通信架構,它的架構特點是:異步非阻塞、基于事件驅動、高性能、高可靠性和高可定制性。
Netty應用場景
1.分布式開源架構中dubbo、Zookeeper,RocketMQ底層rpc通訊使用就是netty。
2.遊戲開發中,底層使用netty通訊。
為什麼選擇netty
在本小節,我們總結下為什麼不建議開發者直接使用JDK的NIO類庫進行開發的原因:
1) NIO的類庫和API繁雜,使用麻煩,你需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;
2) 需要具備其它的額外技能做鋪墊,例如熟悉Java多線程程式設計,因為NIO程式設計涉及到Reactor模式,你必須對多線程和網路程式設計非常熟悉,才能編寫出高品質的NIO程式;
3) 可靠性能力補齊,工作量和難度都非常大。例如用戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處 n理等等,NIO程式設計的特點是功能開發相對容易,但是可靠性能力補齊工作量和難度都非常大;
4) JDK NIO的BUG,例如臭名昭著的epoll bug,它會導緻Selector空輪詢,最終導緻CPU 100%。官方聲稱在JDK1.6版本的update18修複了該問題,但是直到JDK1.7版本該問題仍舊存在,隻不過該bug發生機率降低了一些而已,它并沒有被根本解決。該BUG以及與該BUG相關的問題單如下:
Netty伺服器端
class ServerHandler extends SimpleChannelHandler { /** * 通道關閉的時候觸發 */ @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelClosed"); } /** * 必須是連接配接已經建立,關閉通道的時候才會觸發. */ @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelDisconnected(ctx, e); System.out.println("channelDisconnected"); } /** * 捕獲異常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { super.exceptionCaught(ctx, e); System.out.println("exceptionCaught"); } /** * 接受消息 */ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { super.messageReceived(ctx, e); // System.out.println("messageReceived"); System.out.println("伺服器端收到用戶端消息:"+e.getMessage()); //回複内容 ctx.getChannel().write("好的"); } } // netty 伺服器端 public class NettyServer { public static void main(String[] args) { // 建立服務類對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 建立兩個線程池 分别為監聽監聽端口 ,nio監聽 ExecutorService boos = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); // 設定工程 并把兩個線程池加入中 serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos, worker)); // 設定管道工廠 serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); //将資料轉換為string類型. pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("serverHandler", new ServerHandler()); return pipeline; } }); // 綁定端口号 serverBootstrap.bind(new InetSocketAddress(9090)); System.out.println("netty server啟動...."); } } |
Netty用戶端
package com.itmayiedu; import java.net.InetSocketAddress; import java.util.Scanner; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; class ClientHandler extends SimpleChannelHandler { /** * 通道關閉的時候觸發 */ @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelClosed"); } /** * 必須是連接配接已經建立,關閉通道的時候才會觸發. */ @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelDisconnected(ctx, e); System.out.println("channelDisconnected"); } /** * 捕獲異常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { super.exceptionCaught(ctx, e); System.out.println("exceptionCaught"); } /** * 接受消息 */ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { super.messageReceived(ctx, e); // System.out.println("messageReceived"); System.out.println("伺服器端向用戶端回複内容:"+e.getMessage()); //回複内容 // ctx.getChannel().write("好的"); } } public class NettyClient { public static void main(String[] args) { System.out.println("netty client啟動..."); // 建立用戶端類 ClientBootstrap clientBootstrap = new ClientBootstrap(); // 線程池 ExecutorService boos = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos, worker)); clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); // 将資料轉換為string類型. pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("clientHandler", new ClientHandler()); return pipeline; } }); //連接配接服務端 ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 9090)); Channel channel = connect.getChannel(); System.out.println("client start"); Scanner scanner= new Scanner(System.in); while (true) { System.out.println("請輸輸入内容..."); channel.write(scanner.next()); } } } |
Maven坐标
<dependency> <groupId>io.netty</groupId> <artifactId>netty</artifactId> <version>3.3.0.Final</version> </dependency> |
Netty5.0用法
建立伺服器端
class ServerHandler extends ChannelHandlerAdapter { /** * 當通道被調用,執行該方法 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 接收資料 String value = (String) msg; System.out.println("Server msg:" + value); // 回複給用戶端 “您好!” String res = "好的..."; ctx.writeAndFlush(Unpooled.copiedBuffer(res.getBytes())); } } public class NettyServer { public static void main(String[] args) throws InterruptedException { System.out.println("伺服器端已經啟動...."); // 1.建立2個線程,一個負責接收用戶端連接配接, 一個負責進行 傳輸資料 NioEventLoopGroup pGroup = new NioEventLoopGroup(); NioEventLoopGroup cGroup = new NioEventLoopGroup(); // 2. 建立伺服器輔助類 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) // 3.設定緩沖區與發送區大小 .option(ChannelOption.SO_SNDBUF, 32 * 1024).option(ChannelOption.SO_RCVBUF, 32 * 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8080).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } } |
建立用戶端
class ClientHandler extends ChannelHandlerAdapter { /** * 當通道被調用,執行該方法 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 接收資料 String value = (String) msg; System.out.println("client msg:" + value); } } public class NettyClient { public static void main(String[] args) throws InterruptedException { System.out.println("用戶端已經啟動...."); // 建立負責接收用戶端連接配接 NioEventLoopGroup pGroup = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8080).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes())); // 等待用戶端端口号關閉 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); } } |
Maven坐标
<dependencies> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling</artifactId> <version>1.3.19.GA</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>1.3.18.GA</version> <scope>test</scope> </dependency> </dependencies> |
TCP粘包、拆包問題解決方案
什麼是粘包/拆包
一個完整的業務可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的資料包發送,這個就是TCP的拆包和封包問題。
下面可以看一張圖,是用戶端向服務端發送包:
1. 第一種情況,Data1和Data2都分開發送到了Server端,沒有産生粘包和拆包的情況。
2. 第二種情況,Data1和Data2資料粘在了一起,打成了一個大的包發送到Server端,這個情況就是粘包。
3. 第三種情況,Data2被分離成Data2_1和Data2_2,并且Data2_1在Data1之前到達了服務端,這種情況就産生了拆包。
由于網絡的複雜性,可能資料會被分離成N多個複雜的拆包/粘包的情況,是以在做TCP伺服器的時候就需要首先解決拆包/
解決辦法
消息定長,封包大小固定長度,不夠空格補全,發送和接收方遵循相同的約定,這樣即使粘包了通過接收方程式設計實作擷取定長封包也能區分。
sc.pipeline().addLast(new FixedLengthFrameDecoder(10)); |
包尾添加特殊分隔符,例如每條封包結束都添加回車換行符(例如FTP協定)或者指定特殊字元作為封包分隔符,接收方通過特殊分隔符切分封包區分。
ByteBuf buf = Unpooled.copiedBuffer("_mayi".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); |
将消息分為消息頭和消息體,消息頭中包含表示資訊的總長度(或者消息體長度)的字段
序列化協定與自定義序列化協定
序列化定義
序列化(serialization)就是将對象序列化為二進制形式(位元組數組),一般也将序列化稱為編碼(Encode),主要用于網絡傳輸、資料持久化等;
反序列化(deserialization)則是将從網絡、磁盤等讀取的位元組數組還原成原始對象,以便後續業務的進行,一般也将反序列化稱為解碼(Decode),主要用于網絡傳輸對象的解碼,以便完成遠端調用。
序列化協定“鼻祖”
我知道的第一種序列化協定就是Java預設提供的序列化機制,需要序列化的Java對象隻需要實作 Serializable / Externalizable 接口并生成序列化ID,這個類就能夠通過 ObjectInput 和 ObjectOutput 序列化和反序列化,若對Java預設的序列化協定不了解,或是遺忘了,請參考:序列化詳解
但是Java預設提供的序列化有很多問題,主要有以下幾個缺點:
無法跨語言:我認為這對于Java序列化的發展是緻命的“失誤”,因為Java序列化後的位元組數組,其它語言無法進行反序列化。;
序列化後的碼流太大::相對于目前主流的序列化協定,Java序列化後的碼流太大;
序列化的性能差:由于Java序列化采用同步阻塞IO,相對于目前主流的序列化協定,它的效率非常差。
影響序列化性能的關鍵因素
序列化後的碼流大小(網絡帶寬的占用);
序列化的性能(CPU資源占用);
是否支援跨語言(異構系統的對接和開發語言切換)。
幾種流行的序列化協定比較
XML
(1)定義:
XML(Extensible Markup Language)是一種常用的序列化和反序列化協定, 它曆史悠久,從1998年的1.0版本被廣泛使用至今。
(2)優點
人機可讀性好
可指定元素或特性的名稱
(3)缺點
序列化資料隻包含資料本身以及類的結構,不包括類型辨別和程式集資訊。
類必須有一個将由 XmlSerializer 序列化的預設構造函數。
隻能序列化公共屬性和字段
不能序列化方法
檔案龐大,檔案格式複雜,傳輸占帶寬
(4)使用場景
當做配置檔案存儲資料
實時資料轉換
JSON
(1)定義:
JSON(JavaScript Object Notation, JS 對象标記) 是一種輕量級的資料交換格式。它基于 ECMAScript (w3c制定的js規範)的一個子集, JSON采用與程式設計語言無關的文本格式,但是也使用了類C語言(包括C, C++, C#, Java, JavaScript, Perl, Python等)的習慣,簡潔和清晰的層次結構使得 JSON 成為理想的資料交換語言。
(2)優點
前後相容性高
資料格式比較簡單,易于讀寫
序列化後資料較小,可擴充性好,相容性好
與XML相比,其協定比較簡單,解析速度比較快
(3)缺點
資料的描述性比XML差
不适合性能要求為ms級别的情況
額外空間開銷比較大
(4)适用場景(可替代XML)
跨防火牆通路
可調式性要求高的情況
基于Web browser的Ajax請求
傳輸資料量相對小,實時性要求相對低(例如秒級别)的服務
Fastjson
(1)定義
Fastjson是一個Java語言編寫的高性能功能完善的JSON庫。它采用一種“假定有序快速比對”的算法,把JSON Parse的性能提升到極緻。
(2)優點
接口簡單易用
目前java語言中最快的json庫
(3)缺點
過于注重快,而偏離了“标準”及功能性
代碼品質不高,文檔不全
(4)适用場景
協定互動
Web輸出
Android用戶端
Thrift
(1)定義:
Thrift并不僅僅是序列化協定,而是一個RPC架構。它可以讓你選擇用戶端與服務端之間傳輸通信協定的類别,即文本(text)和二進制(binary)傳輸協定, 為節約帶寬,提供傳輸效率,一般情況下使用二進制類型的傳輸協定。
(2)優點
序列化後的體積小, 速度快
支援多種語言和豐富的資料類型
對于資料字段的增删具有較強的相容性
支援二進制壓縮編碼
(3)缺點
使用者較少
跨防火牆通路時,不安全
不具有可讀性,調試代碼時相對困難
不能與其他傳輸層協定共同使用(例如HTTP)
無法支援向持久層直接讀寫資料,即不适合做資料持久化序列化協定
(4)适用場景
分布式系統的RPC解決方案
Avro
(1)定義:
Avro屬于Apache Hadoop的一個子項目。 Avro提供兩種序列化格式:JSON格式或者Binary格式。Binary格式在空間開銷和解析性能方面可以和Protobuf媲美,Avro的産生解決了JSON的冗長和沒有IDL的問題
(2)優點
支援豐富的資料類型
簡單的動态語言結合功能
具有自我描述屬性
提高了資料解析速度
快速可壓縮的二進制資料形式
可以實作遠端過程調用RPC
支援跨程式設計語言實作
(3)缺點
對于習慣于靜态類型語言的使用者不直覺
(4)适用場景
在Hadoop中做Hive、Pig和MapReduce的持久化資料格式
Protobuf
(1)定義
protocol buffers 由谷歌開源而來,在谷歌内部久經考驗。它将資料結構以.proto檔案進行描述,通過代碼生成工具可以生成對應資料結構的POJO對象和Protobuf相關的方法和屬性。
(2)優點
序列化後碼流小,性能高
結構化資料存儲格式(XML JSON等)
通過辨別字段的順序,可以實作協定的前向相容
結構化的文檔更容易管理和維護
(3)缺點
需要依賴于工具生成代碼
支援的語言相對較少,官方隻支援Java 、C++ 、Python
(4)适用場景
對性能要求高的RPC調用
具有良好的跨防火牆的通路屬性
适合應用層對象的持久化
其它
protostuff 基于protobuf協定,但不需要配置proto檔案,直接導包即
Jboss marshaling 可以直接序列化java類, 無須實java.io.Serializable接口
Message pack 一個高效的二進制序列化格式
Hessian 采用二進制協定的輕量級remoting onhttp工具
kryo 基于protobuf協定,隻支援java語言,需要注冊(Registration),然後序列化(Output),反序列化(Input)
性能對比圖解
時間
空間
分析上圖知:
XML序列化(Xstream)無論在性能和簡潔性上比較差。
Thrift與Protobuf相比在時空開銷方面都有一定的劣勢。
Protobuf和Avro在兩方面表現都非常優越。
選型建議
不同的場景适用的序列化協定:
對于公司間的系統調用,如果性能要求在100ms以上的服務,基于XML的SOAP協定是一個值得考慮的方案。
基于Web browser的Ajax,以及Mobile app與服務端之間的通訊,JSON協定是首選。對于性能要求不太高,或者以動态類型語言為主,或者傳輸資料載荷很小的的運用場景,JSON也是非常不錯的選擇。
對于調試環境比較惡劣的場景,采用JSON或XML能夠極大的提高調試效率,降低系統開發成本。
當對性能和簡潔性有極高要求的場景,Protobuf,Thrift,Avro之間具有一定的競争關系。
對于T級别的資料的持久化應用場景,Protobuf和Avro是首要選擇。如果持久化後的資料存儲在Hadoop子項目裡,Avro會是更好的選擇。
由于Avro的設計理念偏向于動态類型語言,對于動态語言為主的應用場景,Avro是更好的選擇。
對于持久層非Hadoop項目,以靜态類型語言為主的應用場景,Protobuf會更符合靜态類型語言工程師的開發習慣。
如果需要提供一個完整的RPC解決方案,Thrift是一個好的選擇。
如果序列化之後需要支援不同的傳輸層協定,或者需要跨防火牆通路的高性能場景,Protobuf可以優先考慮。
Marshalling編碼器
public final class MarshallingCodeCFactory { /** * 建立Jboss Marshalling解碼器MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 建立Jboss Marshalling編碼器MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } } |