天天看點

IOCP詳解

IOCP詳解

IOCP(I/O Completion Port,I/O完成端口)是性能最好的一種I/O模型。它是應用程式使用線程池處理異步I/O請求的一種機制。在處理多個并發的異步I/O請求時,以往的模型都是在接收請求是建立一個線程來應答請求。這樣就有很多的線程并行地運作在系統中。而這些線程都是可運作的,Windows核心花費大量的時間在進行線程的上下文切換,并沒有多少時間花線上程運作上。再加上建立新線程的開銷比較大,是以造成了效率的低下。

Windows Sockets應用程式在調用WSARecv()函數後立即傳回,線程繼續運作。當系統接收資料完成後,向完成端口發送通知包(這個過程對應用程式不可見)。

應用程式在發起接收資料操作後,在完成端口上等待操作結果。當接收到I/O操作完成的通知後,應用程式對資料進行處理。

IOCP詳解

完成端口其實就是上面兩項的聯合使用基礎上進行了一定的改進。

一個完成端口其實就是一個通知隊列,由作業系統把已經完成的重疊I/O請求的通知放入其中。當某項I/O操作一旦完成,某個可以對該操作結果進行處理的工作者線程就會收到一則通知。而套接字在被建立後,可以在任何時候與某個完成端口進行關聯。

衆所皆知,完成端口是在WINDOWS平台下效率最高,擴充性最好的IO模型,特别針對于WINSOCK的海量連接配接時,更能顯示出其威力。其實建立一個完成端口的伺服器也很簡單,隻要注意幾個函數,了解一下關鍵的步驟也就行了。

分為以下幾步來說明完成端口:

0)       同步IO與異步IO

1)       函數

2)       常見問題以及解答

3)       步驟

4)       例程

0、同步IO與異步IO

同步I/O首先我們來看下同步I/O操作,同步I/O操作就是對于同一個I/O對象句柄在同一時刻隻允許一個I/O操作,原理圖如下:

IOCP詳解

由圖可知,核心開始處理I/O操作到結束的時間段是T2~T3,這個時間段中使用者線程一直處于等待狀态,如果這個時間段比較短,則不會有什麼問題,但是如果時間比較長,那麼這段時間線程會一直處于挂起狀态,這就會很嚴重影響效率,是以我們可以考慮在這段時間做些事情。

異步I/O操作則很好的解決了這個問題,它可以使得核心開始處理I/O操作到結束的這段時間,讓使用者線程可以去做其他事情,進而提高了使用效率。

IOCP詳解

由圖可知,核心開始I/O操作到I/O結束這段時間,使用者層可以做其他的操作,然後,當核心I/O結束的時候,可以讓I/O對象或者時間對象通知使用者層,而使用者線程GetOverlappedResult來檢視核心I/O的完成情況。

1、函數

我們在完成端口模型下會使用到的最重要的兩個函數是:

CreateIoCompletionPort、GetQueuedCompletionStatus

CreateIoCompletionPort  的作用是建立一個完成端口和把一個IO句柄和完成端口關聯起來:

// 建立完成端口

HANDLECompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

// 把一個IO句柄和完成端口關聯起來,這裡的句柄是一個socket 句柄

CreateIoCompletionPort((HANDLE)sClient,CompletionPort, (DWORD)PerHandleData, 0);

其中第一個參數是句柄,可以是檔案句柄、SOCKET句柄。

第二個就是我們上面建立出來的完成端口,這裡就把兩個東西關聯在一起了。

第三個參數很關鍵,叫做PerHandleData,就是對應于每個句柄的資料塊。我們可以使用這個參數在後面取到與這個SOCKET對應的資料。

最後一個參數給0,意思就是根據CPU的個數,允許盡可能多的線程并發執行。

GetQueuedCompletionStatus的作用就是取得完成端口的結果:

// 從完成端口中取得結果

GetQueuedCompletionStatus(CompletionPort,&BytesTransferred, (LPDWORD)&PerHandleData,(LPOVERLAPPED*)&PerIoData, INFINITE)

第一個參數是完成端口

