天天看點

粘包和拆包及Netty解決方案

在RPC架構中,粘包和拆包問題是必須解決一個問題,因為RPC架構中,各個微服務互相之間都是維系了一個TCP長連接配接,比如dubbo就是一個全雙工的長連接配接。由于微服務往對方發送資訊的時候,所有的請求都是使用的同一個連接配接,這樣就會産生粘包和拆包的問題。本文首先會對粘包和拆包問題進行描述,然後介紹其常用的解決方案,最後會對Netty提供的幾種解決方案進行講解。

1. 粘包和拆包

産生粘包和拆包問題的主要原因是,作業系統在發送TCP資料的時候,底層會有一個緩沖區,例如1024個位元組大小,如果一次請求發送的資料量比較小,沒達到緩沖區大小,TCP則會将多個請求合并為同一個請求進行發送,這就形成了粘包問題;如果一次請求發送的資料量比較大,超過了緩沖區大小,TCP就會将其拆分為多次發送,這就是拆包,也就是将一個大的包拆分為多個小包進行發送。如下圖展示了粘包和拆包的一個示意圖:

上圖中示範了粘包和拆包的三種情況:

  • A和B兩個包都剛好滿足TCP緩沖區的大小,或者說其等待時間已經達到TCP等待時長,進而還是使用兩個獨立的包進行發送;
  • A和B兩次請求間隔時間内較短,并且資料包較小,因而合并為同一個包發送給服務端;
  • B包比較大,因而将其拆分為兩個包B_1和B_2進行發送,而這裡由于拆分後的B_2比較小,其又與A包合并在一起發送。

2. 常見解決方案

對于粘包和拆包問題,常見的解決方案有四種:

  • 用戶端在發送資料包的時候,每個包都固定長度,比如1024個位元組大小,如果用戶端發送的資料長度不足1024個位元組,則通過補充空格的方式補全到指定長度;
  • 用戶端在每個包的末尾使用固定的分隔符,例如\r\n,如果一個包被拆分了,則等待下一個包發送過來之後找到其中的\r\n,然後對其拆分後的頭部部分與前一個包的剩餘部分進行合并,這樣就得到了一個完整的包;
  • 将消息分為頭部和消息體,在頭部中儲存有目前整個消息的長度,隻有在讀取到足夠長度的消息之後才算是讀到了一個完整的消息;
  • 通過自定義協定進行粘包和拆包的處理。

3. Netty提供的粘包拆包解決方案

1) FixedLengthFrameDecoder

對于使用固定長度的粘包和拆包場景,可以使用FixedLengthFrameDecoder,該解碼器會每次讀取固定長度的消息,如果目前讀取到的消息不足指定長度,那麼就會等待下一個消息到達後進行補足。其使用也比較簡單,隻需要在構造函數中指定每個消息的長度即可。這裡需要注意的是,FixedLengthFrameDecoder隻是一個解碼器,Netty也隻提供了一個解碼器,這是因為對于解碼是需要等待下一個包的進行補全的,代碼相對複雜,而對于編碼器,使用者可以自行編寫,因為編碼時隻需要将不足指定長度的部分進行補全即可。下面的示例中展示了如何使用FixedLengthFrameDecoder來進行粘包和拆包處理:

public class EchoServer {

  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 這裡将FixedLengthFrameDecoder添加到pipeline中,指定長度為20
            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
            // 将前一步解碼得到的資料轉碼為字元串
            ch.pipeline().addLast(new StringDecoder());
            // 這裡FixedLengthFrameEncoder是我們自定義的,用于将長度不足20的消息進行補全空格
            ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
            // 最終的資料處理
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });

      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new EchoServer().bind(8080);
  }
}
           

上面的pipeline中,對于入棧資料,這裡主要添加了FixedLengthFrameDecoder和StringDecoder,前面一個用于處理固定長度的消息的粘包和拆包問題,第二個則是将處理之後的消息轉換為字元串。最後由EchoServerHandler處理最終得到的資料,處理完成後,将處理得到的資料交由FixedLengthFrameEncoder處理,該編碼器是我們自定義的實作,主要作用是将長度不足20的消息進行空格補全。下面是FixedLengthFrameEncoder的實作代碼:

