天天看点

Netty详解(五):Netty TCP粘包 拆包1. 概述2. TCP底层的粘包和拆包机制3. Netty提供半包解码器来解决TCP粘包/拆包问题4. 总结

1. 概述

无论是服务端还是客户端,我们读取或者发送消息的时候,都需要考虑TCP底层的粘包和拆包机制。下面我们来通过Netty来详解TCP底层的粘包和拆包机制。

2. TCP底层的粘包和拆包机制

TCP是一个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的水流,它们是连城有一片的,期间没有界限。TCP底层并不了解上层业务数据的具体含义,他会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的的数据包进行发送,这就是所谓的TCP的粘包和拆包机制。

2.1 TCP粘包和拆包问题说明

假设客户端分别发送了两个数据包D1 和D2给服务商厦 ,由于服务端一次读取到的字节数是不确定的,故可能存在4种情况:

  1. 服务端分两次读取到了两个独立的数据包,分别是D1 和 D2,没有粘包和拆包
  2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包
  3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称为TCP拆包
  4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。
  5. 如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很可能会发生第5种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包
Netty详解(五):Netty TCP粘包 拆包1. 概述2. TCP底层的粘包和拆包机制3. Netty提供半包解码器来解决TCP粘包/拆包问题4. 总结

2.2 TCP粘包/拆包发生的原因

问题产生的原因有三个,分别如下:

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小
  2. 0进行MSS大小的TCP分段
  3. 以太网帧的payload大于MTU进行IP分片
  • MSS:TCP传输层(传输帧)最大报文段长度。Maxitum Segment Size最大分段大小。为了达到最佳的传输效能TCP协议在建立连接的时候通常要协商双方的MSS值,这个值TCP协议在实现的时候往往用MTU值代替,值往往为1460.IPV6中通常是1440
  • MTU:Maxitum Transmission Unit最大传输单元。这个最大传输单元实际上和链路层协议有着密切的关系,EthernetII 帧的结构DMAC+SMAC+Type+Data+CRC。由于以太网传输限制,每个以太网帧都有最小的大小64bytes,最大不能超过1518bytes,对于小于或大于这个限制的以太网帧我们都可以视之为错误的数据帧,一般的以太网转发设备会丢弃这些数据帧。
Netty详解(五):Netty TCP粘包 拆包1. 概述2. TCP底层的粘包和拆包机制3. Netty提供半包解码器来解决TCP粘包/拆包问题4. 总结

2.3 粘包问题的解决策略

底层的TCP无法理解上层的业务数据,需要在上层的应用协议栈调来来解决。

  1. 消息定义,例如每个报文的长度大小固定200字节,如果不够,空格补空位。
  2. 在包尾增加回车换行符,如FTP协议
  3. 将消息分成消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度
  4. 更复杂的应用层协议

3. Netty提供半包解码器来解决TCP粘包/拆包问题

3.1 LineBasedFrameDecoder

LineBasedFrameDecoder 是依次遍历ByteBuf中的可读字节,判断看是否有\n 或 \r\n,如果有,就以此位置为结束位置,以换行符为结束标志的解码器。它支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

StringDecoder的功能非常简单,就是将接受到的对象转换成字符串,然后继续调用后面的Handler。LineBasedFrameDecoder+StringDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。

在ChannelInitializer类中添加LineBasedFrameDecoder+StringDecoder

EventLoopGroup group=new NioEventLoopGroup();
        try{
            Bootstrap b=new Bootstrap();
            //Channel需要设置为NioSocketChannel,然后为其添加Handler
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY,true)
            .handler(new ChannelInitializer<SocketChannel>(){
                //为了简单直接创建匿名内部类,实现initChannel方法
                //其作用是当创建NioSocketChannel成功之后,在进行初始化时,
                //将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件
                @Override
                public void initChannel(SocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            //发起异步连接,然后调用同步方法等待连接成功
            ChannelFuture f=b.connect(host,port).sync();
            //当客户端连接关闭之后,客户端主函数退出,退出前释放NIO线程组的资源
            f.channel().closeFuture().sync();
        }finally{

        }
           

TimeServerHandler.java 关键代码

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
	    throws Exception {
		System.out.println("channelRead start");
		ByteBuf buf = (ByteBuf) msg;
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "UTF-8");
		System.out.println("The time server receive order : " + body);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
				System.currentTimeMillis()).toString() : "BAD ORDER";
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		ctx.write(resp);
		System.out.println("channelRead end");
	}

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		System.out.println("channelReadComplete start");
		ctx.flush();
		System.out.println("channelReadComplete end");
    }
           

