天天看點

完成端口模型IOCP

完成端口模型

“完成端口”模型是迄今為止最為複雜的一種 I / O模型。然而,假若一個應用程式同時需

要管理為數衆多的套接字,那麼采用這種模型,往往可以達到最佳的系統性能!但不幸的是,

該模型隻适用于Windows NT和Windows 2000作業系統。因其設計的複雜性,隻有在你的應用

程式需要同時管理數百乃至上千個套接字的時候,而且希望随着系統内安裝的 C P U數量的增

多,應用程式的性能也可以線性提升,才應考慮采用“完成端口”模型。要記住的一個基本

準則是,假如要為Windows NT或Windows 2000開發高性能的伺服器應用,同時希望為大量套

接字I / O請求提供服務(We b伺服器便是這方面的典型例子),那麼I / O完成端口模型便是最佳

選擇!

從本質上說,完成端口模型要求我們建立一個Wi n 3 2完成端口對象,通過指定數量的線程,

對重疊I / O請求進行管理,以便為已經完成的重疊I / O請求提供服務。要注意的是,所謂“完成

端口”,實際是Wi n 3 2、Windows NT以及Windows 2000采用的一種I / O構造機制,除套接字句

柄之外,實際上還可接受其他東西。然而,本節隻打算講述如何使用套接字句柄,來發揮完

成端口模型的巨大威力。使用這種模型之前,首先要建立一個 I / O完成端口對象,用它面向任

意數量的套接字句柄,管理多個I / O請求。要做到這一點,需要調用CreateIoComletionPort函數。

該函數定義如下:

HANDLE CreateIoCompletionPort (

  HANDLE FileHandle,              // handle to file

  HANDLE ExistingCompletionPort,  // handle to I/O completion port

  ULONG_PTR CompletionKey,        // completion key

  DWORD NumberOfConcurrentThreads // number of threads to execute concurrently

);

在我們深入探讨其中的各個參數之前,首先要注意該函數實際用于兩個明顯有别的目的:

■ 用于建立一個完成端口對象。

■ 将一個句柄同完成端口關聯到一起。

最開始建立一個完成端口時,唯一感興趣的參數便是 NumberOfConcurrentThreads(并發

線程的數量);前面三個參數都會被忽略。NumberOfConcurrentThreads參數的特殊之處在于,

它定義了在一個完成端口上,同時允許執行的線程數量。理想情況下,我們希望每個處理器

各自負責一個線程的運作,為完成端口提供服務,避免過于頻繁的線程“場景”切換。若将

該參數設為0,表明系統内安裝了多少個處理器,便允許同時運作多少個線程!可用下述代碼

建立一個I / O完成端口:

CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)

該語句的作用是傳回一個句柄,在為完成端口配置設定了一個套接字句柄後,用來對那個端

口進行标定(引用)。

GetQueuedCompletionStatus函數解釋: 實作從指定的IOCP擷取CP。當CP隊列為空時,對此函數的調用将被阻塞,而不是一直等 待I/O的完成。當CP隊列不為空時,被阻塞的線程将以後進先出(LIFO)順序被釋放。 對于IOCP機制,它允許多線程并發調用GetQueuedCompletionStatus函數,最大并發數 是在調用CreateIoCompletionPort函數時指定的,超出最大并發數的調用線程,将被阻塞。 函數解釋如下: 聲明: BOOL GetQueuedCompletionStatus( HANDLE CompletionPort, LPDWORD lpNumberOfBytes, PULONG_PTR lpCompletionKey, LPOVERLAPPED *lpOverlapped, DWORD dwMilliseconds); 調用參數: CompletionPort:指定的IOCP,該值由CreateIoCompletionPort函數建立。 lpnumberofbytes:一次完成後的I/O操作所傳送資料的位元組數。 lpcompletionkey:當檔案I/O操作完成後,用于存放與之關聯的CK。 lpoverlapped:為調用IOCP機制所引用的OVERLAPPED結構。 dwmilliseconds:用于指定調用者等待CP的時間。 傳回值: 調用成功,則傳回非零數值,相關資料存于lpNumberOfBytes、lpCompletionKey、lpoverlapped變量中。失敗則傳回零值。

int WSASend ( SOCKET s, LPWSABUF lpBuffers DWORD dwBufferCount, LPDWORD lpNumberOfBytesSent, DWORD dwFlags, LPWSAOVERLAPPED lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine );

