天天看點

Tcp網絡通訊詳解二(解決分包粘包)解決分包粘包

解決分包粘包

系統緩沖區

要想知道為什麼在Tcp通訊中會存在分包粘包的現象,首先你必須先了解Tcp網絡通訊的消息傳播機制,而系統緩沖區将是不得不講的一個話題,那麼什麼是系統緩沖區呢?其實就是接到對端資訊資料的時候,作業系統會将資料存入到Socket的接收緩沖區中,而在這一段時間,系統緩沖區完全是由作業系統進行操作,程式并不能直接操作它們,隻能通過Socket.Receive();Socket.Send等方法來間接進行操作。其中,Socket.Receive()方法隻是把接收緩沖區的資料提取出來, 比如調用Receive(readBuff,0,2) , 接收2個位元組的資料到了使用者緩沖區readbuff,當系統的接收緩沖區為空, Receive方法會被阻塞, 直到裡面有資料。同樣地, Socket的Send方法隻是把資料寫入到發送緩沖區裡, 具體的發送過程由作業系統負責。當作業系統的發送緩沖區滿了, Send方法将會阻塞。

粘包半包現象

粘包

  • 如果發送端快速發送多條資料,接收端沒有及時調用Receive,那麼資料便會在接收端的緩沖區中累積。用戶端先發送“ 1、2、3、4"四個位元組的資料,緊接看又發送“ 5、6、7、8"四個位元組的資料。等到服務端調用Receive時,服務端作業系統巳經将接收到的資料全部寫入緩沖區,共接收到8個資料。這樣一來,明明對方發送的是兩條消息,但卻當成了一條資料進行處理,明顯與功能不符。Receive方法傳回多少個資料,取決于作業系統接收緩沖區中存放的内容。

半包

  • 發送端發送的資料還有可能被拆分,如發送“ HelloWorld",但在接收端調用Receive時,作業系統隻接收到了部分資料,如“ Hel ” ,在等待一小段時間後再次調用Receive才接收到另一部分資料“ loWorld"。這樣一來對方明明發送的是一條消息,但卻被當成了兩條進行處理,肯定也是不能符合規則。

解決粘包問題的方法

一般有三種方法可以解決粘包和半包問題, 分别是長度資訊法、固定長度法和結束符号法。一般的遊戲開發會在每個資料包前面加上長度位元組, 以友善解析, 本文也将詳細介紹這種方法。

長度資訊法

長度資訊法是指在每個資料包前面加上長度資訊。每次接收到資料後, 先讀取表示長度的位元組, 如果緩沖區的資料長度大于要取的位元組數, 則取出相應的位元組, 否則等待下一次資料接收。假如用戶端要發送“HelloWorld”,那麼為了讓服務端判斷是不是接收到了完整的消息,通常在“HelloWorld”前面加上長度即“10HelloWorld”。加入服務端第一次Receive接收到的是“10Hello”,先讀取第一個位元組“10”,這時候服務端知道了完整消息的長度,而顯然此時消息的長度并不能達到要求,是以服務端不進行任何處理,等待下一次接收。這樣就可以保證每次接收到的消息都是完整的。

結束符号法

規定一個結束符号,作為消息間的分隔符假設規定結束符号為"@",那麼發送" Hello" “Unity"兩條資訊可以發送成"Hello@“和“Unity@”接收方每次讀取資料,直到”@ ” 出現為止,并且使用“ @ “ 去分割消息。比如接收方第一次讀到“[email protected]”,那它把結束符前面的Hello提取出來,作為第一條消息去處理,再把“Un"儲存起來。待後續讀到“ ity@". 再把“ Un"和“ ity"拼成第二條消息。

固定長度法

每次都以相同的長度發送資料,假設規定每條資訊的長度都為10個字元,那麼發送"Hello" “Unity"兩條資訊可以發送成“ Hello…” “Unity… “,其中的”.”表示填充字元,是為湊數,沒有實際意義,隻為了每次發送的資料都有固定長度,接收方每次讀取10個字元,作為一條消息去處理。如果讀到的字元數大于10,比如第1次讀到“Hello…Un” , 那它隻要把前10個位元組“Hello "抽取出來,再把後面的兩個位元組”Un"存起來,等到再次接收資料,拼接第二條資訊。

代碼實作

本文會展示在異步用戶端上,實作帶有32位元組長度資訊的協定,來解決粘包問題。用Vs建立控制台應用進行測試長度資訊發解決分包粘包問題。

