天天看點

Netty進階

Netty進階

Netty快速入門

什麼是Netty

 Netty 是一個基于 JAVA NIO 類庫的異步通信架構,它的架構特點是:異步非阻塞、基于事件驅動、高性能、高可靠性和高可定制性。

Netty應用場景

1.分布式開源架構中dubbo、Zookeeper,RocketMQ底層rpc通訊使用就是netty。

2.遊戲開發中,底層使用netty通訊。

為什麼選擇netty

在本小節,我們總結下為什麼不建議開發者直接使用JDK的NIO類庫進行開發的原因:

1)      NIO的類庫和API繁雜,使用麻煩,你需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;

2)      需要具備其它的額外技能做鋪墊,例如熟悉Java多線程程式設計,因為NIO程式設計涉及到Reactor模式,你必須對多線程和網路程式設計非常熟悉,才能編寫出高品質的NIO程式;

3)      可靠性能力補齊,工作量和難度都非常大。例如用戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處 n理等等,NIO程式設計的特點是功能開發相對容易,但是可靠性能力補齊工作量和難度都非常大;

4)      JDK NIO的BUG,例如臭名昭著的epoll bug,它會導緻Selector空輪詢,最終導緻CPU 100%。官方聲稱在JDK1.6版本的update18修複了該問題,但是直到JDK1.7版本該問題仍舊存在,隻不過該bug發生機率降低了一些而已,它并沒有被根本解決。該BUG以及與該BUG相關的問題單如下:

Netty伺服器端

class ServerHandler extends SimpleChannelHandler {

/**

 * 通道關閉的時候觸發

 */

@Override

public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

System.out.println("channelClosed");

}

/**

 * 必須是連接配接已經建立,關閉通道的時候才會觸發.

 */

@Override

public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

super.channelDisconnected(ctx, e);

System.out.println("channelDisconnected");

}

/**

 * 捕獲異常

 */

@Override

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {

super.exceptionCaught(ctx, e);

System.out.println("exceptionCaught");

}

/**

 * 接受消息

 */

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

super.messageReceived(ctx, e);

// System.out.println("messageReceived");

System.out.println("伺服器端收到用戶端消息:"+e.getMessage());

//回複内容

ctx.getChannel().write("好的");

}

}

// netty 伺服器端

public class NettyServer {

public static void main(String[] args) {

// 建立服務類對象

ServerBootstrap serverBootstrap = new ServerBootstrap();

// 建立兩個線程池 分别為監聽監聽端口 ,nio監聽

ExecutorService boos = Executors.newCachedThreadPool();

ExecutorService worker = Executors.newCachedThreadPool();

// 設定工程 并把兩個線程池加入中

serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos, worker));

// 設定管道工廠

serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

public ChannelPipeline getPipeline() throws Exception {

ChannelPipeline pipeline = Channels.pipeline();

//将資料轉換為string類型.

pipeline.addLast("decoder", new StringDecoder());

pipeline.addLast("encoder", new StringEncoder());

pipeline.addLast("serverHandler", new ServerHandler());

return pipeline;

}

});

// 綁定端口号

serverBootstrap.bind(new InetSocketAddress(9090));

System.out.println("netty server啟動....");

}

}

Netty用戶端

package com.itmayiedu;

import java.net.InetSocketAddress;

import java.util.Scanner;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;

import org.jboss.netty.channel.Channel;

import org.jboss.netty.channel.ChannelFuture;

import org.jboss.netty.channel.ChannelHandlerContext;

import org.jboss.netty.channel.ChannelPipeline;

import org.jboss.netty.channel.ChannelPipelineFactory;

import org.jboss.netty.channel.ChannelStateEvent;

import org.jboss.netty.channel.Channels;

import org.jboss.netty.channel.ExceptionEvent;

import org.jboss.netty.channel.MessageEvent;

import org.jboss.netty.channel.SimpleChannelHandler;

import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

import org.jboss.netty.handler.codec.string.StringDecoder;

import org.jboss.netty.handler.codec.string.StringEncoder;

class ClientHandler extends SimpleChannelHandler {

/**

 * 通道關閉的時候觸發

 */

@Override

public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

System.out.println("channelClosed");

}

/**

 * 必須是連接配接已經建立,關閉通道的時候才會觸發.

 */

@Override

public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

super.channelDisconnected(ctx, e);

System.out.println("channelDisconnected");

}

/**

 * 捕獲異常

 */

@Override

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {

super.exceptionCaught(ctx, e);

System.out.println("exceptionCaught");

}

/**

 * 接受消息

 */

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

super.messageReceived(ctx, e);

// System.out.println("messageReceived");