TimeClientHandler.java 关键代码

/**
     * Creates a client-side handler.
     */
    public TimeClientHandler() {
	req = ("QUERY TIME ORDER" + System.getProperty("line.separator"))
		.getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
	ByteBuf message = null;
	for (int i = 0; i < 100; i++) {
	    message = Unpooled.buffer(req.length);
	    message.writeBytes(req);
	    ctx.writeAndFlush(message);
	}
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
	    throws Exception {
	String body = (String) msg;
	System.out.println("Now is : " + body + " ; the counter is : "
		+ ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
	// 释放资源
	logger.warning("Unexpected exception from downstream : "
		+ cause.getMessage());
	ctx.close();
    }
           

3.2 DelimiterBasedFrameDecoder

以分隔符作为码流结束标识的消息的解码。示例:Echo服务,以$_作为分隔符。

EchoServer.java关键代码

@Override
public void initChannel(SocketChannel ch) throws Exception{
    //创建分隔符缓冲对象ByteBuf,以$_为分隔符
   ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
   //1024表示单条消息的最大长度,当达到该长度后仍然没有查找到分隔符
   //就抛出TooLongFrameException异常
   //第二个参数是分隔符缓冲对象
   new DelimiterBasedFrameDecoder(1024,delimiter));  //后续的ChannelHandler接收到的msg对象将会是完整的消息包
   ch.pipeline().addLast(new StringDecoder()); //将ByteBuf解码成字符串对象 
   ch.pipeline().addLast(new EchoServerHandler());  //接收到的msg消息就是解码后的字符串对象
}
           

DelimiterBasedFrameDecoder 有多个构造方法,这里我们传递两个参数:第一个1024表示单条最大长度,当达到该长度之后仍然没有找到分隔符,就抛出TooLongFrameException异常,防止由于异常流缺失分隔符导致的内存溢出,这就是Netty解码器的可靠性保证。第二个参数就是分隔符缓冲对象。

EchoServerHandler.java 关键代码

@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
   String body=(String)msg;
   System.out.println("This is " + ++counter + " times receive client : [" + body + "]");
   body+="$_"; //$_已被过滤掉了,所以这里要拼接上
   ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
   ctx.writeAndFlush(echo);
}
           

由于DelimiterBasedFrameDecoder自动对请求消息进行了编码,后续的ChannelHandler接受到的msg对象就是个完整的消息包;第二个ChannelHandler是StringDecoder,它将ByteBuffer解码成字符串对象;第三个EchoServerHandler接受到的msg消息就是解码后的字符串对象。

EchoClient.java 关键代码

@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
   String body=(String)msg;
   System.out.println("This is " + ++counter + " times receive client : [" + body + "]");
   body+="$_"; //$_已被过滤掉了,所以这里要拼接上
   ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
   ctx.writeAndFlush(echo);
}
           

EchoClientHandler.java 关键代码

@Override
public void channelActive(ChannelHandlerContext ctx){
   for(int i=0;i<10;i++){
      ctx.writeAndFlush(Unpooled.copiedBuffer(ECHOREQ.getBytes()));
   }
}
           

3.3 FixedLengthFrameDecoder

FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题。

EchoServer.java 关键代码

@Override
public void initChannel (SocketChannel ch) throws Exception{
   ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
   ch.pipeline().addLast(new StringDecoder());
   ch.pipeline().addLast(new EchoServerHandler()));
}
           

在服务端的ChannelPipeline中新增FixedLengthFrameDecoder,长度设置为20,然后再依次增加字符串解码器和EchoHandler。

EchoServerHandler.java 关键代码

@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
   System.out.println("Receive client : [" + msg + "]");
}
           

利用FixedLengthFrameDecoder解码器,无论一次接收到多少数据报,他都会按照构造函数中设置的固定长度间解码,如果是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下一个包到达后进行拼包,直到读取到一个完整的包。

4. 总结

DelimiterBasedFrameDecoder 用于对使用分隔符结尾的消息间自动解码,FixedLengthFrameDecoder用于对固定长度的消息进行自动解码。有了上述两种解码器,再结合其他的解码器,如字符串解码器等,可以轻松完成对很多消息的自动解码,而且不需要考虑TCP粘包和拆包问题。