天天看點

thrift之TTransport層的堵塞的套接字I/O傳輸類TSocket

本節将介紹第一個實作具體傳輸功能的類TSocket,這個類是基于TCP socket實作TTransport的接口。下面具體介紹這個類的相關函數功能實作。

1.構造函數

分析一個類的功能首先看它的定義和構造函數實作,先看看它的定義:

class TSocket : public TVirtualTransport<TSocket> { ......}      

  由定義可以看書TSocket繼承至虛拟傳輸類,并且把自己當做模闆參數傳遞過去,是以從虛拟傳輸類繼承下來的虛拟函數(如read_virt)調用非虛拟函數(如read)就是TSocket自己實作的。

TSocket類的構造函數有4個,當然還有一個析構函數。四個構造函數就是根據不同的參數來構造,它們的聲明如下:

  TSocket();//所有參數都預設       TSocket(std::string host, int port);//根據主機名和端口構造一個socket       TSocket(std::string path);//構造unix域的一個socket       TSocket(int socket);//構造一個原始的unix句柄socket      

  四個構造函數分别用于不同的情況下來産生不同的TSocket對象,不過這些構造函數都隻是簡單的初始化一些最基本的成員變量,而沒有真正的連接配接socket。它們初始化的變量基本如下:

  TSocket::TSocket() :         host_(""),         port_(0),         path_(""),         socket_(-1),         connTimeout_(0),         sendTimeout_(0),         recvTimeout_(0),         lingerOn_(1),         lingerVal_(0),         noDelay_(1),         maxRecvRetries_(5) {         recvTimeval_.tv_sec = (int)(recvTimeout_/1000);         recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);         cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;       }      

  大部分簡單的參數都采用初始化清單初始化了,需要簡單計算的就放在函數體内初始化,其他幾個都是這種情況。下面需要單獨介紹一下的是unix domain socket。

socket API原本是為網絡通訊設計的,但後來在socket的架構上發展出一種IPC機制,就是UNIX Domain Socket。雖然網絡socket也可用于同一台主機的程序間通訊(通過loopback位址127.0.0.1),但是UNIX Domain Socket用于IPC更有效率:不需要經過網絡協定棧,不需要打包拆包、計算校驗和、維護序号和應答等,隻是将應用層資料從一個程序拷貝到另一個程序。這是因為,IPC機制本質上是可靠的通訊,而網絡協定是為不可靠的通訊設計的。UNIX Domain Socket也提供面向流和面向資料包兩種API接口,類似于TCP和UDP,但是面向消息的UNIX Domain Socket也是可靠的,消息既不會丢失也不會順序錯亂。

UNIX Domain Socket是全雙工的,API接口語義豐富,相比其它IPC機制有明顯的優越性,目前已成為使用最廣泛的IPC機制,比如X Window伺服器和GUI程式之間就是通過UNIX Domain Socket通訊的。

使用UNIX Domain Socket的過程和網絡socket十分相似,也要先調用socket()建立一個socket檔案描述符,address family指定為AF_UNIX,type可以選擇SOCK_DGRAM或SOCK_STREAM,protocol參數仍然指定為0即可。

UNIX Domain Socket與網絡socket程式設計最明顯的不同在于位址格式不同,用結構體sockaddr_un表示,網絡程式設計的socket位址是IP位址加端口号,而UNIX Domain Socket的位址是一個socket類型的檔案在檔案系統中的路徑,這個socket檔案由bind()調用建立,如果調用bind()時該檔案已存在,則bind()錯誤傳回。

打開連接配接函數open