參數

s:辨別一個已連接配接套接口的描述字。 lpBuffers:一個指向WSABUF結構數組的指針。每個WSABUF結構包含緩沖區的指針和緩沖區的大小。 dwBufferCount:lpBuffers數組中WSABUF結構的數目。 lpNumberOfBytesSent:如果發送操作立即完成,則為一個指向所發送資料位元組數的指針。 dwFlags:标志位。 lpOverlapped:指向WSAOVERLAPPED結構的指針(對于非重疊套接口則忽略)。 lpCompletionRoutine:一個指向發送操作完成後調用的完成例程的指針。(對于非重疊套接口則忽略)。

傳回值: 若無錯誤發生且發送操作立即完成,則WSASend()函數傳回0。這時,完成例程(Completion Routine)應該已經被排程,一旦調用線程處于alertable狀态時就會調用它。否則,傳回SOCKET_ERROR 。通過WSAGetLastError獲得詳細的錯誤代碼。WSA_IO_PENDING 這個錯誤碼(其實表示沒有錯誤)表示重疊操作已經送出成功(就是異步IO的意思了),稍後會提示完成(這個完成可不一定是發送成功,沒準出問題也不一定)。其他的錯誤代碼都代表重疊操作沒有正确開始,也不會有完成标志出現。

int WSARecv( SOCKET s, // 當然是投遞這個操作的套接字 ,與Recv函數不同 // 這裡需要一個由WSABUF結構構成的數組 DWORD dwBufferCount, // 數組中WSABUF結構的數量 LPDWORD lpNumberOfBytesRecvd, // 如果接收操作立即完成,這裡會傳回函數調用所接收到的位元組數 LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine // 完成例程中将會用到的參數,我們這裡設定為 NULL ); 傳回值: WSA_IO_PENDING : 最常見的傳回值,這是說明我們的WSARecv操作成功了,但是I/O操作還沒有完成,是以我們就需要綁定一個事件來通知我們操作何時完成

1. 工作者線程與完成端口

成功建立一個完成端口後,便可開始将套接字句柄與對象關聯到一起。但在關聯套接字

之前,首先必須建立一個或多個“工作者線程”,以便在I / O請求投遞給完成端口對象後,為

完成端口提供服務。在這個時候,大家或許會覺得奇怪,到底應建立多少個線程,以便為完

成端口提供服務呢?這實際正是完成端口模型顯得頗為“複雜”的一個方面,因為服務 I / O請

求所需的數量取決于應用程式的總體設計情況。在此要記住的一個重點在于,在我們調用

CreateIoComletionPort時指定的并發線程數量,與打算建立的工作者線程數量相比,它們代表

的并非同一件事情。早些時候,我們曾建議大家用 CreateIoComletionPort函數為每個處理器

都指定一個線程(處理器的數量有多少,便指定多少線程)以避免由于頻繁的線程“場景”

交換活動,進而影響系統的整體性能。CreateIoComletionPort函數的NumberOfConcurrentThreads參數明确訓示系統:在一個完成端口上,一

次隻允許 n個工作者線程運作。假如在完

成端口上建立的工作者線程數量超出 n個,那麼在同一時刻,最多隻允許 n個線程運作。但實

際上,在一段較短的時間内,系統有可能超過這個值,但很快便會把它減少至事先在

CreateIoComletionPort函數中設定的值。那麼,為何實際建立的工作者線程數量有時要比

CreateIoComletionPort函數設定的多一些呢?這樣做有必要嗎?如先前所述,這主要取決于

應用程式的總體設計情況。假定我們的某個工作者線程調用了一個函數,比如 Sleep或

WaitForSingleObject,但卻進入了暫停(鎖定或挂起)狀态,那麼允許另一個線程代替它的位

置。換言之,我們希望随時都能執行盡可能多的線程;當然,最大的線程數量是事先在

CreateIoCompletonPort調用裡設定好的。這樣一來,假如事先預計到自己的線程有可能暫時

處于停頓狀态,那麼最好能夠建立比 CreateIoCompletonPort的NumberOfConcurrentThreads參數的值多的線程,以便到時候充分發揮系統的潛

力。

一旦在完成端口上擁有足夠多的工作者線程來為I / O請求提供服務,便可着手将套接字句柄

同完成端口關聯到一起。這要求我們在一個現有的完成端口上,調用CreateIo CompletionPort函

數,同時為前三個參數—FileHandle,ExistingCompletionPort和CompletionKey—提供套

接字的資訊。其中,FileHandle參數指定一個要同完成端口關聯在一起的套接字句柄。

ExistingCompletionPort參數指定的是一個現有的完成端口。 CompletionKey(完成鍵)參

數則指定要與某個特定套接字句柄關聯在一起的“單句柄資料”;在這個參數中,應用程式

可儲存與一個套接字對應的任意類型的資訊。之是以把它叫作“單句柄資料”,是由于它隻對

應着與那個套接字句柄關聯在一起的資料。可将其作為指向一個資料結構的指針,來儲存套

接字句柄;在那個結構中,同時包含了套接字的句柄,以及與那個套接字有關的其他資訊。

就象本章稍後還會講述的那樣,為完成端口提供服務的線程例程可通過這個參數,取得與套

接字句柄有關的資訊。

根據我們到目前為止學到的東西,首先來建構一個基本的應用程式架構。程式清單 8 - 9向

大家闡述了如何使用完成端口模型,來開發一個回應(或“反射”)伺服器應用。在這個程式

中,我們基本上按下述步驟行事:

1) 建立一個完成端口。第四個參數保持為0,指定在完成端口上,每個處理器一次隻允許