System.out.println("伺服器端向用戶端回複内容:"+e.getMessage());

//回複内容

// ctx.getChannel().write("好的");

}

}

public class NettyClient {

public static void main(String[] args) {

System.out.println("netty client啟動...");

// 建立用戶端類

ClientBootstrap clientBootstrap = new ClientBootstrap();

// 線程池

ExecutorService boos = Executors.newCachedThreadPool();

ExecutorService worker = Executors.newCachedThreadPool();

clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos, worker));

clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

public ChannelPipeline getPipeline() throws Exception {

ChannelPipeline pipeline = Channels.pipeline();

// 将資料轉換為string類型.

pipeline.addLast("decoder", new StringDecoder());

pipeline.addLast("encoder", new StringEncoder());

pipeline.addLast("clientHandler", new ClientHandler());

return pipeline;

}

});

//連接配接服務端

ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));

Channel channel = connect.getChannel();

System.out.println("client start");

Scanner scanner= new Scanner(System.in);

while (true) {

System.out.println("請輸輸入内容...");

channel.write(scanner.next());

}

}

}

Maven坐标

<dependency>

<groupId>io.netty</groupId>

<artifactId>netty</artifactId>

<version>3.3.0.Final</version>

</dependency>

Netty5.0用法

建立伺服器端

class ServerHandler extends ChannelHandlerAdapter {

/**

 * 當通道被調用,執行該方法

 */

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

// 接收資料

String value = (String) msg;

System.out.println("Server msg:" + value);

// 回複給用戶端 “您好!”

String res = "好的...";

ctx.writeAndFlush(Unpooled.copiedBuffer(res.getBytes()));

}

}

public class NettyServer {

public static void main(String[] args) throws InterruptedException {

System.out.println("伺服器端已經啟動....");

// 1.建立2個線程,一個負責接收用戶端連接配接, 一個負責進行 傳輸資料

NioEventLoopGroup pGroup = new NioEventLoopGroup();

NioEventLoopGroup cGroup = new NioEventLoopGroup();

// 2. 建立伺服器輔助類

ServerBootstrap b = new ServerBootstrap();

b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)

// 3.設定緩沖區與發送區大小

.option(ChannelOption.SO_SNDBUF, 32 * 1024).option(ChannelOption.SO_RCVBUF, 32 * 1024)

.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel sc) throws Exception {

sc.pipeline().addLast(new StringDecoder());

sc.pipeline().addLast(new ServerHandler());

}

});

ChannelFuture cf = b.bind(8080).sync();

cf.channel().closeFuture().sync();

pGroup.shutdownGracefully();

cGroup.shutdownGracefully();

}

}

建立用戶端

class ClientHandler extends ChannelHandlerAdapter {

/**

 * 當通道被調用,執行該方法

 */

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

// 接收資料

String value = (String) msg;

System.out.println("client msg:" + value);

}

}

public class NettyClient {

public static void main(String[] args) throws InterruptedException {

System.out.println("用戶端已經啟動....");

// 建立負責接收用戶端連接配接

NioEventLoopGroup pGroup = new NioEventLoopGroup();

Bootstrap b = new Bootstrap();

b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel sc) throws Exception {

sc.pipeline().addLast(new StringDecoder());

sc.pipeline().addLast(new ClientHandler());

}

});

ChannelFuture cf = b.connect("127.0.0.1", 8080).sync();

 cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes()));

 cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes()));

// 等待用戶端端口号關閉

cf.channel().closeFuture().sync();

pGroup.shutdownGracefully();

}

}

Maven坐标

<dependencies>

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->

<dependency>

<groupId>io.netty</groupId>

<artifactId>netty-all</artifactId>

<version>5.0.0.Alpha2</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->

<dependency>

<groupId>org.jboss.marshalling</groupId>

<artifactId>jboss-marshalling</artifactId>

<version>1.3.19.GA</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->

<dependency>

<groupId>org.jboss.marshalling</groupId>

<artifactId>jboss-marshalling-serial</artifactId>

<version>1.3.18.GA</version>

<scope>test</scope>

</dependency>

</dependencies>

TCP粘包、拆包問題解決方案

什麼是粘包/拆包

   一個完整的業務可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的資料包發送,這個就是TCP的拆包和封包問題。

下面可以看一張圖,是用戶端向服務端發送包:

Netty進階

1. 第一種情況,Data1和Data2都分開發送到了Server端,沒有産生粘包和拆包的情況。

2. 第二種情況,Data1和Data2資料粘在了一起,打成了一個大的包發送到Server端,這個情況就是粘包。

3. 第三種情況,Data2被分離成Data2_1和Data2_2,并且Data2_1在Data1之前到達了服務端,這種情況就産生了拆包。