public class FixedLengthFrameEncoder extends MessageToByteEncoder<String> {
  private int length;

  public FixedLengthFrameEncoder(int length) {
    this.length = length;
  }

  @Override
  protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
      throws Exception {
    // 對于超過指定長度的消息,這裡直接抛出異常
    if (msg.length() > length) {
      throw new UnsupportedOperationException(
          "message length is too large, it's limited " + length);
    }

    // 如果長度不足,則進行補全
    if (msg.length() < length) {
      msg = addSpace(msg);
    }

    ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));
  }

  // 進行空格補全
  private String addSpace(String msg) {
    StringBuilder builder = new StringBuilder(msg);
    for (int i = 0; i < length - msg.length(); i++) {
      builder.append(" ");
    }

    return builder.toString();
  }
}
           

這裡FixedLengthFrameEncoder實作了decode()方法,在該方法中,主要是将消息長度不足20的消息進行空格補全。EchoServerHandler的作用主要是列印接收到的消息,然後發送響應給用戶端:

public class EchoServerHandler extends SimpleChannelInboundHandler<String> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println("server receives message: " + msg.trim());
    ctx.writeAndFlush("hello client!");
  }
}
           

對于用戶端,其實作方式基本與服務端的使用方式類似,隻是在最後進行消息發送的時候與服務端的處理方式不同。如下是用戶端EchoClient的代碼:

public class EchoClient {

  public void connect(String host, int port) throws InterruptedException {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.group(group)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 對服務端發送的消息進行粘包和拆包處理,由于服務端發送的消息已經進行了空格補全,
            // 并且長度為20,因而這裡指定的長度也為20
            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
            // 将粘包和拆包處理得到的消息轉換為字元串
            ch.pipeline().addLast(new StringDecoder());
            // 對用戶端發送的消息進行空格補全,保證其長度為20
            ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
            // 用戶端發送消息給服務端,并且處理服務端響應的消息
            ch.pipeline().addLast(new EchoClientHandler());
          }
        });

      ChannelFuture future = bootstrap.connect(host, port).sync();
      future.channel().closeFuture().sync();
    } finally {
      group.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new EchoClient().connect("127.0.0.1", 8080);
  }
}
           

對于用戶端而言,其消息的處理流程其實與服務端是相似的,對于入站消息,需要對其進行粘包和拆包處理,然後将其轉碼為字元串,對于出站消息,則需要将長度不足20的消息進行空格補全。用戶端與服務端處理的主要差別在于最後的消息處理handler不一樣,也即這裡的EchoClientHandler,如下是該handler的源碼:

public class EchoClientHandler extends SimpleChannelInboundHandler<String> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println("client receives message: " + msg.trim());
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("hello server!");
  }
}
           

這裡用戶端的處理主要是重寫了channelActive()和channelRead0()兩個方法,這兩個方法的主要作用在于,channelActive()會在用戶端連接配接上伺服器時執行,也就是說,其連上伺服器之後就會往伺服器發送消息。而channelRead0()主要是在伺服器發送響應給用戶端時執行,這裡主要是列印伺服器的響應消息。對于服務端而言,前面我們我們可以看到,EchoServerHandler隻重寫了channelRead0()方法,這是因為伺服器隻需要等待用戶端發送消息過來,然後在該方法中進行處理,處理完成後直接将響應發送給用戶端。如下是分别啟動服務端和用戶端之後控制台列印的資料:

// server
server receives message: hello server!
           
// client
client receives message: hello client!
           

2) LineBasedFrameDecoder與DelimiterBasedFrameDecoder

