IOCP模型与网络编程
一、前言:
1. 我想只要是写过或者想要写C/S模式网络服务器端的朋友,都应该或多或少的听过完成端口的大名吧,完成端口会充分利用Windows内核来进行I/O的调度,是用于C/S通信模式中性能最好的网络通信模型,没有之一;甚至连和它性能接近的通信模型都没有。
2. 完成端口和其他网络通信方式最大的区别在哪里呢?
(1) 首先,如果使用“同步”的方式来通信的话,这里说的同步的方式就是说所有的操作都在一个线程内顺序执行完成,这么做缺点是很明显的:因为同步的通信操作会阻塞住来自同一个线程的任何其他操作,只有这个操作完成了之后,后续的操作才可以完成;一个最明显的例子就是咱们在MFC的界面代码中,直接使用阻塞Socket调用的代码,整个界面都会因此而阻塞住没有响应!所以我们不得不为每一个通信的Socket都要建立一个线程,多麻烦?这不坑爹呢么?所以要写高性能的服务器程序,要求通信一定要是异步的。
(2) 各位读者肯定知道,可以使用使用“同步通信(阻塞通信)+多线程”的方式来改善(1)的情况,那么好,想一下,我们好不容易实现了让服务器端在每一个客户端连入之后,都要启动一个新的Thread和客户端进行通信,有多少个客户端,就需要启动多少个线程,对吧;但是由于这些线程都是处于运行状态,所以系统不得不在所有可运行的线程之间进行上下文的切换,我们自己是没啥感觉,但是CPU却痛苦不堪了,因为线程切换是相当浪费CPU时间的,如果客户端的连入线程过多,这就会弄得CPU都忙着去切换线程了,根本没有多少时间去执行线程体了,所以效率是非常低下的,承认坑爹了不?
(3) 而微软提出完成端口模型的初衷,就是为了解决这种"one-thread-per-client"的缺点的,它充分利用内核对象的调度,只使用少量的几个线程来处理和客户端的所有通信,消除了无谓的线程上下文切换,最大限度的提高了网络通信的性能,这种神奇的效果具体是如何实现的请看下文。
3. 完成端口被广泛的应用于各个高性能服务器程序上,例如著名的Apache….如果你想要编写的服务器端需要同时处理的并发客户端连接数量有数百上千个的话,那不用纠结了,就是它了
二、提出相关问题:
1. IOCP模型是什么?
2. IOCP模型是用来解决什么问题的?它为什么存在?
3. 使用IOCP模型需要用到哪些知识?
4. 如何使用IOCP模型与Socket网络编程结合起来?
5. 学会了这个模型以后与我之前写过的简单的socket程序主要有哪些不同点?
部分问题探究及解决:
1. 什么是IOCP?什么是IOCP模型?IOCP模型有什么作用?
1) IOCP(I/O Completion Port),常称I/O完成端口。
2) IOCP模型属于一种通讯模型,适用于(能控制并发执行的)高负载服务器的一个技术。
3) 通俗一点说,就是用于高效处理很多很多的客户端进行数据交换的一个模型。
4) 或者可以说,就是能异步I/O操作的模型。
5) 只是了解到这些会让人很糊涂,因为还是不知道它究意具体是个什么东东呢?
下面我想给大家看三个图:
第一个是IOCP的内部工作队列图。(整合于《IOCP本质论》文章,在英文的基础上加上中文对照)
第二个是程序实现IOCP模型的基本步骤。(整合于《深入解释IOCP》,加个人观点、理解、翻译)
第三个是使用了IOCP模型及没使用IOCP模型的程序流程图。(个人理解绘制)
2. IOCP的存在理由(IOCP的优点)及技术相关有哪些?
之前说过,很通俗地理解可以理解成是用于高效处理很多很多的客户端进行数据交换的一个模型,那么,它具体的优点有些什么呢?它到底用到了哪些技术了呢?在Windows环境下又如何去使用这些技术来编程呢?它主要使用上哪些API函数呢?呃~看来我真是一个问题多多的人,跟前面提出的相关问题变种延伸了不少的问题,好吧,下面一个个来解决。
1) 使用IOCP模型编程的优点
① 帮助维持重复使用的内存池。(与重叠I/O技术有关)
② 去除删除线程创建/终结负担。
③ 利于管理,分配线程,控制并发,最小化的线程上下文切换。
④ 优化线程调度,提高CPU和内存缓冲的命中率。
2) 使用IOCP模型编程汲及到的知识点(无先后顺序)
① 同步与异步
② 阻塞与非阻塞
③ 重叠I/O技术
④ 多线程
⑤ 栈、队列这两种基本的数据结构
3) 需要使用上的API函数
① 与SOCKET相关
1、链接套接字动态链接库:int WSAStartup(...);
2、创建套接字库: SOCKET socket(...);
3、绑字套接字: int bind(...);
4、套接字设为监听状态: int listen(...);
5、接收套接字: SOCKET accept(...);
6、向指定套接字发送信息:int send(...);
7、从指定套接字接收信息:int recv(...);
② 与线程相关
1、创建线程:HANDLE CreateThread(...);
③ 重叠I/O技术相关
1、向套接字发送数据: int WSASend(...);
2、向套接字发送数据包: int WSASendFrom(...);
3、从套接字接收数据: int WSARecv(...);
4、从套接字接收数据包: int WSARecvFrom(...);
④ IOCP相关
1、创建完成端口: HANDLE WINAPI CreateIoCompletionPort(...);
2、关联完成端口: HANDLE WINAPI CreateIoCompletionPort(...);
3、获取队列完成状态: BOOL WINAPI GetQueuedCompletionStatus(...);
4、投递一个队列完成状态:BOOL WINAPI PostQueuedCompletionStatus(...);
四。完整的简单的IOCP服务器与客户端代码实例:
[cpp] view plaincopyprint?
1. // IOCP_TCPIP_Socket_Server.cpp
2.
3. #include
4. #include
5. #include
6. #include
7.
8. using namespace std;
9.
10. #pragma comment(lib, "Ws2_32.lib") // Socket编程需用的动态链接库
11. #pragma comment(lib, "Kernel32.lib") // IOCP需要用到的动态链接库
12.
13.
17. const int DataBuffSize = 2 * 1024;
18. typedef struct
19. {
20. OVERLAPPED overlapped;
21. WSABUF databuff;
22. char buffer[ DataBuffSize ];
23. int BufferLen;
24. int operationType;
25. }PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA;
26.
27.
32. typedef struct
33. {
34. SOCKET socket;
35. SOCKADDR_STORAGE ClientAddr;
36. }PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
37.
38. // 定义全局变量
39. const int DefaultPort = 6000;
40. vector < PER_HANDLE_DATA* > clientGroup; // 记录客户端的向量组
41.
42. HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);
43. DWORD WINAPI ServerWorkThread(LPVOID CompletionPortID);
44. DWORD WINAPI ServerSendThread(LPVOID IpParam);
45.
46. // 开始主函数
47. int main()
48. {
49. // 加载socket动态链接库
50. WORD wVersionRequested = MAKEWORD(2, 2); // 请求2.2版本的WinSock库
51. WSADATA wsaData; // 接收Windows Socket的结构信息
52. DWORD err = WSAStartup(wVersionRequested, &wsaData);
53.
54. if (0 != err){ // 检查套接字库是否申请成功
55. cerr << "Request Windows Socket Library Error!";
56. system("pause");
57. return -1;
58. }
59. if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2){// 检查是否申请了所需版本的套接字库
60. WSACleanup();
61. cerr << "Request Windows Socket Version 2.2 Error!";
62. system("pause");
63. return -1;
64. }
65.
66. // 创建IOCP的内核对象
67.
76. HANDLE completionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0);
77. if (NULL == completionPort){ // 创建IO内核对象失败
78. cerr << "CreateIoCompletionPort failed. Error:" << GetLastError() << endl;
79. system("pause");
80. return -1;
81. }
82.
83. // 创建IOCP线程--线程里面创建线程池
84.
85. // 确定处理器的核心数量
86. SYSTEM_INFO mySysInfo;
87. GetSystemInfo(&mySysInfo);
88.
89. // 基于处理器的核心数量创建线程
90. for(DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2); ++i){
91. // 创建服务器工作器线程,并将完成端口传递到该线程
92. HANDLE ThreadHandle = CreateThread(NULL, 0, ServerWorkThread, completionPort, 0, NULL);
93. if(NULL == ThreadHandle){
94. cerr << "Create Thread Handle failed. Error:" << GetLastError() << endl;
95. system("pause");
96. return -1;
97. }
98. CloseHandle(ThreadHandle);
99. }
100.
101. // 建立流式套接字
102. SOCKET srvSocket = socket(AF_INET, SOCK_STREAM, 0);
103.
104. // 绑定SOCKET到本机
105. SOCKADDR_IN srvAddr;
106. srvAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
107. srvAddr.sin_family = AF_INET;
108. srvAddr.sin_port = htons(DefaultPort);
109. int bindResult = bind(srvSocket, (SOCKADDR*)&srvAddr, sizeof(SOCKADDR));
110. if(SOCKET_ERROR == bindResult){
111. cerr << "Bind failed. Error:" << GetLastError() << endl;
112. system("pause");
113. return -1;
114. }
115.
116. // 将SOCKET设置为监听模式
117. int listenResult = listen(srvSocket, 10);
118. if(SOCKET_ERROR == listenResult){
119. cerr << "Listen failed. Error: " << GetLastError() << endl;
120. system("pause");
121. return -1;
122. }
123.
124. // 开始处理IO数据
125. cout << "本服务器已准备就绪,正在等待客户端的接入...";
126.
127. // 创建用于发送数据的线程
128. HANDLE sendThread = CreateThread(NULL, 0, ServerSendThread, 0, 0, NULL);
129.
130. while(true){
131. PER_HANDLE_DATA * PerHandleData = NULL;
132. SOCKADDR_IN saRemote;
133. int RemoteLen;
134. SOCKET acceptSocket;
135.
136. // 接收连接,并分配完成端,这儿可以用AcceptEx()
137. RemoteLen = sizeof(saRemote);
138. acceptSocket = accept(srvSocket, (SOCKADDR*)&saRemote, &RemoteLen);
139. if(SOCKET_ERROR == acceptSocket){ // 接收客户端失败
140. cerr << "Accept Socket Error: " << GetLastError() << endl;
141. system("pause");
142. return -1;
143. }
144.
145. // 创建用来和套接字关联的单句柄数据信息结构
146. PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA)); // 在堆中为这个PerHandleData申请指定大小的内存
147. PerHandleData -> socket = acceptSocket;
148. memcpy (&PerHandleData -> ClientAddr, &saRemote, RemoteLen);
149. clientGroup.push_back(PerHandleData); // 将单个客户端数据指针放到客户端组中
150.
151. // 将接受套接字和完成端口关联
152. CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), completionPort, (DWORD)PerHandleData, 0);
153.
154.
155. // 开始在接受套接字上处理I/O使用重叠I/O机制
156. // 在新建的套接字上投递一个或多个异步
157. // WSARecv或WSASend请求,这些I/O请求完成后,工作者线程会为I/O请求提供服务
158. // 单I/O操作数据(I/O重叠)
159. LPPER_IO_OPERATION_DATA PerIoData = NULL;
160. PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATEION_DATA));
161. ZeroMemory(&(PerIoData -> overlapped), sizeof(OVERLAPPED));
162. PerIoData->databuff.len = 1024;
163. PerIoData->databuff.buf = PerIoData->buffer;
164. PerIoData->operationType = 0; // read
165.
166. DWORD RecvBytes;
167. DWORD Flags = 0;
168. WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
169. }
170.
171. system("pause");
172. return 0;
173. }
174.
175. // 开始服务工作线程函数
176. DWORD WINAPI ServerWorkThread(LPVOID IpParam)
177. {
178. HANDLE CompletionPort = (HANDLE)IpParam;
179. DWORD BytesTransferred;
180. LPOVERLAPPED IpOverlapped;
181. LPPER_HANDLE_DATA PerHandleData = NULL;
182. LPPER_IO_DATA PerIoData = NULL;
183. DWORD RecvBytes;
184. DWORD Flags = 0;
185. BOOL bRet = false;
186.
187. while(true){
188. bRet = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&IpOverlapped, INFINITE);
189. if(bRet == 0){
190. cerr << "GetQueuedCompletionStatus Error: " << GetLastError() << endl;
191. return -1;
192. }
193. PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(IpOverlapped, PER_IO_DATA, overlapped);
194.
195. // 检查在套接字上是否有错误发生
196. if(0 == BytesTransferred){
197. closesocket(PerHandleData->socket);
198. GlobalFree(PerHandleData);
199. GlobalFree(PerIoData);
200. continue;
201. }
202.
203. // 开始数据处理,接收来自客户端的数据
204. WaitForSingleObject(hMutex,INFINITE);
205. cout << "A Client says: " << PerIoData->databuff.buf << endl;
206. ReleaseMutex(hMutex);
207.
208. // 为下一个重叠调用建立单I/O操作数据
209. ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存
210. PerIoData->databuff.len = 1024;
211. PerIoData->databuff.buf = PerIoData->buffer;
212. PerIoData->operationType = 0; // read
213. WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
214. }
215.
216. return 0;
217. }
218.
219.
220. // 发送信息的线程执行函数
221. DWORD WINAPI ServerSendThread(LPVOID IpParam)
222. {
223. while(1){
224. char talk[200];
225. gets(talk);
226. int len;
227. for (len = 0; talk[len] != '0'; ++len){
228. // 找出这个字符组的长度
229. }
230. talk[len] = '';
231. talk[++len] = '0';
232. printf("I Say:");
233. cout << talk;
234. WaitForSingleObject(hMutex,INFINITE);
235. for(int i = 0; i < clientGroup.size(); ++i){
236. send(clientGroup[i]->socket, talk, 200, 0); // 发送信息
237. }
238. ReleaseMutex(hMutex);
239. }
240. return 0;
241. }
[cpp] view plaincopyprint?
1. // IOCP_TCPIP_Socket_Client.cpp
2.
3. #include
4. #include
5. #include
6. #include
10. using namespace std;
11.
12. #pragma comment(lib, "Ws2_32.lib") // Socket编程需用的动态链接库
13.
14. SOCKET sockClient; // 连接成功后的套接字
15. HANDLE bufferMutex; // 令其能互斥成功正常通信的信号量句柄
16. const int DefaultPort = 6000;
17.
18. int main()
19. {
20. // 加载socket动态链接库(dll)
21. WORD wVersionRequested;
22. WSADATA wsaData; // 这结构是用于接收Wjndows Socket的结构信息的
23. wVersionRequested = MAKEWORD( 2, 2 ); // 请求2.2版本的WinSock库
24. int err = WSAStartup( wVersionRequested, &wsaData );
25. if ( err != 0 ) { // 返回值为零的时候是表示成功申请WSAStartup
26. return -1;
27. }
28. if ( LOBYTE( wsaData.wVersion ) != 2 || HIBYTE( wsaData.wVersion ) != 2 ) { // 检查版本号是否正确
29. WSACleanup( );
30. return -1;
31. }
32.
33. // 创建socket操作,建立流式套接字,返回套接字号sockClient
34. sockClient = socket(AF_INET, SOCK_STREAM, 0);
35. if(sockClient == INVALID_SOCKET) {
36. printf("Error at socket():%ld", WSAGetLastError());
37. WSACleanup();
38. return -1;
39. }
40.
41. // 将套接字sockClient与远程主机相连
42. // int connect( SOCKET s, const struct sockaddr* name, int namelen);
43. // 第一个参数:需要进行连接操作的套接字
44. // 第二个参数:设定所需要连接的地址信息
45. // 第三个参数:地址的长度
46. SOCKADDR_IN addrSrv;
47. addrSrv.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); // 本地回路地址是127.0.0.1;
48. addrSrv.sin_family = AF_INET;
49. addrSrv.sin_port = htons(DefaultPort);
50. while(SOCKET_ERROR == connect(sockClient, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR))){
51. // 如果还没连接上服务器则要求重连
52. cout << "服务器连接失败,是否重新连接?(Y/N):";
53. char choice;
54. while(cin >> choice && (!((choice != 'Y' && choice == 'N') || (choice == 'Y' && choice != 'N')))){
55. cout << "输入错误,请重新输入:";
56. cin.sync();
57. cin.clear();
58. }
59. if (choice == 'Y'){
60. continue;
61. }
62. else{
63. cout << "退出系统中...";
64. system("pause");
65. return 0;
66. }
67. }
68. cin.sync();
69. cout << "本客户端已准备就绪,用户可直接输入文字向服务器反馈信息。";
70.
71. send(sockClient, "Attention: A Client has enter...", 200, 0);
72.
73. bufferMutex = CreateSemaphore(NULL, 1, 1, NULL);
74.
75. DWORD WINAPI SendMessageThread(LPVOID IpParameter);
76. DWORD WINAPI ReceiveMessageThread(LPVOID IpParameter);
77.
78. HANDLE sendThread = CreateThread(NULL, 0, SendMessageThread, NULL, 0, NULL);
79. HANDLE receiveThread = CreateThread(NULL, 0, ReceiveMessageThread, NULL, 0, NULL);
80.
81.
82. WaitForSingleObject(sendThread, INFINITE); // 等待线程结束
83. closesocket(sockClient);
84. CloseHandle(sendThread);
85. CloseHandle(receiveThread);
86. CloseHandle(bufferMutex);
87. WSACleanup(); // 终止对套接字库的使用
88.
89. printf("End linking...");
90. printf("");
91. system("pause");
92. return 0;
93. }
94.
95.
96. DWORD WINAPI SendMessageThread(LPVOID IpParameter)
97. {
98. while(1){
99. string talk;
100. getline(cin, talk);
101. WaitForSingleObject(bufferMutex, INFINITE); // P(资源未被占用)
102. if("quit" == talk){
103. talk.push_back('0');
104. send(sockClient, talk.c_str(), 200, 0);
105. break;
106. }
107. else{
108. talk.append("");
109. }
110. printf("I Say:("quit"to exit):");
111. cout << talk;
112. send(sockClient, talk.c_str(), 200, 0); // 发送信息
113. ReleaseSemaphore(bufferMutex, 1, NULL); // V(资源占用完毕)
114. }
115. return 0;
116. }
117.
118.
119. DWORD WINAPI ReceiveMessageThread(LPVOID IpParameter)
120. {
121. while(1){
122. char recvBuf[300];
123. recv(sockClient, recvBuf, 200, 0);
124. WaitForSingleObject(bufferMutex, INFINITE); // P(资源未被占用)
125.
126. printf("%s Says: %s", "Server", recvBuf); // 接收信息
127.
128. ReleaseSemaphore(bufferMutex, 1, NULL); // V(资源占用完毕)
129. }
130. return 0;
131. }