第二個參數是表明這次的操作傳遞了多少個位元組的資料

第三個參數是OUT類型的參數,就是前面CreateIoCompletionPort傳進去的單句柄資料,這裡就是前面的SOCKET句柄以及與之相對應的資料,這裡作業系統給我們傳回,讓我們不用自己去做清單查詢等操作了。

第四個參數就是進行IO操作的結果,是我們在投遞WSARecv / WSASend 等操作時傳遞進去的,這裡作業系統做好準備後,給我們傳回了。非常省事!!

個人感覺完成端口就是作業系統為我們包裝了很多重疊IO的不爽的地方,讓我們可以更友善的去使用,下篇我将會嘗試去講述完成端口的原理。

2、常見問題和解答

1)什麼是單句柄資料(PerHandle)和單IO資料(PerIO)

單句柄資料就是和句柄對應的資料,像socket句柄,檔案句柄這種東西。

單IO資料,就是對應于每次的IO操作的資料。例如每次的WSARecv/WSASend等等

其實我覺得PER是每次的意思,翻譯成每個句柄資料和每次IO資料還比較清晰一點。

在完成端口中,單句柄資料直接通過GetQueuedCompletionStatus 傳回,省去了我們自己做容器去管理。單IO資料也容許我們自己擴充OVERLAPPED結構,是以,在這裡所有與應用邏輯有關的東西都可以在此擴充。

2)如何判斷用戶端的斷開

我們要處理幾種情況

a)如果用戶端調用了closesocket,我們就可以這樣判斷他的斷開:

if(0== GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, 。。。)

{

}

if(BytesTransferred == 0)

   // 用戶端斷開,釋放資源

b)如果是用戶端直接退出,那就會出現64錯誤,指定的網絡名不可再用。這種情況我們也要處理的:

if(0== GetQueuedCompletionStatus(。。。))

  if( (GetLastError() == WAIT_TIMEOUT) ||(GetLastError() == ERROR_NETNAME_DELETED) )

  {

       // 用戶端斷開,釋放資源

  }

3)什麼是IOCP?

我們已經提到IOCP 隻不過是一個專門實作用來進行線程間的通信的技術,和信号量(semaphore)相似,是以IOCP并不是一個複雜的概念。一個IOCP 對象是與多個I/O對象關聯的,這些對象支援挂起異步IO調用。直到一個挂起的異步IO調用結束為止,一個通路IOCP的線程都有可能被挂起。

完成端口的目标是使CPU保持在滿負荷狀态下工作。

4)為什麼使用IOCP?

使用IOCP,我們可以克服”一個用戶端一個線程”的問題。我們知道,這樣做的話,如果軟體不是運作在一個多核及其上性能就會急劇下降。線程是系統資源,他們既不是無限制的、也不是代價低廉的。

IOCP提供了一種隻使用一些(I/O worker)線程去“相對公平地”完成多用戶端的”輸入輸出”。線程會一直被挂起,而不會使用CPU時間片,直到有事情做完為止。

5)IOCP是如何工作的?

當使用IOCP時,你必須處理三件事情:a)将一個Socket關聯到完成端口;b)建立一個異步I/O調用; c)與線程進行同步。為了獲得異步IO調用的結果,比如哪個用戶端執行了調用,你必須傳入兩個參數:pCompletionKey參數和OVERLAPPED結構。

3、步驟

編寫完成端口服務程式,無非就是以下幾個步驟:

 1、建立一個完成端口

 2、根據CPU個數建立工作者線程,把完成端口傳進去線程裡

 3、建立偵聽SOCKET,把SOCKET和完成端口關聯起來

 4、建立PerIOData,向連接配接進來的SOCKET投遞WSARecv操作

 5、線程裡所做的事情:

a、GetQueuedCompletionStatus,在退出的時候就可以使用PostQueudCompletionStatus使線程退出;

b、取得資料并處理;

4、例程

下面是服務端的例程,可以使用sunxin視訊中中的用戶端程式來測試服務端。稍微研究一下,也就會對完成端口模型有個大概的了解了。