執行一個工作者線程。

2) 判斷系統内到底安裝了多少個處理器。

3) 建立工作者線程,根據步驟2 )得到的處理器資訊,在完成端口上,為已完成的 I / O請求

提供服務。在這個簡單的例子中,我們為每個處理器都隻建立一個工作者線程。這是由于事

先已預計到,到時不會有任何線程進入“挂起”狀态,造成由于線程數量的不足,而使處理

器空閑的局面(沒有足夠的線程可供執行)。調用CreateThread函數時,必須同時提供一個工

作者例程,由線程在建立好執行。本節稍後還會詳細讨論線程的職責。

4) 準備好一個監聽套接字,在端口5150上監聽進入的連接配接請求。

5) 使用accept函數,接受進入的連接配接請求。

6) 建立一個資料結構,用于容納“單句柄資料”,同時在結構中存入接受的套接字句柄。

7) 調用CreateIoComletionPort,将自accept傳回的新套接字句柄同完成端口關聯到一起。

通過完成鍵(CompletionKey)參數,将單句柄資料結構傳遞給CreateIoComletionPort。

8) 開始在已接受的連接配接上進行 I / O操作。在此,我們希望通過重疊 I / O機制,在建立的套

接字上投遞一個或多個異步 WSARecv或WSASend請求。這些I / O請求完成後,一個工作者線

程會為I / O請求提供服務,同時繼續處理未來的 I / O請求,稍後便會在步驟3 )指定的工作者例程

中,體驗到這一點。

9) 重複步驟5 ) ~ 8 ),直至伺服器中止。

程式清單8-9   完成端口的建立

 代碼說明:

1 OVERLAPPED 是如何到達工作線程的?或者說工作線程中的GetQueuedCompletionStatus函數的第四個參數為什麼能到得到用戶端對應的

OVERLAPPED?

答:CreateIoCompletionPort函數将完成端口對象與用戶端套接字綁定,WSARecv函數在接收完成後,将OVERLAPPED投遞到了與用戶端套接字綁定的完成端口對象,是以在該完成端口對象中,GetQueuedCompletionStatus可以獲得OVERLAPPED。

2 GetQueuedCompletionStatus獲得了OVERLAPPED,為什麼可以當作PER_IO_OPERATION_DATA結構來使用?

答:這用到了一點程式設計技巧--“尾随資料”,PER_IO_OPERATION_DATA結構體的設計将OVERLAPPED成員放在了第一個,是以PER_IO_OPERATION_DATA結構體的位址正是OVERLAPPED成員的首位址,在問題1中,OVERLAPPED是通過首位址來傳遞的,是以GetQueuedCompletionStatus獲得OVERLAPPED首位址後,可以當作PER_IO_OPERATION_DATA結構來使用。