用戶端代碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace Socket_Package_Test
{
    /// <summary>
    /// 用戶端
    /// </summary>
    class _Client
    {
        Socket socket;
        int BUFFER_SIZE = 1024;//緩沖區的長度
        byte[] readBuff;//接收消息的緩沖區

        int nowBuffLength = 0;//緩沖區現在的位元組長度

        byte[] fontBuff=new byte[sizeof(Int32)];//接收到消息數組的長度的數組
        
        public _Client() {
            readBuff = new byte[BUFFER_SIZE];
            StartClient();

        }
        //開始啟動用戶端
        void StartClient() {
            Console.WriteLine("開始啟動用戶端");
            socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
            IPEndPoint end = new IPEndPoint(ipAdr, 1234);
            socket.Connect(end);

            try
            {

                socket.BeginReceive(readBuff,nowBuffLength,BUFFER_SIZE-nowBuffLength,SocketFlags.None,ReceiveCb,socket);
              

            }
            catch (Exception)
            {

                //throw;
            }
        }
        /// <summary>
        /// 接收消息的回調函數
        /// </summary>
        /// <param name="ar"></param>
        void ReceiveCb(IAsyncResult ar) {
            Socket listen = (Socket)ar.AsyncState;

            try
            {
                //本次接收到資料的位元組長度
                int count = listen.EndReceive(ar);
                //現在緩存區中資料的位元組長度
                nowBuffLength += count;
                //處理接收到的資料
                HandleDate(listen);
                //繼續回調接收消息
                listen.BeginReceive(readBuff, nowBuffLength, BUFFER_SIZE-nowBuffLength, SocketFlags.None, ReceiveCb, listen);
            }
            catch (Exception e)
            {

               // throw;
            }
        }

        /// <summary>
        /// 處理接收到的資料
        /// </summary>
        /// <param name="listen"></param>
        void HandleDate(Socket listen) {
            if (nowBuffLength<sizeof(Int32))//緩沖區中的資料長度小于四個位元組
            {
                return;
            }
            //消息頭長度的數組更新
            Array.Copy(readBuff,fontBuff,sizeof(Int32));
            //消息頭(一段消息的長度)
            int receiveLength = BitConverter.ToInt32(fontBuff,0);
            if (nowBuffLength<sizeof(Int32)+receiveLength)//如果緩沖區小于4位元組+有效消息的長度(消息還沒有接收完)
            {
                //一段話沒有接收完整,等待下次一塊處理
                return;
            }
            //解析出一條消息
            string str = UTF8Encoding.UTF8.GetString(readBuff,sizeof(Int32),receiveLength);
            Console.WriteLine("接收到服務端的消息:"+str);
            Console.WriteLine("用戶端回複:");
            string s = Console.ReadLine();
            if (s != "")
            {
              
                //伺服器發送消息 
                SendDate(listen, s);
            }

            //清除掉已經處理過的資料
            int remainCount = nowBuffLength - sizeof(Int32) - receiveLength;
            //把剩餘沒有處理的本次接收到的資料重新拷貝到緩存區
            Array.Copy(readBuff,receiveLength+sizeof(Int32),readBuff,0,remainCount);
            //緩沖區現在的資料長度
            nowBuffLength = remainCount;
            if (nowBuffLength>0)
            {
                HandleDate(listen);
            }
            
        }

        /// <summary>
        /// 發送消息
        /// </summary>
        /// <param name="listen"></param>
        /// <param name="str"></param>
        void SendDate(Socket listen,string str) {
            //消息的内容數組(消息體)
            byte[] sendDate = UTF8Encoding.UTF8.GetBytes(str);
            //消息體的位元組長度
            int dateLength = sendDate.Length;
            //要發送消息的長度的數組(消息頭)
            byte[] length = BitConverter.GetBytes(dateLength);
            //消息頭和消息體進行拼接
            byte[] bytes = length.Concat(sendDate).ToArray();
            listen.Send(bytes);
        }
    }
}

           