首先看這個函數的代碼實作,如下:

  void TSocket::open() {         if (isOpen()) {//如果已經打開就直接傳回           return;         }         if (! path_.empty()) {//如果unix路徑不為空就打開unix domian socket           unix_open();         } else {           local_open();//打開通用socket         }       }      

  Open函數又根據路徑為不為空(不為空就是unix domain socket)調用相應的函數來繼續打開連接配接,首先看看打開unix domain socket,代碼如下:

  void TSocket::unix_open(){         if (! path_.empty()) {//保證path_不為空           // Unix Domain SOcket does not need addrinfo struct, so we pass NULL           openConnection(NULL);//調用真正的打開連接配接函數         }       }      

  由代碼可以看出,真正實作打開連接配接的函數是openConnection,這個函數根據傳遞的參數來決定是否是打開unix domain socket,實作代碼如下(這個函數代碼比較多,其中除了錯誤部分代碼省略):

  void TSocket::openConnection(struct addrinfo *res) {         if (isOpen()) {           return;//如果已經打開了直接傳回         }         if (! path_.empty()) {//根據路徑是否為空建立不同的socket           socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);//建立unix domain socket         } else {           socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);//建立通用的網絡通信socket         }         if (sendTimeout_ > 0) {//如果發生逾時設定大于0就調用設定發送逾時函數設定發送逾時           setSendTimeout(sendTimeout_);         }         if (recvTimeout_ > 0) {//如果接收逾時設定大于0就調用設定接收逾時函數設定接收逾時           setRecvTimeout(recvTimeout_);         }         setLinger(lingerOn_, lingerVal_);//設定優雅斷開連接配接或關閉連接配接參數         setNoDelay(noDelay_);//設定無延時       #ifdef TCP_LOW_MIN_RTO         if (getUseLowMinRto()) {//設定是否使用較低的最低TCP重傳逾時            int one = 1;           setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));         }       #endif         //如果逾時已經存在設定連接配接為非阻塞         int flags = fcntl(socket_, F_GETFL, 0);//得到socket_的辨別         if (connTimeout_ > 0) {//逾時已經存在           if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {//設定為非阻塞           }         } else {           if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {//設定為阻塞           }         }         // 連接配接socket         int ret;         if (! path_.empty()) {//unix domain socket       #ifndef _WIN32 //window不支援           struct sockaddr_un address;           socklen_t len;           if (path_.length() > sizeof(address.sun_path)) {//path_長度不能超過最長限制           }           address.sun_family = AF_UNIX;           snprintf(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());           len = sizeof(address);           ret = connect(socket_, (struct sockaddr *) &address, len);//連接配接unix domain socket       #else             //window不支援unix domain socket       #endif                } else {           ret = connect(socket_, res->ai_addr, res->ai_addrlen);//連接配接通用的非unix domain socket         }         if (ret == 0) {//失敗了就會執行後面的代碼,用poll來監聽寫事件           goto done;//成功了就直接跳轉到完成處         }         struct pollfd fds[1];//定于用于poll的描述符         std::memset(fds, 0 , sizeof(fds));//初始化為0         fds[0].fd = socket_;//描述符為socket         fds[0].events = POLLOUT;//接收寫事件         ret = poll(fds, 1, connTimeout_);//調用poll,有一個逾時值         if (ret > 0) {           // 確定socket已經被連接配接并且沒有錯誤被設定           int val;           socklen_t lon;           lon = sizeof(int);           int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);//得到錯誤選項參數           if (val == 0) {// socket沒有錯誤也直接到完成處了             goto done;           }         } else if (ret == 0) {// socket 逾時           //相應處理代碼省略         } else {           // poll()出錯了,相應處理代碼省略         }               done:         fcntl(socket_, F_SETFL, flags);//設定socket到原來的模式了(阻塞)         if (path_.empty()) {//如果是unix domain socket就設定緩存位址           setCachedAddress(res->ai_addr, res->ai_addrlen);         }       }      

  上面這個函數代碼确實比較長,不過還好都是比較簡單的代碼實作,沒有什麼很繞的代碼,整個流程也很清晰,在代碼中也有比較詳細的注釋了。下面繼續看通用socket打開函數local_open(它也真正的執行打開功能也是調用上面剛才介紹的那個函數,隻是傳遞了具體的位址資訊):

  void TSocket::local_open(){       #ifdef _WIN32           TWinsockSingleton::create();//相容window平台       #endif // _WIN32         if (isOpen()) {//打開了就直接傳回           return;         }         if (port_ < 0 || port_ > 0xFFFF) {//驗證端口是否為有效值           throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid");         }         struct addrinfo hints, *res, *res0;         res = NULL;         res0 = NULL;         int error;         char port[sizeof("65535")];         std::memset(&hints, 0, sizeof(hints));//記憶體設定為0         hints.ai_family = PF_UNSPEC;         hints.ai_socktype = SOCK_STREAM;         hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;         sprintf(port, "%d", port_);         error = getaddrinfo(host_.c_str(), port, &hints, &res0);//根據主機名得到所有網卡位址資訊         // 循環周遊所有的網卡位址資訊,直到有一個成功打開         for (res = res0; res; res = res->ai_next) {           try {             openConnection(res);//調用打開函數             break;//成功就退出循環           } catch (TTransportException& ttx) {             if (res->ai_next) {//異常處理,是否還有下一個位址,有就繼續               close();             } else {               close();               freeaddrinfo(res0); // 清除位址資訊記憶體和資源               throw;//抛出異常             }           }         }         freeaddrinfo(res0);//釋放位址結構記憶體       }      

  整個local_open函數就是根據主機名得到所有的網卡資訊,然後依次嘗試打開,直到打開一個為止就退出循環,如果所有都不成功就抛出一個異常資訊。

讀函數read

在實作讀函數的時候需要注意區分傳回錯誤為EAGAIN的情況,因為當逾時和系統資源耗盡都會産生這個錯誤(沒有明顯的特征可以區分它們),是以Thrift在實作的時候設定一個最大的嘗試次數,如果超過這個了這個次數就認為是系統資源耗盡了。下面具體看看read函數的實作,代碼如下(省略一些參數檢查和錯誤處理的代碼):

  uint32_t TSocket::read(uint8_t* buf, uint32_t len) {         int32_t retries = 0;//重試的次數         uint32_t eagainThresholdMicros = 0;         if (recvTimeout_) {//如果設定了接收逾時時間,那麼計算最大時間間隔來判斷是否系統資源耗盡           eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2);         }        try_again:         struct timeval begin;         if (recvTimeout_ > 0) {           gettimeofday(&begin, NULL);//得到開始時間         } else {           begin.tv_sec = begin.tv_usec = 0;//預設為0,不需要時間來判斷是逾時了         }         int got = recv(socket_, cast_sockopt(buf), len, 0);//從socket接收資料         int errno_copy = errno; //儲存錯誤代碼         ++g_socket_syscalls;//系統調用次數統計加1         if (got < 0) {//如果讀取錯誤           if (errno_copy == EAGAIN) {//是否為EAGAIN             if (recvTimeout_ == 0) {//如果沒有設定逾時時間,那麼就是資源耗盡錯誤了!抛出異常               throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (unavailable resources)");             }             struct timeval end;             gettimeofday(&end, NULL);//得到結束時間,會改變errno,是以前面需要儲存就是這個原因             uint32_t readElapsedMicros =  (((end.tv_sec - begin.tv_sec) * 1000 * 1000)//計算消耗的時間                                            + (((uint64_t)(end.tv_usec - begin.tv_usec))));             if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {               if (retries++ < maxRecvRetries_) {//重試次數還小于最大重試次數                 usleep(50);//睡眠50毫秒                 goto try_again;//再次嘗試從socket讀取資料               } else {//否則就認為是資源不足了                 throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (unavailable resources)");               }             } else {//推測為逾時了               throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (timed out)");             }           }           if (errno_copy == EINTR && retries++ < maxRecvRetries_) {//如果是中斷并且重試次數沒有超過             goto try_again;//那麼重試           }           #if defined __FreeBSD__ || defined __MACH__           if (errno_copy == ECONNRESET) {//FreeBSD和MACH特殊處理錯誤代碼             return 0;           }           #endif       #ifdef _WIN32           if(errno_copy == WSAECONNRESET) {//win32平台處理錯誤代碼             return 0; // EOF           }       #endif         return got;       }      

  整個讀函數其實沒有什麼特别的,主要的任務就是錯誤情況的處理,從這裡可以看出其實實作一個功能是很容易的,但是要做到穩定和容錯性确實需要發很大功夫。

寫函數write

寫函數和讀函數實作差不多,主要的代碼還是在處理錯誤上面,還有一點不同的是寫函數寫的内容可能一次沒有發送完畢,是以是在一個while循環中一直發送直到指定的内容全部發送完畢。代碼實作如下:

  void TSocket::write(const uint8_t* buf, uint32_t len) {         uint32_t sent = 0;//記錄已經發送了的位元組數         while (sent < len) {//是否已經發送了指定的位元組長度           uint32_t b = write_partial(buf + sent, len - sent);//調部分寫入函數           if (b == 0) {//發送逾時過期了             throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired");           }           sent += b;//已經發送的位元組數         }       }       上面的函數還沒有這種的調用send函數發送寫入的内容,而是調用部分寫入函數write_partial寫入,這個函數實作如下:       uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {         uint32_t sent = 0;         int flags = 0;       #ifdef MSG_NOSIGNAL          //使用這個代替SIGPIPE 錯誤,代替我們檢查傳回EPIPE錯誤條件和關閉socket的情況         flags |= MSG_NOSIGNAL;//設定這個标志位       #endif          int b = send(socket_, const_cast_sockopt(buf + sent), len - sent, flags);//發送資料         ++g_socket_syscalls;//系統調用計數加1         if (b < 0) { //錯誤處理           if (errno == EWOULDBLOCK || errno == EAGAIN) {             return 0;//應該阻塞錯誤直接傳回           }           int errno_copy = errno;//儲存錯誤代碼           if (errno_copy == EPIPE || errno_copy == ECONNRESET || errno_copy == ENOTCONN) {             close();//連接配接錯誤關閉掉socket           }         }         return b;//傳回寫入的位元組數       }      

  這個寫入的實作邏輯和過程也是非常簡單的,隻是需要考慮到各種錯誤的情況并且相應的處理之。

其他函數

TSocket類還有一些其他函數,不過功能都比較簡單,比如設定一些逾時和得到一些成員變量值的函數,哪些函數一般都是幾句代碼完成了。

繼續閱讀