對于通過分隔符進行粘包和拆包問題的處理,Netty提供了兩個編解碼的類,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。這裡LineBasedFrameDecoder的作用主要是通過換行符,即\n或者\r\n對資料進行處理;而DelimiterBasedFrameDecoder的作用則是通過使用者指定的分隔符對資料進行粘包和拆包處理。同樣的,這兩個類都是解碼器類,而對于資料的編碼,也即在每個資料包最後添加換行符或者指定分割符的部分需要使用者自行進行處理。這裡以DelimiterBasedFrameDecoder為例進行講解,如下是EchoServer中使用該類的代碼片段,其餘部分與前面的例子中的完全一緻:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    String delimiter = "_$";
    // 将delimiter設定到DelimiterBasedFrameDecoder中,經過該解碼器進行處理之後,源資料将會
    // 被按照_$進行分隔,這裡1024指的是分隔的最大長度,即當讀取到1024個位元組的資料之後,若還是未
    // 讀取到分隔符,則舍棄目前資料段,因為其很有可能是由于碼流紊亂造成的
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
        Unpooled.wrappedBuffer(delimiter.getBytes())));
    // 将分隔之後的位元組資料轉換為字元串資料
    ch.pipeline().addLast(new StringDecoder());
    // 這是我們自定義的一個編碼器,主要作用是在傳回的響應資料最後添加分隔符
    ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
    // 最終處理資料并且傳回響應的handler
    ch.pipeline().addLast(new EchoServerHandler());
}
           

上面pipeline的設定中,添加的解碼器主要有DelimiterBasedFrameDecoder和StringDecoder,經過這兩個處理器處理之後,接收到的位元組流就會被分隔,并且轉換為字元串資料,最終交由EchoServerHandler處理。這裡DelimiterBasedFrameEncoder是我們自定義的編碼器,其主要作用是在傳回的響應資料之後添加分隔符。如下是該編碼器的源碼:

public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {

  private String delimiter;

  public DelimiterBasedFrameEncoder(String delimiter) {
    this.delimiter = delimiter;
  }

  @Override
  protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) 
      throws Exception {
    // 在響應的資料後面添加分隔符
    ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes()));
  }
}
           

對于用戶端而言,這裡的處理方式與服務端類似,其pipeline的添加方式如下:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    String delimiter = "_$";
    // 對服務端傳回的消息通過_$進行分隔,并且每次查找的最大大小為1024位元組
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, 
        Unpooled.wrappedBuffer(delimiter.getBytes())));
    // 将分隔之後的位元組資料轉換為字元串
    ch.pipeline().addLast(new StringDecoder());
    // 對用戶端發送的資料進行編碼,這裡主要是在用戶端發送的資料最後添加分隔符
    ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
    // 用戶端發送資料給服務端,并且處理從服務端響應的資料
    ch.pipeline().addLast(new EchoClientHandler());
}
           

這裡用戶端的處理方式與服務端基本一緻,關于這裡沒展示的代碼,其與示例一中的代碼完全一緻,這裡則不予展示。

3) LengthFieldBasedFrameDecoder與LengthFieldPrepender

這裡LengthFieldBasedFrameDecoder與LengthFieldPrepender需要配合起來使用,其實本質上來講,這兩者一個是解碼,一個是編碼的關系。它們處理粘拆包的主要思想是在生成的資料包中添加一個長度字段,用于記錄目前資料包的長度。LengthFieldBasedFrameDecoder會按照參數指定的包長度偏移量資料對接收到的資料進行解碼,進而得到目标消息體資料;而LengthFieldPrepender則會在響應的資料前面添加指定的位元組資料,這個位元組資料中儲存了目前消息體的整體位元組資料長度。LengthFieldBasedFrameDecoder的解碼過程如下圖所示:

關于LengthFieldBasedFrameDecoder,這裡需要對其構造函數參數進行介紹:

  • maxFrameLength:指定了每個包所能傳遞的最大資料包大小;
  • lengthFieldOffset:指定了長度字段在位元組碼中的偏移量;
  • lengthFieldLength:指定了長度字段所占用的位元組長度;
  • lengthAdjustment:對一些不僅包含有消息頭和消息體的資料進行消息頭的長度的調整,這樣就可以隻得到消息體的資料,這裡的lengthAdjustment指定的就是消息頭的長度;
  • initialBytesToStrip:對于長度字段在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長度字段占用的位元組。

這裡我們以json序列化為例對LengthFieldBasedFrameDecoder和LengthFieldPrepender的使用方式進行講解。如下是EchoServer的源碼:

