完成端口模型
“完成端口”模型是迄今為止最為複雜的一種 I / O模型。然而,假若一個應用程式同時需
要管理為數衆多的套接字,那麼采用這種模型,往往可以達到最佳的系統性能!但不幸的是,
該模型隻适用于Windows NT和Windows 2000作業系統。因其設計的複雜性,隻有在你的應用
程式需要同時管理數百乃至上千個套接字的時候,而且希望随着系統内安裝的 C P U數量的增
多,應用程式的性能也可以線性提升,才應考慮采用“完成端口”模型。要記住的一個基本
準則是,假如要為Windows NT或Windows 2000開發高性能的伺服器應用,同時希望為大量套
接字I / O請求提供服務(We b伺服器便是這方面的典型例子),那麼I / O完成端口模型便是最佳
選擇!
從本質上說,完成端口模型要求我們建立一個Wi n 3 2完成端口對象,通過指定數量的線程,
對重疊I / O請求進行管理,以便為已經完成的重疊I / O請求提供服務。要注意的是,所謂“完成
端口”,實際是Wi n 3 2、Windows NT以及Windows 2000采用的一種I / O構造機制,除套接字句
柄之外,實際上還可接受其他東西。然而,本節隻打算講述如何使用套接字句柄,來發揮完
成端口模型的巨大威力。使用這種模型之前,首先要建立一個 I / O完成端口對象,用它面向任
意數量的套接字句柄,管理多個I / O請求。要做到這一點,需要調用CreateIoComletionPort函數。
該函數定義如下:
HANDLE CreateIoCompletionPort (
HANDLE FileHandle, // handle to file
HANDLE ExistingCompletionPort, // handle to I/O completion port
ULONG_PTR CompletionKey, // completion key
DWORD NumberOfConcurrentThreads // number of threads to execute concurrently
);
在我們深入探讨其中的各個參數之前,首先要注意該函數實際用于兩個明顯有别的目的:
■ 用于建立一個完成端口對象。
■ 将一個句柄同完成端口關聯到一起。
最開始建立一個完成端口時,唯一感興趣的參數便是 NumberOfConcurrentThreads(并發
線程的數量);前面三個參數都會被忽略。NumberOfConcurrentThreads參數的特殊之處在于,
它定義了在一個完成端口上,同時允許執行的線程數量。理想情況下,我們希望每個處理器
各自負責一個線程的運作,為完成端口提供服務,避免過于頻繁的線程“場景”切換。若将
該參數設為0,表明系統内安裝了多少個處理器,便允許同時運作多少個線程!可用下述代碼
建立一個I / O完成端口:
CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)
該語句的作用是傳回一個句柄,在為完成端口配置設定了一個套接字句柄後,用來對那個端
口進行标定(引用)。
GetQueuedCompletionStatus函數解釋: 實作從指定的IOCP擷取CP。當CP隊列為空時,對此函數的調用将被阻塞,而不是一直等 待I/O的完成。當CP隊列不為空時,被阻塞的線程将以後進先出(LIFO)順序被釋放。 對于IOCP機制,它允許多線程并發調用GetQueuedCompletionStatus函數,最大并發數 是在調用CreateIoCompletionPort函數時指定的,超出最大并發數的調用線程,将被阻塞。 函數解釋如下: 聲明: BOOL GetQueuedCompletionStatus( HANDLE CompletionPort, LPDWORD lpNumberOfBytes, PULONG_PTR lpCompletionKey, LPOVERLAPPED *lpOverlapped, DWORD dwMilliseconds); 調用參數: CompletionPort:指定的IOCP,該值由CreateIoCompletionPort函數建立。 lpnumberofbytes:一次完成後的I/O操作所傳送資料的位元組數。 lpcompletionkey:當檔案I/O操作完成後,用于存放與之關聯的CK。 lpoverlapped:為調用IOCP機制所引用的OVERLAPPED結構。 dwmilliseconds:用于指定調用者等待CP的時間。 傳回值: 調用成功,則傳回非零數值,相關資料存于lpNumberOfBytes、lpCompletionKey、lpoverlapped變量中。失敗則傳回零值。
int WSASend ( SOCKET s, LPWSABUF lpBuffers DWORD dwBufferCount, LPDWORD lpNumberOfBytesSent, DWORD dwFlags, LPWSAOVERLAPPED lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine );
參數
s:辨別一個已連接配接套接口的描述字。 lpBuffers:一個指向WSABUF結構數組的指針。每個WSABUF結構包含緩沖區的指針和緩沖區的大小。 dwBufferCount:lpBuffers數組中WSABUF結構的數目。 lpNumberOfBytesSent:如果發送操作立即完成,則為一個指向所發送資料位元組數的指針。 dwFlags:标志位。 lpOverlapped:指向WSAOVERLAPPED結構的指針(對于非重疊套接口則忽略)。 lpCompletionRoutine:一個指向發送操作完成後調用的完成例程的指針。(對于非重疊套接口則忽略)。
傳回值: 若無錯誤發生且發送操作立即完成,則WSASend()函數傳回0。這時,完成例程(Completion Routine)應該已經被排程,一旦調用線程處于alertable狀态時就會調用它。否則,傳回SOCKET_ERROR 。通過WSAGetLastError獲得詳細的錯誤代碼。WSA_IO_PENDING 這個錯誤碼(其實表示沒有錯誤)表示重疊操作已經送出成功(就是異步IO的意思了),稍後會提示完成(這個完成可不一定是發送成功,沒準出問題也不一定)。其他的錯誤代碼都代表重疊操作沒有正确開始,也不會有完成标志出現。
int WSARecv( SOCKET s, // 當然是投遞這個操作的套接字 ,與Recv函數不同 // 這裡需要一個由WSABUF結構構成的數組 DWORD dwBufferCount, // 數組中WSABUF結構的數量 LPDWORD lpNumberOfBytesRecvd, // 如果接收操作立即完成,這裡會傳回函數調用所接收到的位元組數 LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine // 完成例程中将會用到的參數,我們這裡設定為 NULL ); 傳回值: WSA_IO_PENDING : 最常見的傳回值,這是說明我們的WSARecv操作成功了,但是I/O操作還沒有完成,是以我們就需要綁定一個事件來通知我們操作何時完成
1. 工作者線程與完成端口
成功建立一個完成端口後,便可開始将套接字句柄與對象關聯到一起。但在關聯套接字
之前,首先必須建立一個或多個“工作者線程”,以便在I / O請求投遞給完成端口對象後,為
完成端口提供服務。在這個時候,大家或許會覺得奇怪,到底應建立多少個線程,以便為完
成端口提供服務呢?這實際正是完成端口模型顯得頗為“複雜”的一個方面,因為服務 I / O請
求所需的數量取決于應用程式的總體設計情況。在此要記住的一個重點在于,在我們調用
CreateIoComletionPort時指定的并發線程數量,與打算建立的工作者線程數量相比,它們代表
的并非同一件事情。早些時候,我們曾建議大家用 CreateIoComletionPort函數為每個處理器
都指定一個線程(處理器的數量有多少,便指定多少線程)以避免由于頻繁的線程“場景”
交換活動,進而影響系統的整體性能。CreateIoComletionPort函數的NumberOfConcurrentThreads參數明确訓示系統:在一個完成端口上,一
次隻允許 n個工作者線程運作。假如在完
成端口上建立的工作者線程數量超出 n個,那麼在同一時刻,最多隻允許 n個線程運作。但實
際上,在一段較短的時間内,系統有可能超過這個值,但很快便會把它減少至事先在
CreateIoComletionPort函數中設定的值。那麼,為何實際建立的工作者線程數量有時要比
CreateIoComletionPort函數設定的多一些呢?這樣做有必要嗎?如先前所述,這主要取決于
應用程式的總體設計情況。假定我們的某個工作者線程調用了一個函數,比如 Sleep或
WaitForSingleObject,但卻進入了暫停(鎖定或挂起)狀态,那麼允許另一個線程代替它的位
置。換言之,我們希望随時都能執行盡可能多的線程;當然,最大的線程數量是事先在
CreateIoCompletonPort調用裡設定好的。這樣一來,假如事先預計到自己的線程有可能暫時
處于停頓狀态,那麼最好能夠建立比 CreateIoCompletonPort的NumberOfConcurrentThreads參數的值多的線程,以便到時候充分發揮系統的潛
力。
一旦在完成端口上擁有足夠多的工作者線程來為I / O請求提供服務,便可着手将套接字句柄
同完成端口關聯到一起。這要求我們在一個現有的完成端口上,調用CreateIo CompletionPort函
數,同時為前三個參數—FileHandle,ExistingCompletionPort和CompletionKey—提供套
接字的資訊。其中,FileHandle參數指定一個要同完成端口關聯在一起的套接字句柄。
ExistingCompletionPort參數指定的是一個現有的完成端口。 CompletionKey(完成鍵)參
數則指定要與某個特定套接字句柄關聯在一起的“單句柄資料”;在這個參數中,應用程式
可儲存與一個套接字對應的任意類型的資訊。之是以把它叫作“單句柄資料”,是由于它隻對
應着與那個套接字句柄關聯在一起的資料。可将其作為指向一個資料結構的指針,來儲存套
接字句柄;在那個結構中,同時包含了套接字的句柄,以及與那個套接字有關的其他資訊。
就象本章稍後還會講述的那樣,為完成端口提供服務的線程例程可通過這個參數,取得與套
接字句柄有關的資訊。
根據我們到目前為止學到的東西,首先來建構一個基本的應用程式架構。程式清單 8 - 9向
大家闡述了如何使用完成端口模型,來開發一個回應(或“反射”)伺服器應用。在這個程式
中,我們基本上按下述步驟行事:
1) 建立一個完成端口。第四個參數保持為0,指定在完成端口上,每個處理器一次隻允許
執行一個工作者線程。
2) 判斷系統内到底安裝了多少個處理器。
3) 建立工作者線程,根據步驟2 )得到的處理器資訊,在完成端口上,為已完成的 I / O請求
提供服務。在這個簡單的例子中,我們為每個處理器都隻建立一個工作者線程。這是由于事
先已預計到,到時不會有任何線程進入“挂起”狀态,造成由于線程數量的不足,而使處理
器空閑的局面(沒有足夠的線程可供執行)。調用CreateThread函數時,必須同時提供一個工
作者例程,由線程在建立好執行。本節稍後還會詳細讨論線程的職責。
4) 準備好一個監聽套接字,在端口5150上監聽進入的連接配接請求。
5) 使用accept函數,接受進入的連接配接請求。
6) 建立一個資料結構,用于容納“單句柄資料”,同時在結構中存入接受的套接字句柄。
7) 調用CreateIoComletionPort,将自accept傳回的新套接字句柄同完成端口關聯到一起。
通過完成鍵(CompletionKey)參數,将單句柄資料結構傳遞給CreateIoComletionPort。
8) 開始在已接受的連接配接上進行 I / O操作。在此,我們希望通過重疊 I / O機制,在建立的套
接字上投遞一個或多個異步 WSARecv或WSASend請求。這些I / O請求完成後,一個工作者線
程會為I / O請求提供服務,同時繼續處理未來的 I / O請求,稍後便會在步驟3 )指定的工作者例程
中,體驗到這一點。
9) 重複步驟5 ) ~ 8 ),直至伺服器中止。
程式清單8-9 完成端口的建立
代碼說明:
1 OVERLAPPED 是如何到達工作線程的?或者說工作線程中的GetQueuedCompletionStatus函數的第四個參數為什麼能到得到用戶端對應的
OVERLAPPED?
答:CreateIoCompletionPort函數将完成端口對象與用戶端套接字綁定,WSARecv函數在接收完成後,将OVERLAPPED投遞到了與用戶端套接字綁定的完成端口對象,是以在該完成端口對象中,GetQueuedCompletionStatus可以獲得OVERLAPPED。
2 GetQueuedCompletionStatus獲得了OVERLAPPED,為什麼可以當作PER_IO_OPERATION_DATA結構來使用?
答:這用到了一點程式設計技巧--“尾随資料”,PER_IO_OPERATION_DATA結構體的設計将OVERLAPPED成員放在了第一個,是以PER_IO_OPERATION_DATA結構體的位址正是OVERLAPPED成員的首位址,在問題1中,OVERLAPPED是通過首位址來傳遞的,是以GetQueuedCompletionStatus獲得OVERLAPPED首位址後,可以當作PER_IO_OPERATION_DATA結構來使用。
3注意一點:WSARecv函數和WSASend函數的 dwFlags參數一般設定為0,傳參前記得将參數指派為0.
- #include <winsock2.h>
- #include <windows.h>
- #include <stdio.h>
- #define PORT 5150
- #define DATA_BUFSIZE 8192
- typedef struct
- {
- OVERLAPPED Overlapped;
- WSABUF DataBuf;
- CHAR Buffer[DATA_BUFSIZE];
- DWORD BytesSEND;
- DWORD BytesRECV;
- } PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;
- typedef struct
- {
- SOCKET Socket;
- } PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
- DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID);
- void main(void)
- {
- SOCKADDR_IN InternetAddr;
- SOCKET Listen;
- SOCKET Accept;
- HANDLE CompletionPort;
- SYSTEM_INFO SystemInfo;
- LPPER_HANDLE_DATA PerHandleData;
- LPPER_IO_OPERATION_DATA PerIoData;
- int i;
- DWORD RecvBytes;
- DWORD Flags;
- DWORD ThreadID;
- WSADATA wsaData;
- DWORD Ret;
- if ((Ret = WSAStartup(0x0202, &wsaData)) != 0)
- {
- printf("WSAStartup failed with error %d/n", Ret);
- return;
- }
- // Setup an I/O completion port.
- if ((CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL)
- {
- printf( "CreateIoCompletionPort failed with error: %d/n", GetLastError());
- return;
- }
- // Determine how many processors are on the system.
- GetSystemInfo(&SystemInfo);
- // Create worker threads based on the number of processors available on the
- // system. Create two worker threads for each processor.
- for(i = 0; i < SystemInfo.dwNumberOfProcessors * 2; i++)
- {
- HANDLE ThreadHandle;
- // Create a server worker thread and pass the completion port to the thread.
- if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort,
- 0, &ThreadID)) == NULL)
- {
- printf("CreateThread() failed with error %d/n", GetLastError());
- return;
- }
- // Close the thread handle
- CloseHandle(ThreadHandle);
- }
- // Create a listening socket
- if ((Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,
- WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
- {
- printf("WSASocket() failed with error %d/n", WSAGetLastError());
- return;
- }
- InternetAddr.sin_family = AF_INET;
- InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
- InternetAddr.sin_port = htons(PORT);
- if (bind(Listen, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr)) == SOCKET_ERROR)
- {
- printf("bind() failed with error %d/n", WSAGetLastError());
- return;
- }
- // Prepare socket for listening
- if (listen(Listen, 5) == SOCKET_ERROR)
- {
- printf("listen() failed with error %d/n", WSAGetLastError());
- return;
- }
- // Accept connections and assign to the completion port.
- while(TRUE)
- {
- if ((Accept = WSAAccept(Listen, NULL, NULL, NULL, 0)) == SOCKET_ERROR)
- {
- printf("WSAAccept() failed with error %d/n", WSAGetLastError());
- return;
- }
- // Create a socket information structure to associate with the socket
- if ((PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR,
- sizeof(PER_HANDLE_DATA))) == NULL)
- {
- printf("GlobalAlloc() failed with error %d/n", GetLastError());
- return;
- }
- // Associate the accepted socket with the original completion port.
- printf("Socket number %d connected/n", Accept);
- PerHandleData->Socket = Accept;
- if (CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (DWORD) PerHandleData,
- 0) == NULL)
- {
- printf("CreateIoCompletionPort failed with error %d/n", GetLastError());
- return;
- }
- // Create per I/O socket information structure to associate with the
- // WSARecv call below.
- if ((PerIoData = (LPPER_IO_OPERATION_DATA) GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA))) == NULL)
- {
- printf("GlobalAlloc() failed with error %d/n", GetLastError());
- return;
- }
- ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
- PerIoData->BytesSEND = 0;
- PerIoData->BytesRECV = 0;
- PerIoData->DataBuf.len = DATA_BUFSIZE;
- PerIoData->DataBuf.buf = PerIoData->Buffer;
- Flags = 0;
- if (WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,
- &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
- {
- if (WSAGetLastError() != ERROR_IO_PENDING)
- {
- printf("WSARecv() failed with error %d/n", WSAGetLastError());
- return;
- }
- }
- }
- }
- DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
- {
- HANDLE CompletionPort = (HANDLE) CompletionPortID;
- DWORD BytesTransferred;
- LPOVERLAPPED Overlapped;
- LPPER_HANDLE_DATA PerHandleData;
- LPPER_IO_OPERATION_DATA PerIoData;
- DWORD SendBytes, RecvBytes;
- DWORD Flags;
- while(TRUE)
- {
- if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
- (LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)
- {
- printf("GetQueuedCompletionStatus failed with error %d/n", GetLastError());
- return 0;
- }
- // First check to see if an error has occured on the socket and if so
- // then close the socket and cleanup the SOCKET_INFORMATION structure
- // associated with the socket.
- if (BytesTransferred == 0)
- {
- printf("Closing socket %d/n", PerHandleData->Socket);
- if (closesocket(PerHandleData->Socket) == SOCKET_ERROR)
- {
- printf("closesocket() failed with error %d/n", WSAGetLastError());
- return 0;
- }
- GlobalFree(PerHandleData);
- GlobalFree(PerIoData);
- continue;
- }
- // Check to see if the BytesRECV field equals zero. If this is so, then
- // this means a WSARecv call just completed so update the BytesRECV field
- // with the BytesTransferred value from the completed WSARecv() call.
- if (PerIoData->BytesRECV == 0)
- {
- PerIoData->BytesRECV = BytesTransferred;
- PerIoData->BytesSEND = 0;
- }
- else
- {
- PerIoData->BytesSEND += BytesTransferred;
- }
- if (PerIoData->BytesRECV > PerIoData->BytesSEND)
- {
- // Post another WSASend() request.
- // Since WSASend() is not gauranteed to send all of the bytes requested,
- // continue posting WSASend() calls until all received bytes are sent.
- ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
- PerIoData->DataBuf.buf = PerIoData->Buffer + PerIoData->BytesSEND;
- PerIoData->DataBuf.len = PerIoData->BytesRECV - PerIoData->BytesSEND;
- if (WSASend(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &SendBytes, 0,
- &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
- {
- if (WSAGetLastError() != ERROR_IO_PENDING)
- {
- printf("WSASend() failed with error %d/n", WSAGetLastError());
- return 0;
- }
- }
- }
- else
- {
- PerIoData->BytesRECV = 0;
- // Now that there are no more bytes to send post another WSARecv() request.
- Flags = 0;
- ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
- PerIoData->DataBuf.len = DATA_BUFSIZE;
- PerIoData->DataBuf.buf = PerIoData->Buffer;
- if (WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,
- &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
- {
- if (WSAGetLastError() != ERROR_IO_PENDING)
- {
- printf("WSARecv() failed with error %d/n", WSAGetLastError());
- return 0;
- }
- }
- }
- }
- }