由于網絡的複雜性,可能資料會被分離成N多個複雜的拆包/粘包的情況,是以在做TCP伺服器的時候就需要首先解決拆包/

解決辦法

     消息定長,封包大小固定長度,不夠空格補全,發送和接收方遵循相同的約定,這樣即使粘包了通過接收方程式設計實作擷取定長封包也能區分。

sc.pipeline().addLast(new FixedLengthFrameDecoder(10));

包尾添加特殊分隔符,例如每條封包結束都添加回車換行符(例如FTP協定)或者指定特殊字元作為封包分隔符,接收方通過特殊分隔符切分封包區分。

ByteBuf buf = Unpooled.copiedBuffer("_mayi".getBytes());

sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));

将消息分為消息頭和消息體,消息頭中包含表示資訊的總長度(或者消息體長度)的字段

序列化協定與自定義序列化協定

序列化定義

序列化(serialization)就是将對象序列化為二進制形式(位元組數組),一般也将序列化稱為編碼(Encode),主要用于網絡傳輸、資料持久化等;

反序列化(deserialization)則是将從網絡、磁盤等讀取的位元組數組還原成原始對象,以便後續業務的進行,一般也将反序列化稱為解碼(Decode),主要用于網絡傳輸對象的解碼,以便完成遠端調用。

序列化協定“鼻祖”

我知道的第一種序列化協定就是Java預設提供的序列化機制,需要序列化的Java對象隻需要實作 Serializable / Externalizable 接口并生成序列化ID,這個類就能夠通過 ObjectInput 和 ObjectOutput 序列化和反序列化,若對Java預設的序列化協定不了解,或是遺忘了,請參考:​​序列化詳解​​

但是Java預設提供的序列化有很多問題,主要有以下幾個缺點:

無法跨語言:我認為這對于Java序列化的發展是緻命的“失誤”,因為Java序列化後的位元組數組,其它語言無法進行反序列化。;

序列化後的碼流太大::相對于目前主流的序列化協定,Java序列化後的碼流太大;

序列化的性能差:由于Java序列化采用同步阻塞IO,相對于目前主流的序列化協定,它的效率非常差。

影響序列化性能的關鍵因素

序列化後的碼流大小(網絡帶寬的占用);

序列化的性能(CPU資源占用);

是否支援跨語言(異構系統的對接和開發語言切換)。

幾種流行的序列化協定比較

XML

(1)定義:

XML(Extensible Markup Language)是一種常用的序列化和反序列化協定, 它曆史悠久,從1998年的1.0版本被廣泛使用至今。

(2)優點

人機可讀性好

可指定元素或特性的名稱

(3)缺點

序列化資料隻包含資料本身以及類的結構,不包括類型辨別和程式集資訊。

類必須有一個将由 XmlSerializer 序列化的預設構造函數。

隻能序列化公共屬性和字段

不能序列化方法

檔案龐大,檔案格式複雜,傳輸占帶寬

(4)使用場景

當做配置檔案存儲資料

實時資料轉換

JSON

(1)定義:

