IOCP詳解
IOCP(I/O Completion Port,I/O完成端口)是性能最好的一種I/O模型。它是應用程式使用線程池處理異步I/O請求的一種機制。在處理多個并發的異步I/O請求時,以往的模型都是在接收請求是建立一個線程來應答請求。這樣就有很多的線程并行地運作在系統中。而這些線程都是可運作的,Windows核心花費大量的時間在進行線程的上下文切換,并沒有多少時間花線上程運作上。再加上建立新線程的開銷比較大,是以造成了效率的低下。
Windows Sockets應用程式在調用WSARecv()函數後立即傳回,線程繼續運作。當系統接收資料完成後,向完成端口發送通知包(這個過程對應用程式不可見)。
應用程式在發起接收資料操作後,在完成端口上等待操作結果。當接收到I/O操作完成的通知後,應用程式對資料進行處理。
完成端口其實就是上面兩項的聯合使用基礎上進行了一定的改進。
一個完成端口其實就是一個通知隊列,由作業系統把已經完成的重疊I/O請求的通知放入其中。當某項I/O操作一旦完成,某個可以對該操作結果進行處理的工作者線程就會收到一則通知。而套接字在被建立後,可以在任何時候與某個完成端口進行關聯。
衆所皆知,完成端口是在WINDOWS平台下效率最高,擴充性最好的IO模型,特别針對于WINSOCK的海量連接配接時,更能顯示出其威力。其實建立一個完成端口的伺服器也很簡單,隻要注意幾個函數,了解一下關鍵的步驟也就行了。
分為以下幾步來說明完成端口:
0) 同步IO與異步IO
1) 函數
2) 常見問題以及解答
3) 步驟
4) 例程
0、同步IO與異步IO
同步I/O首先我們來看下同步I/O操作,同步I/O操作就是對于同一個I/O對象句柄在同一時刻隻允許一個I/O操作,原理圖如下:
由圖可知,核心開始處理I/O操作到結束的時間段是T2~T3,這個時間段中使用者線程一直處于等待狀态,如果這個時間段比較短,則不會有什麼問題,但是如果時間比較長,那麼這段時間線程會一直處于挂起狀态,這就會很嚴重影響效率,是以我們可以考慮在這段時間做些事情。
異步I/O操作則很好的解決了這個問題,它可以使得核心開始處理I/O操作到結束的這段時間,讓使用者線程可以去做其他事情,進而提高了使用效率。
由圖可知,核心開始I/O操作到I/O結束這段時間,使用者層可以做其他的操作,然後,當核心I/O結束的時候,可以讓I/O對象或者時間對象通知使用者層,而使用者線程GetOverlappedResult來檢視核心I/O的完成情況。
1、函數
我們在完成端口模型下會使用到的最重要的兩個函數是:
CreateIoCompletionPort、GetQueuedCompletionStatus
CreateIoCompletionPort 的作用是建立一個完成端口和把一個IO句柄和完成端口關聯起來:
// 建立完成端口
HANDLECompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
// 把一個IO句柄和完成端口關聯起來,這裡的句柄是一個socket 句柄
CreateIoCompletionPort((HANDLE)sClient,CompletionPort, (DWORD)PerHandleData, 0);
其中第一個參數是句柄,可以是檔案句柄、SOCKET句柄。
第二個就是我們上面建立出來的完成端口,這裡就把兩個東西關聯在一起了。
第三個參數很關鍵,叫做PerHandleData,就是對應于每個句柄的資料塊。我們可以使用這個參數在後面取到與這個SOCKET對應的資料。
最後一個參數給0,意思就是根據CPU的個數,允許盡可能多的線程并發執行。
GetQueuedCompletionStatus的作用就是取得完成端口的結果:
// 從完成端口中取得結果
GetQueuedCompletionStatus(CompletionPort,&BytesTransferred, (LPDWORD)&PerHandleData,(LPOVERLAPPED*)&PerIoData, INFINITE)
第一個參數是完成端口
第二個參數是表明這次的操作傳遞了多少個位元組的資料
第三個參數是OUT類型的參數,就是前面CreateIoCompletionPort傳進去的單句柄資料,這裡就是前面的SOCKET句柄以及與之相對應的資料,這裡作業系統給我們傳回,讓我們不用自己去做清單查詢等操作了。
第四個參數就是進行IO操作的結果,是我們在投遞WSARecv / WSASend 等操作時傳遞進去的,這裡作業系統做好準備後,給我們傳回了。非常省事!!
個人感覺完成端口就是作業系統為我們包裝了很多重疊IO的不爽的地方,讓我們可以更友善的去使用,下篇我将會嘗試去講述完成端口的原理。
2、常見問題和解答
1)什麼是單句柄資料(PerHandle)和單IO資料(PerIO)
單句柄資料就是和句柄對應的資料,像socket句柄,檔案句柄這種東西。
單IO資料,就是對應于每次的IO操作的資料。例如每次的WSARecv/WSASend等等
其實我覺得PER是每次的意思,翻譯成每個句柄資料和每次IO資料還比較清晰一點。
在完成端口中,單句柄資料直接通過GetQueuedCompletionStatus 傳回,省去了我們自己做容器去管理。單IO資料也容許我們自己擴充OVERLAPPED結構,是以,在這裡所有與應用邏輯有關的東西都可以在此擴充。
2)如何判斷用戶端的斷開
我們要處理幾種情況
a)如果用戶端調用了closesocket,我們就可以這樣判斷他的斷開:
if(0== GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, 。。。)
{
}
if(BytesTransferred == 0)
// 用戶端斷開,釋放資源
b)如果是用戶端直接退出,那就會出現64錯誤,指定的網絡名不可再用。這種情況我們也要處理的:
if(0== GetQueuedCompletionStatus(。。。))
if( (GetLastError() == WAIT_TIMEOUT) ||(GetLastError() == ERROR_NETNAME_DELETED) )
{
// 用戶端斷開,釋放資源
}
3)什麼是IOCP?
我們已經提到IOCP 隻不過是一個專門實作用來進行線程間的通信的技術,和信号量(semaphore)相似,是以IOCP并不是一個複雜的概念。一個IOCP 對象是與多個I/O對象關聯的,這些對象支援挂起異步IO調用。直到一個挂起的異步IO調用結束為止,一個通路IOCP的線程都有可能被挂起。
完成端口的目标是使CPU保持在滿負荷狀态下工作。
4)為什麼使用IOCP?
使用IOCP,我們可以克服”一個用戶端一個線程”的問題。我們知道,這樣做的話,如果軟體不是運作在一個多核及其上性能就會急劇下降。線程是系統資源,他們既不是無限制的、也不是代價低廉的。
IOCP提供了一種隻使用一些(I/O worker)線程去“相對公平地”完成多用戶端的”輸入輸出”。線程會一直被挂起,而不會使用CPU時間片,直到有事情做完為止。
5)IOCP是如何工作的?
當使用IOCP時,你必須處理三件事情:a)将一個Socket關聯到完成端口;b)建立一個異步I/O調用; c)與線程進行同步。為了獲得異步IO調用的結果,比如哪個用戶端執行了調用,你必須傳入兩個參數:pCompletionKey參數和OVERLAPPED結構。
3、步驟
編寫完成端口服務程式,無非就是以下幾個步驟:
1、建立一個完成端口
2、根據CPU個數建立工作者線程,把完成端口傳進去線程裡
3、建立偵聽SOCKET,把SOCKET和完成端口關聯起來
4、建立PerIOData,向連接配接進來的SOCKET投遞WSARecv操作
5、線程裡所做的事情:
a、GetQueuedCompletionStatus,在退出的時候就可以使用PostQueudCompletionStatus使線程退出;
b、取得資料并處理;
4、例程
下面是服務端的例程,可以使用sunxin視訊中中的用戶端程式來測試服務端。稍微研究一下,也就會對完成端口模型有個大概的了解了。
執行個體結果伺服器、用戶端如下:
/*
完成端口伺服器
接收到用戶端的資訊,直接顯示出來
*/
#include"winerror.h"
#include"Winsock2.h"
#pragmacomment(lib, "ws2_32")
#include"windows.h"
#include<iostream>
usingnamespace std;
/// 宏定義
#define PORT 5050
#define DATA_BUFSIZE 8192
#define OutErr(a) cout << (a) << endl \
<< "出錯代碼:"<< WSAGetLastError() << endl \
<< "出錯檔案:"<< __FILE__ << endl \
<< "出錯行數:"<< __LINE__ << endl \
#define OutMsg(a) cout << (a) << endl;
/// 全局函數定義
///
//
// 函數名 : InitWinsock
// 功能描述 : 初始化WINSOCK
// 傳回值 : void
void InitWinsock()
// 初始化WINSOCK
WSADATA wsd;
if( WSAStartup(MAKEWORD(2, 2), &wsd) != 0)
{
OutErr("WSAStartup()");
}
// 函數名 : BindServerOverlapped
// 功能描述 : 綁定端口,并傳回一個 Overlapped 的ListenSocket
// 參數 : int nPort
// 傳回值 : SOCKET
SOCKET BindServerOverlapped(int nPort)
// 建立socket
SOCKET sServer = WSASocket(AF_INET,SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
// 綁定端口
struct sockaddr_in servAddr;
servAddr.sin_family = AF_INET;
servAddr.sin_port = htons(nPort);
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(sServer, (struct sockaddr*)&servAddr, sizeof(servAddr)) < 0)
{
OutErr("bind Failed!");
return NULL;
}
// 設定監聽隊列為200
if(listen(sServer, 200) != 0)
OutErr("listen Failed!");
return sServer;
/// 結構體定義
typedef struct
OVERLAPPED Overlapped;
WSABUF DataBuf;
CHAR Buffer[DATA_BUFSIZE];
}PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA;
SOCKET Socket;
}PER_HANDLE_DATA,* LPPER_HANDLE_DATA;
DWORD WINAPI ProcessIO(LPVOID lpParam)
HANDLE CompletionPort = (HANDLE)lpParam;
DWORD BytesTransferred;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
while(true)
if(0 == GetQueuedCompletionStatus(CompletionPort,&BytesTransferred, (LPDWORD)&PerHandleData,(LPOVERLAPPED*)&PerIoData, INFINITE))
{
if( (GetLastError() ==WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED) )
{
cout << "closingsocket" << PerHandleData->Socket << endl;
closesocket(PerHandleData->Socket);
delete PerIoData;
delete PerHandleData;
continue;
}
else
OutErr("GetQueuedCompletionStatus failed!");
return 0;
}
// 說明用戶端已經退出
if(BytesTransferred == 0)
cout << "closing socket" <<PerHandleData->Socket << endl;
closesocket(PerHandleData->Socket);
delete PerIoData;
delete PerHandleData;
continue;
// 取得資料并處理
cout << PerHandleData->Socket<< "發送過來的消息:" << PerIoData->Buffer<< endl;
// 繼續向 socket 投遞WSARecv操作
DWORD Flags = 0;
DWORD dwRecv = 0;
ZeroMemory(PerIoData,sizeof(PER_IO_OPERATION_DATA));
PerIoData->DataBuf.buf =PerIoData->Buffer;
PerIoData->DataBuf.len = DATA_BUFSIZE;
WSARecv(PerHandleData->Socket,&PerIoData->DataBuf, 1, &dwRecv, &Flags,&PerIoData->Overlapped, NULL);
return 0;
void main()
InitWinsock();
HANDLE CompletionPort =CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
//根據系統的CPU來建立工作者線程
SYSTEM_INFO SystemInfo;
GetSystemInfo(&SystemInfo);
//線程數目=系統程序數目的兩倍.
for(int i = 0; i <SystemInfo.dwNumberOfProcessors * 2; i++)
HANDLE hProcessIO = CreateThread(NULL, 0,ProcessIO, CompletionPort, 0, NULL);
if(hProcessIO)
{
CloseHandle(hProcessIO);
}
//建立偵聽SOCKET
SOCKET sListen = BindServerOverlapped(PORT);
SOCKET sClient;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
while(true)
// 等待用戶端接入
//sClient = WSAAccept(sListen, NULL, NULL, NULL, 0);
sClient = accept(sListen, 0, 0);
cout << "Socket " << sClient << "連接配接進來"<< endl;
PerHandleData = new PER_HANDLE_DATA();
PerHandleData->Socket = sClient;
// 将接入的用戶端和完成端口聯系起來
CreateIoCompletionPort((HANDLE)sClient, CompletionPort,(DWORD)PerHandleData, 0);
// 建立一個Overlapped,并使用這個Overlapped結構對socket投遞操作
PerIoData = new PER_IO_OPERATION_DATA();
ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
PerIoData->DataBuf.buf = PerIoData->Buffer;
PerIoData->DataBuf.len = DATA_BUFSIZE;
// 投遞一個WSARecv操作
DWORD Flags = 0;
DWORD dwRecv = 0;
WSARecv(sClient, &PerIoData->DataBuf, 1, &dwRecv, &Flags,&PerIoData->Overlapped, NULL);
DWORD dwByteTrans;
//将一個已經完成的IO通知添加到IO完成端口的隊列中.
//提供了與線程池中的所有線程通信的方式.
PostQueuedCompletionStatus(CompletionPort,dwByteTrans, 0, 0); //IO操作完成時接收的位元組數.
closesocket(sListen);
/*--------------------------------------------
**---------用戶端例程式-----------------------
---------------------------------------------*/
#include<stdio.h>
#include<Winsock2.h>
#define MAXCNT 30000
WORD wVersionRequested;
WSADATA wsaData;
int err;
wVersionRequested = MAKEWORD( 2, 2);
err = WSAStartup( wVersionRequested,&wsaData );//WSAStartup()加載套接字庫
if ( err != 0 ) {
return;
if ( LOBYTE( wsaData.wVersion ) != 2 ||
HIBYTE( wsaData.wVersion ) != 2 ){
WSACleanup( );
static int nCnt = 0;
char sendBuf[2000];
// char recvBuf[100];
while(nCnt < MAXCNT)
SOCKETsockClient=socket(AF_INET,SOCK_STREAM,0);
SOCKADDR_IN addrSrv;
addrSrv.sin_addr.S_un.S_addr=inet_addr("127.0.0.1");//本地回路位址127,用于一台機器上測試的IP
addrSrv.sin_family=AF_INET;
addrSrv.sin_port=htons(5050);//和伺服器端的端口号保持一緻
connect(sockClient,(SOCKADDR*)&addrSrv,sizeof(SOCKADDR));//連接配接伺服器端(套接字,位址轉換,長度)
sprintf(sendBuf,"This is TestNo : %d\n",++nCnt);
send(sockClient,sendBuf,strlen(sendBuf)+1,0);//向伺服器端發送資料,"+1"是為了給'\0'留白間
printf("send:%s",sendBuf);
// memset(recvBuf,0,100);
// recv(sockClient,recvBuf,100,0);//接收資料
// printf("%s\n",recvBuf);//列印
closesocket(sockClient);//關閉套接字,釋放為這個套接字配置設定的資源
Sleep(1);
WSACleanup();//終止對這個套接字庫的使用