public class EchoServer {

  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 這裡将LengthFieldBasedFrameDecoder添加到pipeline的首位,因為其需要對接收到的資料
            // 進行長度字段解碼,這裡也會對資料進行粘包和拆包處理
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
            // LengthFieldPrepender是一個編碼器,主要是在響應位元組資料前面添加位元組長度字段
            ch.pipeline().addLast(new LengthFieldPrepender(2));
            // 對經過粘包和拆包處理之後的資料進行json反序列化,進而得到User對象
            ch.pipeline().addLast(new JsonDecoder());
            // 對響應資料進行編碼,主要是将User對象序列化為json
            ch.pipeline().addLast(new JsonEncoder());
            // 處理用戶端的請求的資料,并且進行響應
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });

      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new EchoServer().bind(8080);
  }
}
           

這裡EchoServer主要是在pipeline中添加了兩個編碼器和兩個解碼器,編碼器主要是負責将響應的User對象序列化為json對象,然後在其位元組數組前面添加一個長度字段的位元組數組;解碼器主要是對接收到的資料進行長度字段的解碼,然後将其反序列化為一個User對象。下面是JsonDecoder的源碼:

public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) 
      throws Exception {
    byte[] bytes = new byte[buf.readableBytes()];
    buf.readBytes(bytes);
    User user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), User.class);
    out.add(user);
  }
}
           

JsonDecoder首先從接收到的資料流中讀取位元組數組,然後将其反序列化為一個User對象。下面我們看看JsonEncoder的源碼:

public class JsonEncoder extends MessageToByteEncoder<User> {

  @Override
  protected void encode(ChannelHandlerContext ctx, User user, ByteBuf buf)
      throws Exception {
    String json = JSON.toJSONString(user);
    ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));
  }
}
           

JsonEncoder将響應得到的User對象轉換為一個json對象,然後寫入響應中。對于EchoServerHandler,其主要作用就是接收用戶端資料,并且進行響應,如下是其源碼:

public class EchoServerHandler extends SimpleChannelInboundHandler<User> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
    System.out.println("receive from client: " + user);
    ctx.write(user);
  }
}
           

對于用戶端,其主要邏輯與服務端的基本類似,這裡主要展示其pipeline的添加方式,以及最後發送請求,并且對伺服器響應進行處理的過程:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
    ch.pipeline().addLast(new LengthFieldPrepender(2));
    ch.pipeline().addLast(new JsonDecoder());
    ch.pipeline().addLast(new JsonEncoder());
    ch.pipeline().addLast(new EchoClientHandler());
}
           
public class EchoClientHandler extends SimpleChannelInboundHandler<User> {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.write(getUser());
  }

  private User getUser() {
    User user = new User();
    user.setAge(27);
    user.setName("zhangxufeng");
    return user;
  }

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
    System.out.println("receive message from server: " + user);
  }
}
           

這裡用戶端首先會在連接配接上伺服器時,往伺服器發送一個User對象資料,然後在接收到伺服器響應之後,會列印伺服器響應的資料。

4) 自定義粘包與拆包器

對于粘包與拆包問題,其實前面三種基本上已經能夠滿足大多數情形了,但是對于一些更加複雜的協定,可能有一些定制化的需求。對于這些場景,其實本質上,我們也不需要手動從頭開始寫一份粘包與拆包處理器,而是通過繼承LengthFieldBasedFrameDecoder和LengthFieldPrepender來實作粘包和拆包的處理。

如果使用者确實需要不通過繼承的方式實作自己的粘包和拆包處理器,這裡可以通過實作MessageToByteEncoder和ByteToMessageDecoder來實作。這裡MessageToByteEncoder的作用是将響應資料編碼為一個ByteBuf對象,而ByteToMessageDecoder則是将接收到的ByteBuf資料轉換為某個對象資料。通過實作這兩個抽象類,使用者就可以達到實作自定義粘包和拆包處理的目的。如下是這兩個類及其抽象方法的聲明:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) 
        throws Exception;
}
           
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) 
        throws Exception;
}           

每天一點成長,歡迎指正!