JSON(JavaScript Object Notation, JS 對象标記) 是一種輕量級的資料交換格式。它基于 ECMAScript (w3c制定的js規範)的一個子集, JSON采用與程式設計語言無關的文本格式,但是也使用了類C語言(包括C, C++, C#, Java, JavaScript, Perl, Python等)的習慣,簡潔和清晰的層次結構使得 JSON 成為理想的資料交換語言。

(2)優點

前後相容性高

資料格式比較簡單,易于讀寫

序列化後資料較小,可擴充性好,相容性好

與XML相比,其協定比較簡單,解析速度比較快

(3)缺點

資料的描述性比XML差

不适合性能要求為ms級别的情況

額外空間開銷比較大

(4)适用場景(可替代XML)

跨防火牆通路

可調式性要求高的情況

基于Web browser的Ajax請求

傳輸資料量相對小,實時性要求相對低(例如秒級别)的服務

Fastjson

(1)定義

Fastjson是一個Java語言編寫的高性能功能完善的JSON庫。它采用一種“假定有序快速比對”的算法,把JSON Parse的性能提升到極緻。

(2)優點

接口簡單易用

目前java語言中最快的json庫

(3)缺點

過于注重快,而偏離了“标準”及功能性

代碼品質不高,文檔不全

(4)适用場景

協定互動

Web輸出

Android用戶端

Thrift

(1)定義:

Thrift并不僅僅是序列化協定,而是一個RPC架構。它可以讓你選擇用戶端與服務端之間傳輸通信協定的類别,即文本(text)和二進制(binary)傳輸協定, 為節約帶寬,提供傳輸效率,一般情況下使用二進制類型的傳輸協定。

(2)優點

序列化後的體積小, 速度快

支援多種語言和豐富的資料類型

對于資料字段的增删具有較強的相容性

支援二進制壓縮編碼

(3)缺點

使用者較少

跨防火牆通路時,不安全

不具有可讀性,調試代碼時相對困難

不能與其他傳輸層協定共同使用(例如HTTP)

無法支援向持久層直接讀寫資料,即不适合做資料持久化序列化協定

(4)适用場景

分布式系統的RPC解決方案

Avro

(1)定義:

Avro屬于Apache Hadoop的一個子項目。 Avro提供兩種序列化格式:JSON格式或者Binary格式。Binary格式在空間開銷和解析性能方面可以和Protobuf媲美,Avro的産生解決了JSON的冗長和沒有IDL的問題

(2)優點

支援豐富的資料類型

簡單的動态語言結合功能

具有自我描述屬性

提高了資料解析速度

快速可壓縮的二進制資料形式

可以實作遠端過程調用RPC

支援跨程式設計語言實作

(3)缺點

對于習慣于靜态類型語言的使用者不直覺

(4)适用場景

在Hadoop中做Hive、Pig和MapReduce的持久化資料格式

Protobuf

(1)定義

protocol buffers 由谷歌開源而來,在谷歌内部久經考驗。它将資料結構以.proto檔案進行描述,通過代碼生成工具可以生成對應資料結構的POJO對象和Protobuf相關的方法和屬性。

(2)優點

序列化後碼流小,性能高

結構化資料存儲格式(XML JSON等)

通過辨別字段的順序,可以實作協定的前向相容

結構化的文檔更容易管理和維護

(3)缺點

需要依賴于工具生成代碼

支援的語言相對較少,官方隻支援Java 、C++ 、Python

(4)适用場景

對性能要求高的RPC調用

具有良好的跨防火牆的通路屬性

适合應用層對象的持久化

其它

protostuff 基于protobuf協定,但不需要配置proto檔案,直接導包即

Jboss marshaling 可以直接序列化java類, 無須實java.io.Serializable接口

Message pack 一個高效的二進制序列化格式

Hessian 采用二進制協定的輕量級remoting onhttp工具

kryo 基于protobuf協定,隻支援java語言,需要注冊(Registration),然後序列化(Output),反序列化(Input)

性能對比圖解

時間

Netty進階

空間

Netty進階

分析上圖知:

XML序列化(Xstream)無論在性能和簡潔性上比較差。

Thrift與Protobuf相比在時空開銷方面都有一定的劣勢。

Protobuf和Avro在兩方面表現都非常優越。

選型建議

不同的場景适用的序列化協定:

對于公司間的系統調用,如果性能要求在100ms以上的服務,基于XML的SOAP協定是一個值得考慮的方案。

基于Web browser的Ajax,以及Mobile app與服務端之間的通訊,JSON協定是首選。對于性能要求不太高,或者以動态類型語言為主,或者傳輸資料載荷很小的的運用場景,JSON也是非常不錯的選擇。

對于調試環境比較惡劣的場景,采用JSON或XML能夠極大的提高調試效率,降低系統開發成本。

當對性能和簡潔性有極高要求的場景,Protobuf,Thrift,Avro之間具有一定的競争關系。

對于T級别的資料的持久化應用場景,Protobuf和Avro是首要選擇。如果持久化後的資料存儲在Hadoop子項目裡,Avro會是更好的選擇。

由于Avro的設計理念偏向于動态類型語言,對于動态語言為主的應用場景,Avro是更好的選擇。

對于持久層非Hadoop項目,以靜态類型語言為主的應用場景,Protobuf會更符合靜态類型語言工程師的開發習慣。

如果需要提供一個完整的RPC解決方案,Thrift是一個好的選擇。

如果序列化之後需要支援不同的傳輸層協定,或者需要跨防火牆通路的高性能場景,Protobuf可以優先考慮。

Marshalling編碼器

public final class MarshallingCodeCFactory {

/**

 * 建立Jboss Marshalling解碼器MarshallingDecoder

 */

public static MarshallingDecoder buildMarshallingDecoder() {

final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");

final MarshallingConfiguration configuration = new MarshallingConfiguration();

configuration.setVersion(5);

UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);

MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);

return decoder;

}

/**

 * 建立Jboss Marshalling編碼器MarshallingEncoder

 */

public static MarshallingEncoder buildMarshallingEncoder() {

final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");

final MarshallingConfiguration configuration = new MarshallingConfiguration();

configuration.setVersion(5);

MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);

MarshallingEncoder encoder = new MarshallingEncoder(provider);

return encoder;

}

}