天天看點

TCP 粘包問題淺析及其解決方案

最近一直在做中間件相關的東西,是以接觸到的各種協定比較多,總的來說有TCP,UDP,HTTP等各種網絡傳輸協定,是以樓主想先從協定最基本的TCP粘包問題搞起,把計算機網絡這部分基礎夯實一下。

貼個廣告

樓主的部落格已全部搬遷至自己的部落格,感興趣的小夥伴請移步haifeiWu與他朋友們的部落格專欄

TCP協定的簡單介紹

TCP是面向連接配接的運輸層協定

簡單來說,在使用TCP協定之前,必須先建立TCP連接配接,就是我們常說的三次握手。在資料傳輸完畢之後,必須是釋放已經建立的TCP連接配接,否則會發生不可預知的問題,造成服務的不可用狀态。

每一條TCP連接配接都是可靠連接配接,且隻有兩個端點

TCP連接配接是從Server端到Client端的點對點的,通過TCP傳輸資料,無差錯,不重複不丢失。

TCP協定的通信是全雙工的

TCP協定允許通信雙方的應用程式在任何時候都能發送資料。TCP 連接配接的兩端都設有發送緩沖區和接收緩沖區,用來臨時存放雙向通信的資料。發送資料時,應用程式把資料傳送給TCP的緩沖後,就可以做自己的事情,而TCP在合适的時候将資料發送出去。在接收的時候,TCP把收到的資料放入接收緩沖區,上層應用在合适的時候讀取資料。

TCP協定是面向位元組流的

TCP中的流是指流入程序或者從程序中流出的位元組序列。是以向Java,golang等進階語言在進行TCP通信是都需要将相應的實體序列化才能進行傳輸。還有就是在我們使用Redis做緩存的時候,都需要将放入Redis的資料序列化才可以,原因就是Redis底層就是實作的TCP協定。

TCP并不知道所傳輸的位元組流的含義,TCP并不能保證接收方應用程式和發送方應用程式所發出的資料塊具有對應大小的關系(這就是TCP傳輸過程中産生的粘包問題)。但是應用程式接收方最終受到的位元組流與發送方發送的位元組流是一定相同的。是以,我們在使用TCP協定的時候應該制定合理的粘包拆包政策。

下圖是TCP的協定傳輸的整個過程:

TCP 粘包問題淺析及其解決方案

下面這個圖是從老錢的部落格裡面取到的,非常生動

TCP粘包問題複現

理論推敲

如下圖所示,出現的粘包問題一共有三種情況

TCP 粘包問題淺析及其解決方案

第一種情況:

如上圖中的第一根bar所示,服務端一共讀到兩個資料包,每個資料包都是完成的,并沒有發生粘包的問題,這種情況比較好處理,伺服器隻需要簡單的從網絡緩沖區去讀就好了,每次服務端讀取到的消息都是完成的,并不會出現資料不正确的情況。

第二種情況:

服務端僅收到一個資料包,這個資料包包含用戶端發出的兩條消息的完整資訊,這個時候基于第一種情況的邏輯實作的服務端就蒙了,因為服務端并不能很好的處理這個資料包,甚至不能處理,這種情況其實就是TCP的粘包問題。

第三種情況:

服務端收到了兩個資料包,第一個資料包隻包含了第一條消息的一部分,第一條消息的後半部分和第二條消息都在第二個資料包中,或者是第一個資料包包含了第一條消息的完整資訊和第二條消息的一部分資訊,第二個資料包包含了第二條消息的剩下部分,這種情況其實是發送了TCP拆包問題,因為發生了一條消息被拆分在兩個包裡面發送了,同樣上面的伺服器邏輯對于這種情況是不好處理的。

為什麼會發生TCP粘包、拆包

  1. 應用程式寫入的資料大于套接字緩沖區大小,這将會發生拆包。
  2. 應用程式寫入資料小于套接字緩沖區大小,網卡将應用多次寫入的資料發送到網絡上,這将會發生粘包。
  3. 進行MSS(最大封包長度)大小的TCP分段,當TCP封包長度-TCP頭部長度>MSS的時候将發生拆包。
  4. 接收方法不及時讀取套接字緩沖區資料,這将發生粘包。

如何處理粘包、拆包

通常會有以下一些常用的方法:

  1. 使用帶消息頭的協定、消息頭存儲消息開始辨別及消息長度資訊,服務端擷取消息頭的時候解析出消息長度,然後向後讀取該長度的内容。
  2. 設定定長消息,服務端每次讀取既定長度的内容作為一條完整消息,當消息不夠長時,空位補上固定字元。
  3. 設定消息邊界,服務端從網絡流中按消息編輯分離出消息内容,一般使用‘\n’。
  4. 更為複雜的協定,例如樓主最近接觸比較多的車聯網協定808,809協定。

