解決分包粘包
系統緩沖區
要想知道為什麼在Tcp通訊中會存在分包粘包的現象,首先你必須先了解Tcp網絡通訊的消息傳播機制,而系統緩沖區将是不得不講的一個話題,那麼什麼是系統緩沖區呢?其實就是接到對端資訊資料的時候,作業系統會将資料存入到Socket的接收緩沖區中,而在這一段時間,系統緩沖區完全是由作業系統進行操作,程式并不能直接操作它們,隻能通過Socket.Receive();Socket.Send等方法來間接進行操作。其中,Socket.Receive()方法隻是把接收緩沖區的資料提取出來, 比如調用Receive(readBuff,0,2) , 接收2個位元組的資料到了使用者緩沖區readbuff,當系統的接收緩沖區為空, Receive方法會被阻塞, 直到裡面有資料。同樣地, Socket的Send方法隻是把資料寫入到發送緩沖區裡, 具體的發送過程由作業系統負責。當作業系統的發送緩沖區滿了, Send方法将會阻塞。
粘包半包現象
粘包
- 如果發送端快速發送多條資料,接收端沒有及時調用Receive,那麼資料便會在接收端的緩沖區中累積。用戶端先發送“ 1、2、3、4"四個位元組的資料,緊接看又發送“ 5、6、7、8"四個位元組的資料。等到服務端調用Receive時,服務端作業系統巳經将接收到的資料全部寫入緩沖區,共接收到8個資料。這樣一來,明明對方發送的是兩條消息,但卻當成了一條資料進行處理,明顯與功能不符。Receive方法傳回多少個資料,取決于作業系統接收緩沖區中存放的内容。
半包
- 發送端發送的資料還有可能被拆分,如發送“ HelloWorld",但在接收端調用Receive時,作業系統隻接收到了部分資料,如“ Hel ” ,在等待一小段時間後再次調用Receive才接收到另一部分資料“ loWorld"。這樣一來對方明明發送的是一條消息,但卻被當成了兩條進行處理,肯定也是不能符合規則。
解決粘包問題的方法
一般有三種方法可以解決粘包和半包問題, 分别是長度資訊法、固定長度法和結束符号法。一般的遊戲開發會在每個資料包前面加上長度位元組, 以友善解析, 本文也将詳細介紹這種方法。
長度資訊法
長度資訊法是指在每個資料包前面加上長度資訊。每次接收到資料後, 先讀取表示長度的位元組, 如果緩沖區的資料長度大于要取的位元組數, 則取出相應的位元組, 否則等待下一次資料接收。假如用戶端要發送“HelloWorld”,那麼為了讓服務端判斷是不是接收到了完整的消息,通常在“HelloWorld”前面加上長度即“10HelloWorld”。加入服務端第一次Receive接收到的是“10Hello”,先讀取第一個位元組“10”,這時候服務端知道了完整消息的長度,而顯然此時消息的長度并不能達到要求,是以服務端不進行任何處理,等待下一次接收。這樣就可以保證每次接收到的消息都是完整的。
結束符号法
規定一個結束符号,作為消息間的分隔符假設規定結束符号為"@",那麼發送" Hello" “Unity"兩條資訊可以發送成"Hello@“和“Unity@”接收方每次讀取資料,直到”@ ” 出現為止,并且使用“ @ “ 去分割消息。比如接收方第一次讀到“[email protected]”,那它把結束符前面的Hello提取出來,作為第一條消息去處理,再把“Un"儲存起來。待後續讀到“ ity@". 再把“ Un"和“ ity"拼成第二條消息。
固定長度法
每次都以相同的長度發送資料,假設規定每條資訊的長度都為10個字元,那麼發送"Hello" “Unity"兩條資訊可以發送成“ Hello…” “Unity… “,其中的”.”表示填充字元,是為湊數,沒有實際意義,隻為了每次發送的資料都有固定長度,接收方每次讀取10個字元,作為一條消息去處理。如果讀到的字元數大于10,比如第1次讀到“Hello…Un” , 那它隻要把前10個位元組“Hello "抽取出來,再把後面的兩個位元組”Un"存起來,等到再次接收資料,拼接第二條資訊。
代碼實作
本文會展示在異步用戶端上,實作帶有32位元組長度資訊的協定,來解決粘包問題。用Vs建立控制台應用進行測試長度資訊發解決分包粘包問題。
用戶端代碼:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace Socket_Package_Test
{
/// <summary>
/// 用戶端
/// </summary>
class _Client
{
Socket socket;
int BUFFER_SIZE = 1024;//緩沖區的長度
byte[] readBuff;//接收消息的緩沖區
int nowBuffLength = 0;//緩沖區現在的位元組長度
byte[] fontBuff=new byte[sizeof(Int32)];//接收到消息數組的長度的數組
public _Client() {
readBuff = new byte[BUFFER_SIZE];
StartClient();
}
//開始啟動用戶端
void StartClient() {
Console.WriteLine("開始啟動用戶端");
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
IPEndPoint end = new IPEndPoint(ipAdr, 1234);
socket.Connect(end);
try
{
socket.BeginReceive(readBuff,nowBuffLength,BUFFER_SIZE-nowBuffLength,SocketFlags.None,ReceiveCb,socket);
}
catch (Exception)
{
//throw;
}
}
/// <summary>
/// 接收消息的回調函數
/// </summary>
/// <param name="ar"></param>
void ReceiveCb(IAsyncResult ar) {
Socket listen = (Socket)ar.AsyncState;
try
{
//本次接收到資料的位元組長度
int count = listen.EndReceive(ar);
//現在緩存區中資料的位元組長度
nowBuffLength += count;
//處理接收到的資料
HandleDate(listen);
//繼續回調接收消息
listen.BeginReceive(readBuff, nowBuffLength, BUFFER_SIZE-nowBuffLength, SocketFlags.None, ReceiveCb, listen);
}
catch (Exception e)
{
// throw;
}
}
/// <summary>
/// 處理接收到的資料
/// </summary>
/// <param name="listen"></param>
void HandleDate(Socket listen) {
if (nowBuffLength<sizeof(Int32))//緩沖區中的資料長度小于四個位元組
{
return;
}
//消息頭長度的數組更新
Array.Copy(readBuff,fontBuff,sizeof(Int32));
//消息頭(一段消息的長度)
int receiveLength = BitConverter.ToInt32(fontBuff,0);
if (nowBuffLength<sizeof(Int32)+receiveLength)//如果緩沖區小于4位元組+有效消息的長度(消息還沒有接收完)
{
//一段話沒有接收完整,等待下次一塊處理
return;
}
//解析出一條消息
string str = UTF8Encoding.UTF8.GetString(readBuff,sizeof(Int32),receiveLength);
Console.WriteLine("接收到服務端的消息:"+str);
Console.WriteLine("用戶端回複:");
string s = Console.ReadLine();
if (s != "")
{
//伺服器發送消息
SendDate(listen, s);
}
//清除掉已經處理過的資料
int remainCount = nowBuffLength - sizeof(Int32) - receiveLength;
//把剩餘沒有處理的本次接收到的資料重新拷貝到緩存區
Array.Copy(readBuff,receiveLength+sizeof(Int32),readBuff,0,remainCount);
//緩沖區現在的資料長度
nowBuffLength = remainCount;
if (nowBuffLength>0)
{
HandleDate(listen);
}
}
/// <summary>
/// 發送消息
/// </summary>
/// <param name="listen"></param>
/// <param name="str"></param>
void SendDate(Socket listen,string str) {
//消息的内容數組(消息體)
byte[] sendDate = UTF8Encoding.UTF8.GetBytes(str);
//消息體的位元組長度
int dateLength = sendDate.Length;
//要發送消息的長度的數組(消息頭)
byte[] length = BitConverter.GetBytes(dateLength);
//消息頭和消息體進行拼接
byte[] bytes = length.Concat(sendDate).ToArray();
listen.Send(bytes);
}
}
}
服務端代碼:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace Socket_Package_Test
{
/// <summary>
/// 服務端
/// </summary>
class _Socket
{
Socket socket;
int BUFFER_SIZE = 1024;//緩沖區的長度
byte[] readBuff ;//接收消息的緩沖區
int nowBuffLength=0;//緩沖區現在的位元組長度
byte[] fontBuff=new byte[sizeof(Int32)];//接收到消息數組的長度的數組
int fontLenth = sizeof(Int32);//消息頭占用的位元組長度
int receiveDateLength;//接收到消息的長度
public _Socket() {
readBuff = new byte[BUFFER_SIZE];
StartServer();
}
int maxListen = 50;
/// <summary>
/// 開啟伺服器
/// </summary>
void StartServer()
{
Console.WriteLine("開始啟動伺服器");
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
IPEndPoint end = new IPEndPoint(ipAdr, 1234);
socket.Bind(end);
socket.Listen(maxListen);
try
{
socket.BeginAccept(AcceptCb, null);
}
catch (Exception)
{
// throw;
}
}
/// <summary>
/// BeginAccept的回調
/// </summary>
/// <param name="ar"></param>
void AcceptCb(IAsyncResult ar)
{
Socket listen = socket.EndAccept(ar);//直到有人連接配接伺服器 傳回連接配接者的Socket
Console.WriteLine(listen.RemoteEndPoint+"進入房間");
SendDate(listen,"歡迎您進入房間");
//開始接收消息
listen.BeginReceive(readBuff,nowBuffLength,BUFFER_SIZE-nowBuffLength,SocketFlags.None,ReceiveCb,listen);
socket.BeginAccept(AcceptCb, null);
}
/// <summary>
/// 接收消息的回調
/// </summary>
/// <param name="ar"></param>
void ReceiveCb(IAsyncResult ar) {
Socket listen = ar.AsyncState as Socket;
try
{
//本次接收到的資料長度
int count = listen.EndReceive(ar);
//現在的緩沖區長度增加新接收的長度
nowBuffLength += count;
//處理接收的資料
HandleDate(listen);
//循環接收消息
listen.BeginReceive(readBuff, nowBuffLength, BUFFER_SIZE-nowBuffLength, SocketFlags.None, ReceiveCb, listen);
}
catch (Exception)
{
// throw;
}
}
/// <summary>
/// 處理伺服器接收的消息
/// </summary>
void HandleDate(Socket listen) {
if (nowBuffLength<=fontLenth)//緩沖區中的資料長度小于四個位元組
{
return;
}
//消息頭長度的數組更新
Array.Copy(readBuff,fontBuff,fontLenth);
//消息頭(一段消息的長度)
receiveDateLength = BitConverter.ToInt32(fontBuff,0);
if (nowBuffLength<sizeof(Int32)+receiveDateLength)//如果緩沖區小于4位元組+有效消息的長度(消息還沒有接收完)
{
return;
}
//解析出一條消息
string str = UTF8Encoding.UTF8.GetString(readBuff,sizeof(Int32),receiveDateLength);
Console.WriteLine("收到用戶端消息:"+str);
Console.WriteLine("服務端回複:");
string s = Console.ReadLine();
if (s!="")
{
//伺服器發送消息
SendDate(listen, s);
}
int remainCount = nowBuffLength-sizeof(Int32)-receiveDateLength;
//把剩餘沒有處理的本次接收到的資料重新拷貝到緩存區
Array.Copy(readBuff,sizeof(Int32)+ receiveDateLength, readBuff,0,remainCount);
//緩沖區現在的資料長度
nowBuffLength = remainCount;
if (nowBuffLength>0)//如果緩沖區還有資料
{
HandleDate(listen);
}
}
/// <summary>
/// 發送消息
/// </summary>
/// <param name="listen"></param>
/// <param name="str"></param>
void SendDate(Socket listen, string str) {
//消息的内容數組(消息體)
byte[] Date = UTF8Encoding.UTF8.GetBytes(str);
//消息體的位元組長度
int sendLength = Date.Length;
//要發送消息的長度的數組(消息頭)
byte[] sendDateLength = BitConverter.GetBytes(sendLength);
//消息頭和消息體進行拼接
byte[] sendBytes = sendDateLength.Concat(Date).ToArray();
//發送給用戶端
listen.Send(sendBytes);
}
}
}
上面的處理方式基本上解決了分包粘包的問題,但是還是存在一些問題,例如:大端小端問題、線程沖突問題。這些問題本片文章先不進行處理。除了這些問題之外還存在一些其他的不足之處,例如:在Copy操作的時候,每次成功接收一條完整的資料後,程式會調用Array.Copy,将緩沖區的資料往前移動。但Array.Copy是個時間複雜度為o(n)的操作,假如緩沖區中的資料很多,那移動全部資料将會花費較長的時間。一個可行的辦法是,使用ByteArray結構作為緩沖區,使用readldx指向的資料作為緩沖區的第一個資料,當接收完資料後,隻移動readldx,時間複雜度為o(l),當然,肯定還有一些其它的問題,這裡就不一一列舉了。