執行個體結果伺服器、用戶端如下:

IOCP詳解
IOCP詳解

/*

  完成端口伺服器

  接收到用戶端的資訊,直接顯示出來

*/

#include"winerror.h"

#include"Winsock2.h"

#pragmacomment(lib, "ws2_32")

#include"windows.h"

#include<iostream>

usingnamespace std;

/// 宏定義

#define PORT 5050

#define DATA_BUFSIZE 8192

#define OutErr(a) cout << (a) << endl \

     << "出錯代碼:"<< WSAGetLastError() << endl \

     << "出錯檔案:"<< __FILE__ << endl  \

     << "出錯行數:"<< __LINE__ << endl \

#define OutMsg(a) cout << (a) << endl;

/// 全局函數定義

///

//

// 函數名       : InitWinsock

// 功能描述     : 初始化WINSOCK

// 傳回值       : void

void InitWinsock()

      // 初始化WINSOCK

       WSADATA wsd;

       if( WSAStartup(MAKEWORD(2, 2), &wsd) != 0)

       {

              OutErr("WSAStartup()");

       }

// 函數名       : BindServerOverlapped

// 功能描述     : 綁定端口,并傳回一個 Overlapped 的ListenSocket

// 參數         : int nPort

// 傳回值       : SOCKET

SOCKET BindServerOverlapped(int nPort)

// 建立socket

SOCKET sServer = WSASocket(AF_INET,SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

// 綁定端口

struct sockaddr_in servAddr;

servAddr.sin_family = AF_INET;

servAddr.sin_port = htons(nPort);

servAddr.sin_addr.s_addr = htonl(INADDR_ANY);

if(bind(sServer, (struct sockaddr*)&servAddr, sizeof(servAddr)) < 0)

{

       OutErr("bind Failed!");

       return NULL;

}

// 設定監聽隊列為200

if(listen(sServer, 200) != 0)

       OutErr("listen Failed!");

return sServer;

/// 結構體定義

typedef struct

  OVERLAPPED Overlapped;

  WSABUF DataBuf;

  CHAR Buffer[DATA_BUFSIZE];

}PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA;

  SOCKET Socket;

}PER_HANDLE_DATA,* LPPER_HANDLE_DATA;

DWORD WINAPI ProcessIO(LPVOID lpParam)

   HANDLE CompletionPort = (HANDLE)lpParam;

   DWORD BytesTransferred;

   LPPER_HANDLE_DATA PerHandleData;

   LPPER_IO_OPERATION_DATA PerIoData;

while(true)

      if(0 == GetQueuedCompletionStatus(CompletionPort,&BytesTransferred, (LPDWORD)&PerHandleData,(LPOVERLAPPED*)&PerIoData, INFINITE))

      {

             if( (GetLastError() ==WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED) )

             {

                    cout << "closingsocket" << PerHandleData->Socket << endl;

                    closesocket(PerHandleData->Socket);

                    delete PerIoData;

                    delete PerHandleData;

                    continue;

             }

             else

              OutErr("GetQueuedCompletionStatus failed!");

             return 0;

      }

      // 說明用戶端已經退出

      if(BytesTransferred == 0)

        cout << "closing socket" <<PerHandleData->Socket << endl;

        closesocket(PerHandleData->Socket);

        delete PerIoData;

        delete PerHandleData;

        continue;

      // 取得資料并處理

      cout << PerHandleData->Socket<< "發送過來的消息:" << PerIoData->Buffer<< endl;

      // 繼續向 socket 投遞WSARecv操作

      DWORD Flags = 0;

      DWORD dwRecv = 0;

      ZeroMemory(PerIoData,sizeof(PER_IO_OPERATION_DATA));

      PerIoData->DataBuf.buf =PerIoData->Buffer;

      PerIoData->DataBuf.len = DATA_BUFSIZE;

      WSARecv(PerHandleData->Socket,&PerIoData->DataBuf, 1, &dwRecv, &Flags,&PerIoData->Overlapped, NULL);

return 0;

