對于開發一款高性能伺服器程式,廣大伺服器開發人員在一直為之奮鬥和努力.其中一個影響伺服器的重要瓶頸就是伺服器的網絡處理子產品.如果一款伺服器程式不能及時的處理使用者的資料.則伺服器的上層業務邏輯再高效也是徒勞.是以一個伺服器程式的網絡處理能力直接影響到整個伺服器的性能, 本文主要介紹在windows平台下開發高性能的網絡處理子產品以及自己在設計開發伺服器網絡子產品遇到的一些問題和開發心得.本篇主要介紹TCP伺服器的設計, 下一篇将主要介紹UDP伺服器的設計.
衆所周知, 對于伺服器來說windows下網絡I/O處理的最佳方式就是完成端口, 是以本伺服器的開發主要基于完成端口的模式.完成端口(completion port)是應用程式使用線程池處理異步I/O請求的一種機制.将建立好的socket和完成端口綁定後就可以向該socket上投遞相應的I/O操作, 當操作完成後I/O系統會向完成端口發送一個通知包;應用程式通過GetQueuedCompletionStatus()函數擷取這些通知包并進行相應的處理.下面切入正題談談TCP伺服器的開發.
本人在開發TCP伺服器的經過了兩個階段, 第一次設計出來的TCP伺服器網絡層隻能支援5000 – 8000個線上使用者同時和伺服器互動, 但是會出現一些莫名其妙的系統異常.是以網絡層不是很穩定.這次開發主要用到一個系統的I/O線程池函數BindIoCompletionCallback() 該函數在win2000以後都支援, BindIoCompletion-
Callback()是一個I/O線程池函數,其主要功能是采用系統線程池進行I/O處理,優點是使用者無需自己建立完成端口和線程池,完成端口和工作者線程池的建立和管理都由系統維護.給使用者帶了很大的友善.使用者隻需要将自己建立的socket和I/O回調函數傳遞給BindIoCompletionCallback()函數即可, 然後就可以在該socket上投遞相應的操作.當操作完成後系統會調用使用者的回調函數通知使用者.這種方式給開發者帶來了很大的友善, 開發者甚至不需要去了解完成端口的工作機制就可以開發出一個較高性能的網絡程式.但同時也帶來了很多麻煩,使用者無法知道在完成端口上到底有多少個工作者線程, 而且當連接配接到伺服器上的使用者量過大時會出現線程堆棧錯誤等異常,同時有1000-2000個使用者斷開連接配接後, 伺服器就無法讓後續使用者連接配接到伺服器. 在這種方式下的伺服器網絡層最多隻支援4000 – 5000使用者同時連接配接到伺服器.使用者量再大時就會出現一些系統異常而且無法解決.
借鑒于第一次開發的經驗和教訓, 在第二次開發伺服器TCP層時決定自己建立完成端口和工作者線程池, 并對其進行維護和管理.這樣做的好處是出了問題好定位和處理.下面将我開發的代碼拿出來和大家切磋切磋, 如果什麼地方寫得問題還希望能夠指正出來, 歡迎郵件聯系我: [email protected], QQ: 24633959, MSN: [email protected]
1. 首先介紹網絡上下文(NET_CONTEXT)的定義:
class NET_CONTEXT
{
public:
WSAOVERLAPPED m_ol;
SOCKET m_hSock;
CHAR* m_pBuf; //接收或發送資料的緩沖區
INT m_nOperation; //在該網絡上下文上進行的操作 OP_ACCEPT…
static DWORD S_PAGE_SIZE; //緩沖區的最大容量
NET_CONTEXT();
virtual ~NET_CONTEXT();
static void InitReource();
static void ReleaseReource();
private:
void* operator new (size_t nSize);
void operator delete(void* p);
static HANDLE s_hDataHeap;
static vector<char * > s_IDLQue; //無效資料緩沖區的隊列
static CRITICAL_SECTION s_IDLQueLock; //通路s_IDLQue的互斥鎖
};
NET_CONTEXT 是所有網絡上下文的基類, 對于TCP的recv, send, accep, connect的上下文都繼承自該類.UDP的send和recv的網絡上下文也繼承自該類. m_ol 必須放置在第一個位置否則當從完成封包取net_context不能得到正确的結果. S_PAGE_SIZE 為資料緩沖區m_pBuf的大小,其大小和相應的作業系統平台有關, win32下其值為4096, win64下其值為8192, 即為作業系統的一個記憶體頁的大小.設定為一個記憶體頁的原因是因為在投遞重疊操作時系統會鎖定投遞的緩沖區, 在鎖定時是按照記憶體頁的邊界來鎖定的.是以即使你隻發送一個1K位元組資料系統也會鎖定整個記憶體頁(4096/8192). s_hDataHeap 為自定義的BUF申請的堆.其優點是使用者可以自己對堆進行管理和操作. s_IDLQue 為用過的BUF隊列, 當使用者用完相應的NET_CONTEXT後在執行析構操作時并不會真正把m_pBuf所占的記憶體釋放掉而是将其放入到s_IDLQue隊列中, 當下次申請新的NET_CONTEXT時隻需從s_IDLQue中取出就可以使用, 避免頻繁的new和delete操作.
2. 資料標頭的定義:
struct PACKET_HEAD
LONG nTotalLen; //資料包的總長度
ULONG nSerialNum; //資料包的序列号
WORD nCurrentLen; //目前資料包的長度
WORD nType; //資料包的類型
資料標頭位于每一個接收到的或待發送的資料包的首部,用于确定接收到的資料包是否合法以及該資料包是做什麼用的.使用者可以定義自己標頭.
3. TCP_CONTEXT主要用于定義接收和發送資料的緩沖區, 繼承自NET_CONTEXT
class TCP_CONTEXT : public NET_CONTEXT
friend class TcpServer;
protected:
DWORD m_nDataLen; //TCP模式下累計發送和接收資料的長度
TCP_CONTEXT()
: m_nDataLen(0)
{
}
virtual ~TCP_CONTEXT() {}
void* operator new(size_t nSize);
void operator delete(void* p);
enum
{
E_TCP_HEAP_SIZE = 1024 * 1024* 10,
MAX_IDL_DATA = 20000,
};
static vector<TCP_CONTEXT* > s_IDLQue; //無效的資料隊列
static HANDLE s_hHeap; //TCP_CONTEXT的資料申請堆
TCP_CONTEXT類主要用在網絡上發送和接收資料的上下文.每個連接配接到伺服器的SOCKET都會有一個發送和接收資料的TCP_CONTEXT.這裡重載了new和delete函數.這樣做的優點在于當申請一個新的TCP_CONTEXT對象時會先判斷無效的資料隊列中是否有未使用的TCP_CONTEXT,若有則直接取出來使用否則從s_hHeap堆上新申請一個.new 函數的定義如下
void* TCP_CONTEXT::operator new(size_t nSize)
void* pContext = NULL;
try
if (NULL == s_hHeap)
{
throw ((long)(__LINE__));
}
//為新的TCP_CONTEXT申請記憶體, 先從無效隊列中找, 如無效隊列為空則從堆上申請
EnterCriticalSection(&s_IDLQueLock);
vector<TCP_CONTEXT* >::iterator iter = s_IDLQue.begin();
if (iter != s_IDLQue.end())
pContext = *iter;
s_IDLQue.erase(iter);
}
else
pContext = HeapAlloc(s_hHeap, HEAP_ZERO_MEMORY | HEAP_NO_SERIALIZE, nSize);
LeaveCriticalSection(&s_IDLQueLock);
if (NULL == pContext)
catch (const long& iErrCode)
pContext = NULL;
_TRACE("\r\nExcept : %s--%ld", __FILE__, iErrCode);
}
return pContext;
}
當使用完TCP_CONTEXT時調用delete函數進行對記憶體回收, 在進行記憶體回收時先檢視無效隊列已存放的資料是否達到MAX_IDL_DATA, 若沒有超過MAX_IDL_DATA則将其放入到s_IDLQue中否則将其釋放掉.delete函數的實作如下:
void TCP_CONTEXT::operator delete(void* p)
if (p)
//若空閑隊列的長度小于MAX_IDL_DATA, 則将其放入無效隊列中否則釋
//放之
const DWORD QUE_SIZE = (DWORD)(s_IDLQue.size());
TCP_CONTEXT* const pContext = (TCP_CONTEXT*)p;
if (QUE_SIZE <= MAX_IDL_DATA)
s_IDLQue.push_back(pContext);
HeapFree(s_hHeap, HEAP_NO_SERIALIZE, p);
return;
4. ACCEPT_CONTEXT 主要用于投遞AcceptEx操作, 繼承自NET_CONTEXT類
class ACCEPT_CONTEXT : public NET_CONTEXT
SOCKET m_hRemoteSock; //連接配接本伺服器的用戶端SOCKET
ACCEPT_CONTEXT()
: m_hRemoteSock(INVALID_SOCKET)
virtual ~ACCEPT_CONTEXT() {}
static vector<ACCEPT_CONTEXT* > s_IDLQue; //無效的資料隊列
static CRITICAL_SECTION s_IDLQueLock; //通路s_IDLQueµ互斥鎖
static HANDLE s_hHeap; //ACCEPT_CONTEXT的自定義堆
5. TCP_RCV_DATA, 當伺服器的某個socket從網絡上收到資料後并且資料合法便為收到的資料申請一個新的TCP_RCV_DATA執行個體存儲收到的資料.其定義如下:
class DLLENTRY TCP_RCV_DATA
SOCKET m_hSocket; //與該資料相關的socket
CHAR* m_pData; //資料緩沖區位址
INT m_nLen; //收到的資料的長度
TCP_RCV_DATA(SOCKET hSock, const CHAR* pBuf, INT nLen);
~TCP_RCV_DATA();
HEAP_SIZE = 1024 *1024* 50,
DATA_HEAP_SIZE = 1024 *1024 * 100,
MAX_IDL_DATA = 100000,
static vector<TCP_RCV_DATA* > s_IDLQue; //無效資料隊列
static HANDLE s_hHeap;
static HANDLE s_DataHeap;
6. 前面講的相關的資料結構都是為下面要探讨的TcpServer類服務的. TcpServer類是本文要探讨的核心資料結構;主要用于啟動服務, 管理連接配接等操作.
class DLLENTRY TcpServer
TcpServer();
~TcpServer();
/************************************************************************
* Desc : 初始化相關靜态資源,在申請TCP執行個體之前必須先調用該方法對相關資
* 源進行初始化
************************************************************************/
* Desc : 釋放相應的靜态資源
/****************************************************
* Name : StartServer()
* Desc : 啟動TCP服務
****************************************************/
BOOL StartServer(
const char *szIp //要啟動服務的本地位址, 若為NULL則采用預設位址
, INT nPort //要啟動服務的端口
, LPCLOSE_ROUTINE pCloseFun //用戶端socket關閉的通知函數
, LPVOID pParam //close函數的參數
);
* Name : CloseServer()
* Desc : 關閉TCP服務
void CloseServer();
* Name : SendData()
* Desc : 對用戶端hSock發送長度為nDataLen的資料
BOOL SendData(SOCKET hSock, const CHAR* szData, INT nDataLen);
* Name : GetRcvData()
* Desc : 從接收資料隊列中擷取一個接收資料包
* pQueLen 不為NULL時傳回其長度
TCP_RCV_DATA* GetRcvData(
DWORD* const pQueLen
LISTEN_EVENTS = 2, //監聽socket的事件個數
MAX_ACCEPT = 50, //每次最多投遞的accept操作的個數
_SOCK_NO_RECV = 0xf0000000, //用戶端socket已連接配接上但為發送資料
_SOCK_RECV = 0xf0000001 //用戶端socket已連接配接上并也收到資料
vector<TCP_RCV_DATA* > m_RcvDataQue; //接收到的資料緩沖區隊列
CRITICAL_SECTION m_RcvQueLock; //通路m_RcvDataQue的互斥鎖
vector<SOCKET> m_SocketQue; //連接配接本伺服器的用戶端socket隊列
CRITICAL_SECTION m_SockQueLock; //通路m_SocketQue的互斥鎖
LPCLOSE_ROUTINE m_pCloseFun; //用戶端socket關閉的通知函數
LPVOID m_pCloseParam; //傳遞給m_pCloseFun的使用者參數
SOCKET m_hSock; //要進行伺服器監聽的socket
long volatile m_bThreadRun; //是否允許背景線程繼續運作
long volatile m_nAcceptCount; //目前已投遞的accept操作的個數
BOOL m_bSerRun; //服務是否正在運作
//accept的事件
HANDLE m_ListenEvents[LISTEN_EVENTS];
HANDLE *m_pThreads; //建立的背景線程的句柄
HANDLE m_hCompletion; //完成端口句柄
static LPFN_ACCEPTEX s_pfAcceptEx; //AcceptEx位址
// GetAcceptExSockaddrs的位址
static LPFN_GETACCEPTEXSOCKADDRS s_pfGetAddrs;
* Name : AcceptCompletionProc()
* Desc : acceptEx操作完成後回調函數
void AcceptCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);
* Name : RecvCompletionProc()
* Desc : 接收操作完成後的回調函數
void RecvCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);
* Name : SendCompletionProc()
* Desc : 發送操作完成後的回調函數
void SendCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);
* Name : ListenThread()
* Desc : 監聽線程
static UINT WINAPI ListenThread(LPVOID lpParam);
* Name : WorkThread()
* Desc : 在完成端口上工作的背景線程
static UINT WINAPI WorkThread(LPVOID lpParam);
* Name : AideThread()
* Desc : 背景輔助線程
static UINT WINAPI AideThread(LPVOID lpParam);
下面将對相關實作細節作詳細介紹.
也許您已經注意到本類隻提供了用戶端socket關閉的接口, 而沒有提供用戶端連接配接到伺服器的相關接口;這樣做的主要原因是因為當一個用戶端連接配接成功需要在完成端口的I/O線程中進行通知, 若使用者在該接口中進行複雜的運算操作将會使I/O工作線程阻塞.是以此處沒有提供連接配接成功的通知接口, 其實使用者可以根據用戶端發來的特定資料包(例如登陸資料包)确定使用者是否連接配接到本伺服器.
當有用戶端連接配接伺服器投遞的accept操作就會完成, m_ListenEvents[1] 事件對象就會授信這時ListenThread線程将被喚醒并投遞一個accept操作. 若有大量的用戶端連接配接到本伺服器而沒有足夠的accept接受連接配接此時m_ListenEvents[0]事件就會受信此時ListenThread線程會再次投遞MAX_ACCEPT個accept操作已接受更多的連接配接.
ListenThread線程主要用來投遞aeecptex操作, 當m_ListenEvents[0]或者m_ListenEvents[1]受信時就會投遞一定量的AcceptEx操作以接受更多的用戶端連接配接.
WorkThread 線程工作在完成端口上, 當相關的操作完成時該線程組負責從完成端口隊列上取得相應的完成封包進行處理. AideThread線程主要用于維護連接配接本伺服器的socket隊列, 如果用戶端連接配接到伺服器但長時間沒有進行發送資料便斷開該用戶端, 防止用戶端惡意連接配接.當有用戶端斷開連接配接時也在該線程中調用關閉接口通知使用者.
相關函數介紹如下:
l TcpServer(), 該函數主要對相關成員變量進行初始化, 建立完成端口和相關線程.
TcpServer::TcpServer()
: m_pCloseFun(NULL)
, m_hSock(INVALID_SOCKET)
, m_pCloseParam(NULL)
, m_bThreadRun(TRUE)
, m_bSerRun(FALSE)
, m_nAcceptCount(0)
m_RcvDataQue.reserve(10000 * sizeof(void *));
m_SocketQue.reserve(50000 * sizeof(SOCKET));
InitializeCriticalSection(&m_RcvQueLock);
InitializeCriticalSection(&m_SockQueLock);
//建立監聽事件
for (int nIndex = 0; nIndex < LISTEN_EVENTS; nIndex ++)
m_ListenEvents[nIndex] = CreateEvent(NULL, FALSE, FALSE, NULL);
//建立完成端口
m_hCompletion = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
//建立輔助線程, 監聽線程, 工作者線程. 工作者線程的數目為CPU數目*2+2
SYSTEM_INFO sys_info;
GetSystemInfo(&sys_info);
const DWORD MAX_THREAD = sys_info.dwNumberOfProcessors * 2 +2 + 2;
m_pThreads = new HANDLE[MAX_THREAD];
assert(m_pThreads);
m_pThreads[0] = (HANDLE)_beginthreadex(NULL, 0, ListenThread, this, 0, NULL);
m_pThreads[1] = (HANDLE)_beginthreadex(NULL, 0, AideThread, this, 0, NULL);
for (DWORD nIndex = 2; nIndex < MAX_THREAD; nIndex++)
m_pThreads[nIndex] = (HANDLE)_beginthreadex(NULL, 0, WorkThread, this, 0, NULL);
l StartServer(), 該函數主要啟動服務并投遞MAX_ACCEPT個操作接受用戶端連接配接.
BOOL TcpServer::StartServer( const char *szIp , INT nPort , LPCLOSE_ROUTINE pCloseFun , LPVOID pParam )
BOOL bSucc = TRUE;
int nRet = 0;
DWORD dwBytes = 0;
ULONG ul = 1;
int nOpt = 1;
//若服務已運作則不允許啟動新的服務
if (m_bSerRun || m_nAcceptCount)
THROW_LINE;
m_pCloseFun = pCloseFun;
m_pCloseParam = pParam;
m_bSerRun = TRUE;
//建立監聽socket
m_hSock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == m_hSock)
//加載AcceptEx函數
GUID guidProc = WSAID_ACCEPTEX;
if (NULL == s_pfAcceptEx)
nRet = WSAIoctl(m_hSock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidProc, sizeof(guidProc)
, &s_pfAcceptEx, sizeof(s_pfAcceptEx), &dwBytes, NULL, NULL);
if (NULL == s_pfAcceptEx || SOCKET_ERROR == nRet)
//加載GetAcceptExSockaddrs函數
GUID guidGetAddr = WSAID_GETACCEPTEXSOCKADDRS;
dwBytes = 0;
if (NULL == s_pfGetAddrs)
nRet = WSAIoctl(m_hSock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidGetAddr, sizeof(guidGetAddr)
, &s_pfGetAddrs, sizeof(s_pfGetAddrs), &dwBytes, NULL, NULL);
ioctlsocket(m_hSock, FIONBIO, &ul);
//設定位址重用, 當服務關閉後可以立即在該端口上啟動服務
setsockopt(m_hSock, SOL_SOCKET, SO_REUSEADDR, (char*)&nOpt, sizeof(nOpt));
sockaddr_in LocalAddr;
LocalAddr.sin_family = AF_INET;
LocalAddr.sin_port = htons(nPort);
if (szIp)
LocalAddr.sin_addr.s_addr = inet_addr(szIp);
}
LocalAddr.sin_addr.s_addr = htonl(INADDR_ANY);
nRet = bind(m_hSock, (sockaddr*)&LocalAddr, sizeof(LocalAddr));
if (SOCKET_ERROR == nRet)
nRet = listen(m_hSock, 200);
if (SOCKET_ERROR == nRet)
//将監聽socket綁定完成端口上
CreateIoCompletionPort((HANDLE)m_hSock, m_hCompletion, 0, 0);
WSAEventSelect(m_hSock, m_ListenEvents[0], FD_ACCEPT);
//投遞MAX_ACCEPT個AcceptEx操作
for (int nIndex = 0; nIndex < MAX_ACCEPT; )
SOCKET hClient = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == hClient)
{
continue;
}
ul = 1;
ioctlsocket(hClient, FIONBIO, &ul);
ACCEPT_CONTEXT* pAccContext = new ACCEPT_CONTEXT();
if (NULL == pAccContext)
THROW_LINE;
pAccContext->m_hSock = m_hSock;
pAccContext->m_hRemoteSock = hClient;
pAccContext->m_nOperation = OP_ACCEPT;
nRet = s_pfAcceptEx(m_hSock, hClient, pAccContext->m_pBuf, 0
, sizeof(sockaddr_in) +16, sizeof(sockaddr_in) +16, &dwBytes, &(pAccContext->m_ol));
if (FALSE == nRet && ERROR_IO_PENDING != WSAGetLastError())
closesocket(hClient);
delete pAccContext;
pAccContext = NULL;
else
InterlockedExchangeAdd(&m_nAcceptCount, 1);
nIndex++;
catch (const long &lErrLine)
bSucc = FALSE;
m_bSerRun = FALSE;
_TRACE("Exp : %s -- %ld", __FILE__, lErrLine);
return bSucc;
l ListenThread() 該函數用于投遞AcceptEx操作以接受用戶端的連接配接.
UINT WINAPI TcpServer::ListenThread(LPVOID lpParam)
TcpServer *pThis = (TcpServer *)lpParam;
int nRet = 0;
DWORD nEvents = 0;
DWORD dwBytes = 0;
int nAccept = 0;
while (TRUE)
{
nEvents = WSAWaitForMultipleEvents(LISTEN_EVENTS, pThis->m_ListenEvents, FALSE, WSA_INFINITE, FALSE);
//等待失敗線程退出
if (WSA_WAIT_FAILED == nEvents)
nEvents = nEvents - WAIT_OBJECT_0;
if (0 == nEvents)
{
nAccept = MAX_ACCEPT;
}
else if (1 == nEvents)
nAccept = 1;
//最多隻能投遞200個AcceptEx操作
if (InterlockedExchangeAdd(&(pThis->m_nAcceptCount), 0) > 200)
nAccept = 0;
for (int nIndex = 0; nIndex < nAccept; )
SOCKET hClient = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == hClient)
{
continue;
}
ULONG ul = 1;
ioctlsocket(hClient, FIONBIO, &ul);
ACCEPT_CONTEXT* pAccContext = new ACCEPT_CONTEXT();
if (pAccContext && pAccContext->m_pBuf)
pAccContext->m_hSock = pThis->m_hSock;
pAccContext->m_hRemoteSock = hClient;
pAccContext->m_nOperation = OP_ACCEPT;
nRet = s_pfAcceptEx(pThis->m_hSock, hClient, pAccContext->m_pBuf, 0
, sizeof(sockaddr_in) +16, sizeof(sockaddr_in) +16, &dwBytes, &(pAccContext->m_ol));
if (FALSE == nRet && ERROR_IO_PENDING != WSAGetLastError())
{
closesocket(hClient);
delete pAccContext;
pAccContext = NULL;
}
else
InterlockedExchangeAdd(&(pThis->m_nAcceptCount), 1);
else
delete pAccContext;
nIndex++;
if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0))
catch ( const long &lErrLine)
return 0;
l CloseServer(), 關閉服務
void TcpServer::CloseServer()
//關閉所有的socket
closesocket(m_hSock);
EnterCriticalSection(&m_SockQueLock);
for (vector<SOCKET>::iterator iter_sock = m_SocketQue.begin(); m_SocketQue.end() != iter_sock; iter_sock++)
closesocket(*iter_sock);
LeaveCriticalSection(&m_SockQueLock);
m_bSerRun = FALSE;
l SendData() 發送資料
BOOL TcpServer::SendData(SOCKET hSock, const CHAR* szData, INT nDataLen)
#ifdef _XML_NET_
//資料長度非法
if (((DWORD)nDataLen > TCP_CONTEXT::S_PAGE_SIZE) || (NULL == szData))
return FALSE;
#else
if ((nDataLen > (int)(TCP_CONTEXT::S_PAGE_SIZE)) || (NULL == szData) || (nDataLen < sizeof(PACKET_HEAD)))
#endif //#ifdef _XML_NET_
BOOL bResult = TRUE;
WSABUF SendBuf;
TCP_CONTEXT *pSendContext = new TCP_CONTEXT();
if (pSendContext && pSendContext->m_pBuf)
pSendContext->m_hSock = hSock;
pSendContext->m_nDataLen = 0;
pSendContext->m_nOperation = OP_WRITE;
memcpy(pSendContext->m_pBuf, szData, nDataLen);
SendBuf.buf = pSendContext->m_pBuf;
SendBuf.len = nDataLen;
assert(szData);
INT iErr = WSASend(pSendContext->m_hSock, &SendBuf, 1, &dwBytes, 0, &(pSendContext->m_ol), NULL);
if (SOCKET_ERROR == iErr && ERROR_IO_PENDING != WSAGetLastError())
delete pSendContext;
pSendContext = NULL;
_TRACE("\r\n%s : %ld LAST_ERROR = %ld", __FILE__, __LINE__, WSAGetLastError());
bResult = FALSE;
else
delete pSendContext;
bResult = FALSE;
return bResult;
l GetRcvData(), 從接收到的資料隊列中取出資料.
TCP_RCV_DATA * TcpServer::GetRcvData( DWORD* const pQueLen )
TCP_RCV_DATA* pRcvData = NULL;
EnterCriticalSection(&m_RcvQueLock);
vector<TCP_RCV_DATA*>::iterator iter = m_RcvDataQue.begin();
if (m_RcvDataQue.end() != iter)
pRcvData = *iter;
m_RcvDataQue.erase(iter);
if (NULL != pQueLen)
*pQueLen = (DWORD)(m_RcvDataQue.size());
LeaveCriticalSection(&m_RcvQueLock);
return pRcvData;
l WorkThread(), 工作者線程
UINT WINAPI TcpServer::WorkThread(LPVOID lpParam)
DWORD dwTrans = 0, dwKey = 0, dwSockSize = 0;
LPOVERLAPPED pOl = NULL;
NET_CONTEXT *pContext = NULL;
BOOL bRun = TRUE;
while (TRUE)
BOOL bOk = GetQueuedCompletionStatus(pThis->m_hCompletion, &dwTrans, &dwKey, (LPOVERLAPPED *)&pOl, WSA_INFINITE);
pContext = CONTAINING_RECORD(pOl, NET_CONTEXT, m_ol);
if (pContext)
switch (pContext->m_nOperation)
case OP_ACCEPT:
pThis->AcceptCompletionProc(bOk, dwTrans, pOl);
break;
case OP_READ:
pThis->RecvCompletionProc(bOk, dwTrans, pOl);
case OP_WRITE:
pThis->SendCompletionProc(bOk, dwTrans, pOl);
EnterCriticalSection(&(pThis->m_SockQueLock));
dwSockSize = (DWORD)(pThis->m_SocketQue.size());
if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0) && 0 == dwSockSize
&& 0 == InterlockedExchangeAdd(&(pThis->m_nAcceptCount), 0))
bRun = FALSE;
LeaveCriticalSection(&(pThis->m_SockQueLock));
if (FALSE == bRun)
break;
l AcceptCompletionProc(), 當用戶端連接配接到伺服器時調用該函數.
void TcpServer::AcceptCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
ACCEPT_CONTEXT *pContext = CONTAINING_RECORD(lpOverlapped, ACCEPT_CONTEXT, m_ol);
INT nZero = 0;
int nPro = _SOCK_NO_RECV;
IP_ADDR* pClientAddr = NULL;
IP_ADDR* pLocalAddr = NULL;
INT nClientLen = 0;
INT nLocalLen = 0;
int iErrCode;
DWORD nFlag = 0;
DWORD nBytes = 0;
WSABUF RcvBuf;
if (bSuccess)
setsockopt(pContext->m_hRemoteSock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero));
setsockopt(pContext->m_hRemoteSock, SOL_SOCKET, SO_RCVBUF, (CHAR*)&nZero, sizeof(nZero));
setsockopt(pContext->m_hRemoteSock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&(pContext->m_hSock), sizeof(pContext->m_hSock));
setsockopt(pContext->m_hRemoteSock, SOL_SOCKET, SO_GROUP_PRIORITY, (char *)&nPro, sizeof(nPro));
s_pfGetAddrs(pContext->m_pBuf, 0, sizeof(sockaddr_in) +16, sizeof(sockaddr_in) +16
, (LPSOCKADDR*)&pLocalAddr, &nLocalLen, (LPSOCKADDR*)&pClientAddr, &nClientLen);
//為新來的連接配接投遞讀操作
TCP_CONTEXT *pRcvContext = new TCP_CONTEXT;
if (pRcvContext && pRcvContext->m_pBuf)
pRcvContext->m_hSock = pContext->m_hRemoteSock;
pRcvContext->m_nOperation = OP_READ;
CreateIoCompletionPort((HANDLE)(pRcvContext->m_hSock), m_hCompletion, NULL, 0);
RcvBuf.buf = pRcvContext->m_pBuf;
RcvBuf.len = TCP_CONTEXT::S_PAGE_SIZE;
iErrCode = WSARecv(pRcvContext->m_hSock, &RcvBuf, 1, &nBytes, &nFlag, &(pRcvContext->m_ol), NULL);
//投遞失敗
if (SOCKET_ERROR == iErrCode && WSA_IO_PENDING != WSAGetLastError())
closesocket(pRcvContext->m_hSock);
delete pRcvContext;
pRcvContext = NULL;
_TRACE("\r\n%s : %ld SOCKET = 0x%x LAST_ERROR = %ld", __FILE__, __LINE__, pContext->m_hRemoteSock, WSAGetLastError());
EnterCriticalSection(&m_SockQueLock);
m_SocketQue.push_back(pRcvContext->m_hSock);
LeaveCriticalSection(&m_SockQueLock);
delete pRcvContext;
SetEvent(m_ListenEvents[1]);
closesocket(pContext->m_hRemoteSock);
_TRACE("\r\n %s -- %ld accept 操作失敗_FILE__, __LINE__);
InterlockedExchangeAdd(&m_nAcceptCount, -1);
delete pContext;
pContext = NULL;
l RecvCompletionProc(),讀操作完成後的回調函數
void TcpServer::RecvCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
TCP_CONTEXT* pRcvContext = CONTAINING_RECORD(lpOverlapped, TCP_CONTEXT, m_ol);
DWORD dwFlag = 0;
int nErrCode = 0;
int nPro = _SOCK_RECV;
if ((FALSE == bSuccess || 0 == dwNumberOfBytesTransfered) && (WSA_IO_PENDING != WSAGetLastError()))
closesocket(pRcvContext->m_hSock);
setsockopt(pRcvContext->m_hSock, SOL_SOCKET, SO_GROUP_PRIORITY, (char *)&nPro, sizeof(nPro));
#ifndef _XML_NET_ //處理二進制流
//非法而用戶端發來的資料包, 關閉該用戶端.
if (0 == pRcvContext->m_nDataLen && dwNumberOfBytesTransfered < sizeof(PACKET_HEAD))
#endif //#ifndef _XML_NET_
#ifdef _XML_NET_ //處理XML流
TCP_RCV_DATA* pRcvData = new TCP_RCV_DATA(
pRcvContext->m_hSock
, pRcvContext->m_pBuf
, dwNumberOfBytesTransfered
);
if (pRcvData && pRcvData->m_pData)
{
EnterCriticalSection(&m_RcvQueLock);
m_RcvDataQue.push_back(pRcvData);
LeaveCriticalSection(&m_RcvQueLock);
pRcvContext->m_nDataLen = 0;
RcvBuf.buf = pRcvContext->m_pBuf;
RcvBuf.len = TCP_CONTEXT::S_PAGE_SIZE;
#else //處理二進制資料流
//解析資料標頭資訊中應接收的資料包的長度
pRcvContext->m_nDataLen += dwNumberOfBytesTransfered;
PACKET_HEAD* pHeadInfo = (PACKET_HEAD*)(pRcvContext->m_pBuf);
//資料包長度合法才處理
if ((pHeadInfo->nCurrentLen <= TCP_CONTEXT::S_PAGE_SIZE)
//&& (0 == dwErrorCode)
&& ((WORD)(pRcvContext->m_nDataLen) <= pHeadInfo->nCurrentLen + sizeof(PACKET_HEAD)))
//該包的所有資料以讀取完畢, 将其放入到資料隊列中
if ((WORD)(pRcvContext->m_nDataLen) == pHeadInfo->nCurrentLen + sizeof(PACKET_HEAD))
TCP_RCV_DATA* pRcvData = new TCP_RCV_DATA(
pRcvContext->m_hSock
, pRcvContext->m_pBuf
, pRcvContext->m_nDataLen
);
if (pRcvData && pRcvData->m_pData)
EnterCriticalSection(&m_RcvQueLock);
m_RcvDataQue.push_back(pRcvData);
LeaveCriticalSection(&m_RcvQueLock);
pRcvContext->m_nDataLen = 0;
RcvBuf.buf = pRcvContext->m_pBuf;
RcvBuf.len = TCP_CONTEXT::S_PAGE_SIZE;
//資料沒有接收完畢繼續接收
RcvBuf.buf = pRcvContext->m_pBuf +pRcvContext->m_nDataLen;
RcvBuf.len = pHeadInfo->nCurrentLen - pRcvContext->m_nDataLen +sizeof(PACKET_HEAD);
//資料非法, 直接進行下一次讀操作
pRcvContext->m_nDataLen = 0;
//繼續投遞讀操作
nErrCode = WSARecv(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, &(pRcvContext->m_ol), NULL);
if (SOCKET_ERROR == nErrCode && WSA_IO_PENDING != WSAGetLastError())
_TRACE("Exp : %s -- %ld SOCKET = 0x%x ERR_CODE = 0x%x", __FILE__, lErrLine, pRcvContext->m_hSock, WSAGetLastError());
delete pRcvContext;
l SendCompletionProc(), 當發送操作完成後調用該接口
void TcpServer::SendCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
TCP_CONTEXT* pSendContext = CONTAINING_RECORD(lpOverlapped, TCP_CONTEXT, m_ol);
delete pSendContext;
pSendContext = NULL;
l AideThread(), 背景輔助線程主要負責對連接配接本伺服器的用戶端SOCKET隊列進行管理
UINT WINAPI TcpServer::AideThread(LPVOID lpParam)
const int SOCK_CHECKS = 10000;
int nSockTime = 0;
int nPro = 0;
int nTimeLen = 0;
vector<SOCKET>::iterator sock_itre = pThis->m_SocketQue.begin();
for (int index = 0; index < SOCK_CHECKS; index++)
nPro = 0;
nSockTime = 0x0000ffff;
// 檢查socket隊列
EnterCriticalSection(&(pThis->m_SockQueLock));
if (pThis->m_SocketQue.end() != sock_itre)
nTimeLen = sizeof(nPro);
getsockopt(*sock_itre, SOL_SOCKET, SO_GROUP_PRIORITY, (char *)&nPro, &nTimeLen);
if (_SOCK_RECV != nPro)
nTimeLen = sizeof(nSockTime);
getsockopt(*sock_itre, SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSockTime, &nTimeLen);
if (nSockTime > 120)
closesocket(*sock_itre);
pThis->m_pCloseFun(pThis->m_pCloseParam, *sock_itre);
pThis->m_SocketQue.erase(sock_itre);
_TRACE("%s -- %ld SOCKET = 0x%x出現錯誤S_ERR = 0x%x, nPro = 0x%x, TIME = %ld", __FILE__, __LINE__, *sock_itre, WSAGetLastError(), nPro, nSockTime);
sock_itre++;
sock_itre ++;
}
else
sock_itre = pThis->m_SocketQue.begin();
LeaveCriticalSection(&(pThis->m_SockQueLock));
break;
LeaveCriticalSection(&(pThis->m_SockQueLock));
Sleep(100);
return 0;
本伺服器的測試程式在WindowsXP的32位平台下測試時可以同時接受20K個用戶端同時連接配接到伺服器, CPU隻占10%, 記憶體占20M左右.