服務端代碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace Socket_Package_Test
{
    /// <summary>
    /// 服務端
    /// </summary>
    class _Socket
    {
        Socket socket;
        int BUFFER_SIZE = 1024;//緩沖區的長度
        byte[] readBuff ;//接收消息的緩沖區

        int nowBuffLength=0;//緩沖區現在的位元組長度

        byte[] fontBuff=new byte[sizeof(Int32)];//接收到消息數組的長度的數組

        int fontLenth = sizeof(Int32);//消息頭占用的位元組長度

        int receiveDateLength;//接收到消息的長度


        public _Socket() {
            readBuff = new byte[BUFFER_SIZE];
            StartServer();
        }


        int maxListen = 50;
        /// <summary>
        /// 開啟伺服器
        /// </summary>
        void StartServer()
        {
            Console.WriteLine("開始啟動伺服器");
            socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
            IPEndPoint end = new IPEndPoint(ipAdr, 1234);
            socket.Bind(end);
            socket.Listen(maxListen);

            try
            {
                socket.BeginAccept(AcceptCb, null);
            }
            catch (Exception)
            {

               // throw;
            }
        }


        /// <summary>
        /// BeginAccept的回調
        /// </summary>
        /// <param name="ar"></param>
        void AcceptCb(IAsyncResult ar)
        {
           Socket listen = socket.EndAccept(ar);//直到有人連接配接伺服器  傳回連接配接者的Socket
            Console.WriteLine(listen.RemoteEndPoint+"進入房間");
            
                SendDate(listen,"歡迎您進入房間");
            

            //開始接收消息
            listen.BeginReceive(readBuff,nowBuffLength,BUFFER_SIZE-nowBuffLength,SocketFlags.None,ReceiveCb,listen);
            socket.BeginAccept(AcceptCb, null);
        }


        /// <summary>
        /// 接收消息的回調
        /// </summary>
        /// <param name="ar"></param>
        void ReceiveCb(IAsyncResult ar) {
            Socket listen = ar.AsyncState as Socket;
            try
            {
                //本次接收到的資料長度
                int count = listen.EndReceive(ar);
                //現在的緩沖區長度增加新接收的長度
                nowBuffLength += count;
                //處理接收的資料
                HandleDate(listen);
                //循環接收消息
                listen.BeginReceive(readBuff, nowBuffLength, BUFFER_SIZE-nowBuffLength, SocketFlags.None, ReceiveCb, listen);
            }
            catch (Exception)
            {

               // throw;
            }
            
        }


        /// <summary>
        /// 處理伺服器接收的消息
        /// </summary>
        void HandleDate(Socket listen) {
            if (nowBuffLength<=fontLenth)//緩沖區中的資料長度小于四個位元組
            {
                return;
            }
            //消息頭長度的數組更新
            Array.Copy(readBuff,fontBuff,fontLenth);
            //消息頭(一段消息的長度)
            receiveDateLength = BitConverter.ToInt32(fontBuff,0);

            if (nowBuffLength<sizeof(Int32)+receiveDateLength)//如果緩沖區小于4位元組+有效消息的長度(消息還沒有接收完)
            {
                return;
            }

            //解析出一條消息
            string str = UTF8Encoding.UTF8.GetString(readBuff,sizeof(Int32),receiveDateLength);
            Console.WriteLine("收到用戶端消息:"+str);
            Console.WriteLine("服務端回複:");
            string s = Console.ReadLine();
            if (s!="")
            {
                //伺服器發送消息 
                SendDate(listen, s);
            }
            
            int remainCount = nowBuffLength-sizeof(Int32)-receiveDateLength;
            //把剩餘沒有處理的本次接收到的資料重新拷貝到緩存區
            Array.Copy(readBuff,sizeof(Int32)+ receiveDateLength, readBuff,0,remainCount);
            //緩沖區現在的資料長度
            nowBuffLength = remainCount;
            if (nowBuffLength>0)//如果緩沖區還有資料
            {
                HandleDate(listen);
            }
        }


        /// <summary>
        /// 發送消息
        /// </summary>
        /// <param name="listen"></param>
        /// <param name="str"></param>
        void SendDate(Socket listen, string str) {
            
            //消息的内容數組(消息體)
            byte[] Date = UTF8Encoding.UTF8.GetBytes(str);
            //消息體的位元組長度
            int sendLength = Date.Length;
            //要發送消息的長度的數組(消息頭)
            byte[] sendDateLength = BitConverter.GetBytes(sendLength);
            //消息頭和消息體進行拼接
            byte[] sendBytes = sendDateLength.Concat(Date).ToArray();
            //發送給用戶端
            listen.Send(sendBytes);
        }
    }
}

           

上面的處理方式基本上解決了分包粘包的問題,但是還是存在一些問題,例如:大端小端問題、線程沖突問題。這些問題本片文章先不進行處理。除了這些問題之外還存在一些其他的不足之處,例如:在Copy操作的時候,每次成功接收一條完整的資料後,程式會調用Array.Copy,将緩沖區的資料往前移動。但Array.Copy是個時間複雜度為o(n)的操作,假如緩沖區中的資料很多,那移動全部資料将會花費較長的時間。一個可行的辦法是,使用ByteArray結構作為緩沖區,使用readldx指向的資料作為緩沖區的第一個資料,當接收完資料後,隻移動readldx,時間複雜度為o(l),當然,肯定還有一些其它的問題,這裡就不一一列舉了。

繼續閱讀