TCP粘包拆包的代碼實踐

下面代碼樓主主要示範了使用規定消息頭,消息體的方式來解決TCP的粘包,拆包問題。

server端代碼: server端代碼的主要邏輯是接收用戶端發送過來的消息,重新組裝出消息,并列印出來。

import java.io.*;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author wuhf
 * @Date 2018/7/16 15:50
 **/
public class TestSocketServer {
    public static void main(String args[]) {
        ServerSocket serverSocket;
        try {
            serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(8089));
            while (true) {
                Socket socket = serverSocket.accept();
                new ReceiveThread(socket).start();

            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    static class ReceiveThread extends Thread {
        public static final int PACKET_HEAD_LENGTH = 2;//標頭長度
        private Socket socket;
        private volatile byte[] bytes = new byte[0];

        public ReceiveThread(Socket socket) {
            this.socket = socket;
        }

        public byte[] mergebyte(byte[] a, byte[] b, int begin, int end) {
            byte[] add = new byte[a.length + end - begin];
            int i = 0;
            for (i = 0; i < a.length; i++) {
                add[i] = a[i];
            }
            for (int k = begin; k < end; k++, i++) {
                add[i] = b[k];
            }
            return add;
        }

        @Override
        public void run() {
            int count = 0;
            while (true) {
                try {
                    InputStream reader = socket.getInputStream();
                    if (bytes.length < PACKET_HEAD_LENGTH) {
                        byte[] head = new byte[PACKET_HEAD_LENGTH - bytes.length];
                        int couter = reader.read(head);
                        if (couter < 0) {
                            continue;
                        }
                        bytes = mergebyte(bytes, head, 0, couter);
                        if (couter < PACKET_HEAD_LENGTH) {
                            continue;
                        }
                    }
                    // 下面這個值請注意,一定要取2長度的位元組子數組作為封包長度,你懂得
                    byte[] temp = new byte[0];
                    temp = mergebyte(temp, bytes, 0, PACKET_HEAD_LENGTH);
                    String templength = new String(temp);
                    int bodylength = Integer.parseInt(templength);//包體長度
                    if (bytes.length - PACKET_HEAD_LENGTH < bodylength) {//不夠一個包
                        byte[] body = new byte[bodylength + PACKET_HEAD_LENGTH - bytes.length];//剩下應該讀的位元組(湊一個包)
                        int couter = reader.read(body);
                        if (couter < 0) {
                            continue;
                        }
                        bytes = mergebyte(bytes, body, 0, couter);
                        if (couter < body.length) {
                            continue;
                        }
                    }
                    byte[] body = new byte[0];
                    body = mergebyte(body, bytes, PACKET_HEAD_LENGTH, bytes.length);
                    count++;
                    System.out.println("server receive body:  " + count + new String(body));
                    bytes = new byte[0];
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

           

client端代碼:用戶端代碼主要邏輯是組裝要發送的消息,确定消息頭,消息體,然後發送到服務端。

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * @author wuhf
 * @Date 2018/7/16 15:45
 **/
public class TestSocketClient {
    public static void main(String args[]) throws IOException {
        Socket clientSocket = new Socket();
        clientSocket.connect(new InetSocketAddress(8089));
        new SendThread(clientSocket).start();

    }

    static class SendThread extends Thread {
        Socket socket;
        PrintWriter printWriter = null;

        public SendThread(Socket socket) {
            this.socket = socket;
            try {
                printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            String reqMessage = "HelloWorld! from clientsocket this is test half packages!";
            for (int i = 0; i < 100; i++) {
                sendPacket(reqMessage);
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

        }

        public void sendPacket(String message) {
            try {
                OutputStream writer = socket.getOutputStream();
                writer.write(message.getBytes());
                writer.flush();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}


           

小結

最近一直在寫一些架構性的部落格,專門針對某些問題進行原理性的技術探讨的部落格還比較少,是以樓主想着怎樣能在自己學到東西的同時也可以給一同在技術這條野路子上奮鬥的小夥伴們一些啟發,是樓主一直努力的方向。

參考文章

  • Netty精粹之TCP粘包拆包問題
  • 計算機網絡-謝希仁-第七版

作者:

haifeiWu

出處:

http://www.hchstudio.cn/

關于作者:專注大後端,分布式,高并發等領域,請多多賜教!

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出,

原文連結