天天看點

Netty解決TCP粘包/拆包的問題

什麼是TCP粘包/拆包

 首先要明确, 粘包問題中的 “包”, 是指應用層的資料包.在TCP的協定頭中, 沒有如同UDP一樣的 “封包長度” 字段,但是有一個序号字段.

 站在傳輸層的角度, TCP是一個一個封包傳過來的. 按照序号排好序放在緩沖區中.

 站在應用層的角度, 看到的隻是一串連續的位元組資料.那麼應用程式看到了這一連串的位元組資料, 就不知道從哪個部分開始到哪個部分是一個完整的應用層資料包.此時資料之間就沒有了邊界, 就産生了粘包問題,那麼如何避免粘包問題呢?歸根結底就是一句話, 明确兩個包之間的邊界

Netty解決TCP粘包/拆包的問題

 如圖所示,假設用戶端分别發送兩個資料包D1和D2給伺服器端,由于伺服器端一次讀取到的位元組數是不确定的,是以可能存在以下幾種情況:

   服務端分兩次讀取到了兩個獨立的資料包,分别是D1和D2,這種情況沒有粘包和拆包

   服務端一次接收到了兩個資料包,D1和D2粘合在一起,被稱為TCP粘包

   服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的D2包和D1包的部分内容,第二次讀取到了D1包剩餘的内容,這被稱為TCP拆包

   和第3中情況相反,也是拆包

   如果服務端的TCP接收滑窗非常小,而資料包D1和D2比較大,那麼伺服器要分多次才能将D1和D2完全接收完,期間發生了多次拆包

未考慮TCP粘包案例

 上面我們介紹了TCP粘包和拆包的原因,現在我們通過Netty案例來實作下不考慮TCP粘包和拆包問題而造成的影響。代碼如下

服務端代碼

 服務端每讀取到一條消息,就計數一次,然後發送應答消息給用戶端,按照設計,服務端接受到的消息總數應該跟用戶端發送消息總數相同,而且請求消息删除回車換行符後應該為"Query time order".

package com.dpb.netty.demo1;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimerServer {

    public void bind(int port) throws Exception{
        // 配置服務端的NIO線程組
        // 服務端接受用戶端的連接配接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // 進行SocketChannel的網絡讀寫
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            // 用于啟動NIO服務端的輔助啟動類,目的是降低服務端的開發複雜度
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup)
            // 設定Channel
            .channel(NioServerSocketChannel.class)
            // 設定TCP參數
            .option(ChannelOption.SO_BACKLOG, 1024)
            // 綁定I/O事件的處理類ChildChannelHandler
            .childHandler(new ChildChannelHandler());
        
        // 綁定端口,同步等待成功
        ChannelFuture f = b.bind(port).sync();
        // 等待服務端監聽端口關閉
        f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        
    }
    
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            arg0.pipeline().addLast(new TimerServerHandler());
            
        }
        
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimerServer().bind(port);
    }
}
      
package com.dpb.netty.demo1;

import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * 用于對網絡事件進行讀寫操作
 * @author 波波烤鴨
 * @email [email protected]
 *
 */
public class TimerServerHandler extends ChannelHandlerAdapter{

    private int counter;
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // ByteBuf 類似于NIO中的java.nio.ByteBuffer對象,
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req,"utf-8")
                .substring(0,req.length-System.getProperty("line.separator").length());
        System.out.println("The time server receive order : "+body+",counter is:"+ ++counter);
        String currentTime = "Query time order".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // TODO Auto-generated method stub
        ctx.close();
    }
}      

用戶端代碼

 用戶端跟伺服器連接配接建立成功後,循環發送100條消息,每發送一條就重新整理一次,保證每條消息都會被寫入Channel中,按照設計,服務端應該接收到100條查詢時間指令的請求消息,用戶端每接收到服務端一條應答消息後,就列印一次計數器,按照設計用戶端應該列印100次服務端的系統時間。

package com.dpb.netty.demo1;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeClient().connect(port, "127.0.0.1");
    }

    public void connect(int port,String host)throws Exception{
        // 配置用戶端NIO線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel arg0) throws Exception {
                        arg0.pipeline().addLast(new TimeClientHandler());
                    }
                    
                });
            // 發起異步連接配接操作
            ChannelFuture f = b.connect(host,port).sync();
            // 等待用戶端鍊路關閉
            f.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // 優雅退出,釋放NIO線程組
            group.shutdownGracefully();
        }
    }
}      
package com.dpb.netty.demo1;

import java.util.logging.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeClientHandler extends ChannelHandlerAdapter{

    private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
    
    private int counter;
    
    private byte[] req;
    
    
    public TimeClientHandler(){
        // 用戶端每次發送的消息後面都跟了一個換行符
        req = ("Query time order"+System.getProperty("line.separator")).getBytes();
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for(int i =0;i < 100 ; i++){
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req  = new  byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req,"UTF-8");
        System.out.println("Now is :"+body+";counter is :"+ ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // TODO Auto-generated method stub
        logger.warning("Unexpected exception from downstream:"+cause.getMessage());
        ctx.close();
    }
}      

測試結果

服務端輸出結果:

The time server receive order : Query time order
Query time order
... 省略55
Query time ord,counter is:1
The time server receive order : 
Query time order
... 省略41
Query time order,counter is:2      

用戶端輸出結果

Now is :BAD ORDERBAD BAD ORDERBAD ;counter is :1      

 服務端的運作結果表明它隻接受到了兩條消息,第一條包含57個"Query time order"指令,第二條包含43條"Query time order"指令,總數剛好是100條,我們期待伺服器會接收到100次,結果隻接受到了兩次,說明發送了TCP粘包。而用戶端設計應該受到100條響應,實際伺服器發送了兩次響應,用戶端隻受到了一條響應,說明伺服器傳回給用戶端的應答資訊也發生了粘包問題。

Netty解決TCP粘包

 為了解決TCP粘包/拆包導緻的半包讀寫問題,Netty預設提供了多種編解碼器用于處理半包,此處我們使用LineBasedFrameDecoder來解決,實作如下

服務端修改

Netty解決TCP粘包/拆包的問題

用戶端修改

Netty解決TCP粘包/拆包的問題
Netty解決TCP粘包/拆包的問題

服務端輸出

Netty解決TCP粘包/拆包的問題

用戶端輸出

Netty解決TCP粘包/拆包的問題

 程式的運作結果完全符合我們的預期,說明通過LineBasedFrameDecoder和StringDecoder成功解決了TCP粘包導緻的讀半包問題,對于使用者來說,隻要将支援半包解碼的Handler添加到ChannelPiPeline中即可,不需要額外的代碼,使用者使用起來非常簡單。

LineBasedFrameDecoder和StringDecoder的原理

 的工作原理是依次周遊ByteBuf中的可讀位元組,判斷看是否有"\n"或者"\r\n",如果有就以此位置為結束位置,從可讀索引到結束位置區間的位元組就組成了一行,它是以換行符為結束标志的解碼器,StringDecoder的功能非常簡單,就是将接收到的對象轉換成字元串,然後繼續調用後面的Handler, LineBasedFrameDecoder + StringDecoder組合就是按行切換的文本解碼器,它被設計用來支援TCP的粘包和拆包問題。

Netty解決TCP粘包/拆包的問題