3注意一點:WSARecv函數和WSASend函數的 dwFlags參數一般設定為0,傳參前記得将參數指派為0.

  1. #include <winsock2.h>
  2. #include <windows.h>
  3. #include <stdio.h>
  4. #define PORT 5150
  5. #define DATA_BUFSIZE 8192
  6. typedef struct
  7. {
  8.    OVERLAPPED Overlapped;
  9.    WSABUF DataBuf;
  10.    CHAR Buffer[DATA_BUFSIZE];
  11.    DWORD BytesSEND;
  12.    DWORD BytesRECV;
  13. } PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;
  14. typedef struct 
  15. {
  16.    SOCKET Socket;
  17. } PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
  18. DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID);
  19. void main(void)
  20. {
  21.    SOCKADDR_IN InternetAddr;
  22.    SOCKET Listen;
  23.    SOCKET Accept;
  24.    HANDLE CompletionPort;
  25.    SYSTEM_INFO SystemInfo;
  26.    LPPER_HANDLE_DATA PerHandleData;
  27.    LPPER_IO_OPERATION_DATA PerIoData;
  28.    int i;
  29.    DWORD RecvBytes;
  30.    DWORD Flags;
  31.    DWORD ThreadID;
  32.    WSADATA wsaData;
  33.    DWORD Ret;
  34.    if ((Ret = WSAStartup(0x0202, &wsaData)) != 0)
  35.    {
  36.       printf("WSAStartup failed with error %d/n", Ret);
  37.       return;
  38.    }
  39.    // Setup an I/O completion port.
  40.    if ((CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL)
  41.    {
  42.       printf( "CreateIoCompletionPort failed with error: %d/n", GetLastError());
  43.       return;
  44.    }
  45.    // Determine how many processors are on the system.
  46.    GetSystemInfo(&SystemInfo);
  47.    // Create worker threads based on the number of processors available on the
  48.    // system. Create two worker threads for each processor.
  49.    for(i = 0; i < SystemInfo.dwNumberOfProcessors * 2; i++)
  50.    {
  51.       HANDLE ThreadHandle;
  52.       // Create a server worker thread and pass the completion port to the thread.
  53.       if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort,
  54.          0, &ThreadID)) == NULL)
  55.       {
  56.          printf("CreateThread() failed with error %d/n", GetLastError());
  57.          return;
  58.       }
  59.       // Close the thread handle
  60.       CloseHandle(ThreadHandle);
  61.    }
  62.    // Create a listening socket
  63.    if ((Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,
  64.       WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
  65.    {
  66.       printf("WSASocket() failed with error %d/n", WSAGetLastError());
  67.       return;
  68.    } 
  69.    InternetAddr.sin_family = AF_INET;
  70.    InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
  71.    InternetAddr.sin_port = htons(PORT);
  72.    if (bind(Listen, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr)) == SOCKET_ERROR)
  73.    {
  74.       printf("bind() failed with error %d/n", WSAGetLastError());
  75.       return;
  76.    }
  77.    // Prepare socket for listening
  78.    if (listen(Listen, 5) == SOCKET_ERROR)
  79.    {
  80.       printf("listen() failed with error %d/n", WSAGetLastError());
  81.       return;
  82.    }
  83.    // Accept connections and assign to the completion port.
  84.    while(TRUE)
  85.    {
  86.       if ((Accept = WSAAccept(Listen, NULL, NULL, NULL, 0)) == SOCKET_ERROR)
  87.       {
  88.          printf("WSAAccept() failed with error %d/n", WSAGetLastError());
  89.          return;
  90.       }
  91.       // Create a socket information structure to associate with the socket
  92.       if ((PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR, 
  93.          sizeof(PER_HANDLE_DATA))) == NULL)
  94.       {
  95.          printf("GlobalAlloc() failed with error %d/n", GetLastError());
  96.          return;
  97.       }
  98.       // Associate the accepted socket with the original completion port.
  99.       printf("Socket number %d connected/n", Accept);
  100.       PerHandleData->Socket = Accept;
  101.       if (CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (DWORD) PerHandleData,
  102.          0) == NULL)
  103.       {
  104.          printf("CreateIoCompletionPort failed with error %d/n", GetLastError());
  105.          return;
  106.       }
  107.       // Create per I/O socket information structure to associate with the 
  108.       // WSARecv call below.
  109.       if ((PerIoData = (LPPER_IO_OPERATION_DATA) GlobalAlloc(GPTR,          sizeof(PER_IO_OPERATION_DATA))) == NULL)
  110.       {
  111.          printf("GlobalAlloc() failed with error %d/n", GetLastError());
  112.          return;
  113.       }
  114.       ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
  115.       PerIoData->BytesSEND = 0;
  116.       PerIoData->BytesRECV = 0;
  117.       PerIoData->DataBuf.len = DATA_BUFSIZE;
  118.       PerIoData->DataBuf.buf = PerIoData->Buffer;
  119.       Flags = 0;
  120.       if (WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,
  121.          &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
  122.       {
  123.          if (WSAGetLastError() != ERROR_IO_PENDING)
  124.          {
  125.             printf("WSARecv() failed with error %d/n", WSAGetLastError());
  126.             return;
  127.          }
  128.       }
  129.    }
  130. }
  131. DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
  132. {
  133.    HANDLE CompletionPort = (HANDLE) CompletionPortID;
  134.    DWORD BytesTransferred;
  135.    LPOVERLAPPED Overlapped;
  136.    LPPER_HANDLE_DATA PerHandleData;
  137.    LPPER_IO_OPERATION_DATA PerIoData;
  138.    DWORD SendBytes, RecvBytes;
  139.    DWORD Flags;
  140.    while(TRUE)
  141.    {
  142.       if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
  143.          (LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)
  144.       {
  145.          printf("GetQueuedCompletionStatus failed with error %d/n", GetLastError());
  146.          return 0;
  147.       }
  148.       // First check to see if an error has occured on the socket and if so
  149.       // then close the socket and cleanup the SOCKET_INFORMATION structure
  150.       // associated with the socket.
  151.       if (BytesTransferred == 0)
  152.       {
  153.          printf("Closing socket %d/n", PerHandleData->Socket);
  154.          if (closesocket(PerHandleData->Socket) == SOCKET_ERROR)
  155.          {
  156.             printf("closesocket() failed with error %d/n", WSAGetLastError());
  157.             return 0;
  158.          }
  159.          GlobalFree(PerHandleData);
  160.          GlobalFree(PerIoData);
  161.          continue;
  162.       }
  163.       // Check to see if the BytesRECV field equals zero. If this is so, then
  164.       // this means a WSARecv call just completed so update the BytesRECV field
  165.       // with the BytesTransferred value from the completed WSARecv() call.
  166.       if (PerIoData->BytesRECV == 0)
  167.       {
  168.          PerIoData->BytesRECV = BytesTransferred;
  169.          PerIoData->BytesSEND = 0;
  170.       }
  171.       else
  172.       {
  173.          PerIoData->BytesSEND += BytesTransferred;
  174.       }
  175.       if (PerIoData->BytesRECV > PerIoData->BytesSEND)
  176.       {
  177.          // Post another WSASend() request.
  178.          // Since WSASend() is not gauranteed to send all of the bytes requested,
  179.          // continue posting WSASend() calls until all received bytes are sent.
  180.          ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
  181.          PerIoData->DataBuf.buf = PerIoData->Buffer + PerIoData->BytesSEND;
  182.          PerIoData->DataBuf.len = PerIoData->BytesRECV - PerIoData->BytesSEND;
  183.          if (WSASend(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &SendBytes, 0,
  184.             &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
  185.          {
  186.             if (WSAGetLastError() != ERROR_IO_PENDING)
  187.             {
  188.                printf("WSASend() failed with error %d/n", WSAGetLastError());
  189.                return 0;
  190.             }
  191.          }
  192.       }
  193.       else
  194.       {
  195.          PerIoData->BytesRECV = 0;
  196.          // Now that there are no more bytes to send post another WSARecv() request.
  197.          Flags = 0;
  198.          ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
  199.          PerIoData->DataBuf.len = DATA_BUFSIZE;
  200.          PerIoData->DataBuf.buf = PerIoData->Buffer;
  201.          if (WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,
  202.             &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)
  203.          {
  204.             if (WSAGetLastError() != ERROR_IO_PENDING)
  205.             {
  206.                printf("WSARecv() failed with error %d/n", WSAGetLastError());
  207.                return 0;
  208.             }
  209.          }
  210.       }
  211.    }
  212. }

繼續閱讀