void main()

       InitWinsock();

       HANDLE CompletionPort =CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

       //根據系統的CPU來建立工作者線程

       SYSTEM_INFO SystemInfo;

       GetSystemInfo(&SystemInfo);

       //線程數目=系統程序數目的兩倍.

       for(int i = 0; i <SystemInfo.dwNumberOfProcessors * 2; i++)

              HANDLE hProcessIO = CreateThread(NULL, 0,ProcessIO, CompletionPort, 0, NULL);

              if(hProcessIO)

              {

                     CloseHandle(hProcessIO);

              }

       //建立偵聽SOCKET

       SOCKET sListen = BindServerOverlapped(PORT);

       SOCKET sClient;

       LPPER_HANDLE_DATA PerHandleData;

       LPPER_IO_OPERATION_DATA PerIoData;

       while(true)

              // 等待用戶端接入

              //sClient = WSAAccept(sListen, NULL, NULL, NULL, 0);

              sClient = accept(sListen, 0, 0);

              cout << "Socket " << sClient << "連接配接進來"<< endl;

              PerHandleData = new PER_HANDLE_DATA();

              PerHandleData->Socket = sClient;

              // 将接入的用戶端和完成端口聯系起來

              CreateIoCompletionPort((HANDLE)sClient, CompletionPort,(DWORD)PerHandleData, 0);

              // 建立一個Overlapped,并使用這個Overlapped結構對socket投遞操作

              PerIoData = new PER_IO_OPERATION_DATA();

              ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));

              PerIoData->DataBuf.buf = PerIoData->Buffer;

              PerIoData->DataBuf.len = DATA_BUFSIZE;

              // 投遞一個WSARecv操作

              DWORD Flags = 0;

              DWORD dwRecv = 0;

              WSARecv(sClient, &PerIoData->DataBuf, 1, &dwRecv, &Flags,&PerIoData->Overlapped, NULL);

      DWORD dwByteTrans;

      //将一個已經完成的IO通知添加到IO完成端口的隊列中.

       //提供了與線程池中的所有線程通信的方式.

       PostQueuedCompletionStatus(CompletionPort,dwByteTrans, 0, 0);  //IO操作完成時接收的位元組數.

       closesocket(sListen);

/*--------------------------------------------

**---------用戶端例程式-----------------------

---------------------------------------------*/

#include<stdio.h>

#include<Winsock2.h>

#define MAXCNT 30000

      WORD wVersionRequested;

      WSADATA wsaData;

      int err;

      wVersionRequested = MAKEWORD( 2, 2);

      err = WSAStartup( wVersionRequested,&wsaData );//WSAStartup()加載套接字庫

      if ( err != 0 ) {

             return;

      if ( LOBYTE( wsaData.wVersion ) != 2 ||

             HIBYTE( wsaData.wVersion ) != 2 ){

             WSACleanup( );

      static int nCnt = 0;

      char sendBuf[2000];

//     char recvBuf[100];

      while(nCnt < MAXCNT)

             SOCKETsockClient=socket(AF_INET,SOCK_STREAM,0);

             SOCKADDR_IN addrSrv;

             addrSrv.sin_addr.S_un.S_addr=inet_addr("127.0.0.1");//本地回路位址127,用于一台機器上測試的IP

             addrSrv.sin_family=AF_INET;

             addrSrv.sin_port=htons(5050);//和伺服器端的端口号保持一緻

             connect(sockClient,(SOCKADDR*)&addrSrv,sizeof(SOCKADDR));//連接配接伺服器端(套接字,位址轉換,長度)

             sprintf(sendBuf,"This is TestNo : %d\n",++nCnt);

             send(sockClient,sendBuf,strlen(sendBuf)+1,0);//向伺服器端發送資料,"+1"是為了給'\0'留白間

             printf("send:%s",sendBuf);

//           memset(recvBuf,0,100);

//           recv(sockClient,recvBuf,100,0);//接收資料

//           printf("%s\n",recvBuf);//列印

             closesocket(sockClient);//關閉套接字,釋放為這個套接字配置設定的資源

             Sleep(1);

      WSACleanup();//終止對這個套接字庫的使用