天天看点

Netty系列6-TCP粘包拆包

(1)TCP粘包拆包现象

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

Netty系列6-TCP粘包拆包

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下5种情况。

  • 服务端分两次读取完数据,第一次读到D1,第二次读到D2,这里没有粘包和拆包;
  • 服务端一次性读取完数据,D1和D2粘合在一起,这里发生TCP粘包;
  • 服务端分两次读取完数据,第一次读取到D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这里发生TCP拆包;
  • 服务端分两次读取完数据,第一次读取到了D1包的部分内容,第二次读取到了D1包的剩余内容D1_2和D2包的整包,这里发生TCP拆包。
  • 服务端分多次读取完数据,D!和D2都背拆分为了多份,发生了多次拆包。

(2)原因

粘包:

在TCP中可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用Nagle算法对较小的数据包进行合并发送(超时或者包大小足够)。服务器在接收到消息的时候无法区分一个完整的数据包或者服务器在接收到数据后没有被及时从缓存区取走,这样也会出现一次取出多个数据包的情况,这就是粘包。

在UDP中没有粘包的现象,因为作为无连接的不可靠的传输协议,不会对数据包进行合并发送,来什么发什么。

拆包:

  • 应用程序写入数据的字节大小大于套接字发送缓冲区的大小;
  • 进行MSS大小的TCP分段。也就是TCP的报文长度超出了MSS大小。MSS=TCP报文段长度-TCP首部长度,也就是说MSS是TCP报文段中的数据字段的最大长度。
  • 以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成托干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上

(3)解决方案

在包尾增加分割符:

在netty中可以使用LineBasedFrameDecoder和DelimiterBasedFrameDecoder,前一个使用的是系统换行符,后一个使用的是自定义分隔符。

以LineBasedFrameDecoder实例:

客户端核心代码:

public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            final Bootstrap b = new Bootstrap();;
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host,LineBaseEchoServer.PORT))
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("已连接到服务器.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            //	加分割符:系统回车符
            ch.pipeline().addLast(new LineBasedFrameDecoder(10240));
            ch.pipeline().addLast(new LineBaseClientHandler());
        }
    }
           
public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "heiio,client"
                + System.getProperty("line.separator");
        for(int i=0;i<100;i++){
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

           

服务端核心代码:

public void start() throws InterruptedException {
        final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(PORT))
                .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new LineBasedFrameDecoder(10240));
            ch.pipeline().addLast(new LineBaseServerHandler());
        }
    }

}
           
public class LineBaseServerHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        String resp = "Hello server!"
                + System.getProperty("line.separator");
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

           

消息定长:

设定每个报文的固定大小,如果不够,补空格。 在netty中可以使用FixedLengthFrameDecoder。

客户端核心代码:

public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            final Bootstrap b = new Bootstrap();;
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host,FixedLengthEchoServer.PORT))
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(
                    new FixedLengthFrameDecoder(
                            FixedLengthEchoServer.RESPONSE.length()));
            ch.pipeline().addLast(new FixedLengthClientHandler());
        }
    }

           
public class FixedLengthClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        for(int i=0;i<10;i++){
            msg = Unpooled.buffer(FixedLengthEchoClient.REQUEST.length());
            msg.writeBytes(FixedLengthEchoClient.REQUEST.getBytes());
            ctx.writeAndFlush(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

           

服务端核心代码:

public void start() throws InterruptedException {
        final FixedLengthServerHandler serverHandler = new FixedLengthServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(PORT))
                .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            //消息定长
            ch.pipeline().addLast(
                    new FixedLengthFrameDecoder(
                            FixedLengthEchoClient.REQUEST.length()));
            ch.pipeline().addLast(new FixedLengthServerHandler());
        }
    }
           
public class FixedLengthServerHandler extends ChannelInboundHandlerAdapter {

    private AtomicInteger counter = new AtomicInteger(0);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        ctx.writeAndFlush(
                Unpooled.copiedBuffer(FixedLengthEchoServer.RESPONSE.getBytes()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

           

消息分为消息头和消息体:

将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常在消息头的第一个字段使用int来表示消息的总长度。在netty中可以使用LengthFieldBasedFrameDecoder。

LengthFieldBasedFrame构造函数:

  • maxFrameLength:包的最大长度,
  • lengthFieldOffset:指的是长度域的偏移量,表示跳过指定个数字节之后的才是长度域;
  • lengthFieldLength:数据长度的字段本身的长度;
  • lengthAdjustment:长度的一个修正值,可正可负;
  • initialBytesToStrip:从数据帧中跳过的字节数,表示得到一个完整的数据包之后,忽略多少字节,开始读取实际我要的数据
  • failFast:如果为true,则表示读取到长度域,TA的值的超过maxFrameLength,就抛出一个 TooLongFrameException,而为false表示只有当真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,默认情况下设置为true,建议不要修改,否则可能会造成内存溢出。

实际数据包长度 = 长度域中记录的数据长度 + lengthFieldOffset + lengthFieldLength + lengthAdjustment。

服务端核心代码:

public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        //EventLoopGroup group2 = new EpollEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group,work)
            .channel(NioServerSocketChannel.class)
                    //.channel(EpollServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                                .childOption(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                        	  ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
                                      0,4,0,4));
                            ch.pipeline().addLast(new EchoServerHandler());
                            //ch.pipeline().addLast(new EchoServerHandler2());
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
           

客户端代码:

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
            throws Exception {
        System.out.println("Client accetp:"+msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //ctx.writeAndFlush("String");
        ctx.writeAndFlush(Unpooled.copyInt(11));
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Netty",
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}