大家好,又见面了,我是你们的朋友全栈君。
IOCP底层机理还没有透彻的理解,现将部分内容记录如下 2014.7.22 16:50
把完成端口理解为完成队列。
投递的异步IO请求完成后会携带三参数返回。
异步IO请求分为:连接、接收、发送,分别对应AcceptEx、WSARecv、WSASend。
三参数:单句柄数据结构、单IO数据结构、传输字节数。
用两种自定义结构:单句柄数据结构和单IO数据结构
单句柄数据结构与特定socket关联,在该socket上完成的所有类型的所有异步请求完成后都会返回该结构。单句柄数据结构的故事是这样的:把socket关联到完成端口时允许带一个整数,过后在该socket上完成的所有异步请求完成后都会返回该整数。
单IO数据机构与具体异步请求关联,每次投递请求时都要带一个单IO数据结构,请求A完成后会携带与A关联的单IO数据结构返回。
还原故事发生的大体情节:用
HANDLE hcp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if(hcp == NULL) { std::cout<<"Create Completion Port Failed : "<<GetLastError()<<std::endl; return -1; }
创建完成端口,现在假设socket s上需要进行异步接收,用
CreateIoCompletionPort((HANDLE)s, hcp, (DWORD)pphd, 0);
将s关联到完成端口hcp,同时带一整数pphd。pphd为单句柄数据结构的指针,该结构用new在堆上分配。故传入的是单句柄数据结构指针。用
LPPER_IO_DATA ppiod = new PER_IO_DATA; ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED)); ppiod->operationType = OP_RECV; WSABUF databuf; databuf.buf = ppiod->buf; databuf.len = BUF_LEN; DWORD dwRecv = 0; DWORD dwFlags = 0; WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL);
在s上投递请求。所有的工作线程阻塞在完成端口等待请求完成的到来
DWORD dwNum = 0; LPPER_HANDLE_DATA pphd; LPPER_IO_DATA ppiod; bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);
请求到来,唤醒一个工作线程,该线程从GetQueuedCompletionStatus函数返回,dwNum、pphd、ppiod三参数出参:
dwNum被赋值本次IO传输字节数,
pphd为指针,GetQueuedCompletionStatus中再取地址,是为指针地址,目的为指针赋值,所赋值为CreateIoCompletionPort((HANDLE)s, hcp, (DWORD)pphd, 0)传入的pphd,
ppiod同pphd,被赋值WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL)中的ppiod的地址。
自定义结构可以任意设置,唯一的要求就是单IO数据结构的第一个成员必须是OVERLAPPED。
#include <WinSock2.h> #include <Windows.h> #include <iostream> #include <process.h> #include <string> #include <MSWSock.h> #include <set> #pragma comment(lib, "Ws2_32.lib") #pragma comment(lib, "Kernel32.lib") #pragma comment(lib, "Mswsock.lib") #define BUF_LEN 1024 enum OperateType { OP_RECV, OP_SEND, OP_ACCEPT, }; typedef struct PER_HANDLE_DATA { SOCKET s; //记录是哪个socket上的请求 SOCKADDR_IN addr; //记录该socket对应的客户端地址和端口 }PER_HANDLE_DATA, *LPPER_HANDLE_DATA; typedef struct PER_IO_DATA { OVERLAPPED overlapped; //第一项必须为OVERLAPPED SOCKET cs; //记录客户端socket char buf[BUF_LEN]; //发送:此buf存储待发送数据,接收:此buf存储到来的数据 int operationType; //记录完成的请求类型:是接收?是发送? 还是连接? }PER_IO_DATA, *LPPER_IO_DATA;
异步连接
为什么监听连接用异步?
网上很多IOCP模型:主线程循环accept等待连接的到来,然后将Client socket加入IOCP,同时在Client socket上投递WSARecv等待数据到来。其他worker线程GetQueuedCompletionStatus阻塞在IOCP上等待请求的完成。
缺点:阻塞式accept效率不高,且接收连接单独占用一个线程,让原本的一个worker线程专门来等待连接,压榨了一个worker的潜能。
异步连接让所有运行的线程均为worker线程,且MSDN说AcceptEx比accpet连接进行得更快,可以用少量的线程处理大量的Client连接
整体过程:
1.创建listenSocket,与本地地址绑定,开始监听;
2.将listenSocket添加到IOCP;
3.用AcceptEx在listenSocket上投递连接请求。
如何投递异步连接请求?
bool PostAccept(SOCKET listenSocket) { /************************************************ 为即将到来的Client连接事先创建好Socket: 阻塞式连接中accept的返回值即为新进连接创建的Socket, 异步连接需要事先将此Socket备下,再行连接 **************************************************/ SOCKET cs = socket(AF_INET, SOCK_STREAM, 0); if(INVALID_SOCKET == cs) { std::cout<<"Create Socket Failed : "<<GetLastError()<<std::endl; return false; } /*每一个异步请求必须一个PER_IO_DATA结构*/ LPPER_IO_DATA ppiod = new PER_IO_DATA; ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED)); //PER_IO_DATA在使用前必须清空OVERLAPPED成员 ppiod->operationType = OP_ACCEPT; //待会从GetQueuedCompletionStatus返回,通过查看operationType就知道是接收完成?发送完成?还是连接完成 ppiod->cs = cs; //连接完成后要在新进连接上投递异步发送或接收,所以要事先把Client Socket记录下来以备后续使用 /*********************** 投递异步连接请求: 函数:AcceptEx 参数: 一参本地监听Socket 二参为即将到来的客人准备好的Socket 三参接收缓冲区: 一存客人发来的第一份数据、二存Server本地地址、三存Client远端地址 地址包括IP和端口, 四参定三参数据区长度,0表只连不接收、连接到来->请求完成,否则连接到来+任意长数据到来->请求完成 五参定三参本地地址区长度,至少sizeof(sockaddr_in) + 16 六参定三参远端地址区长度,至少sizeof(sockaddr_in) + 16 七八两参不用管 ***************************************/ DWORD dwRecv; int len = sizeof(sockaddr_in) + 16; bool ret = AcceptEx(listenSocket, ppiod->cs, ppiod->buf, 0, len, len, &dwRecv, &ppiod->overlapped); if(false == ret && ERROR_IO_PENDING != GetLastError()) { std::cout<<"AcceptEx Failed : "<<GetLastError()<<std::endl; return false; } return true; }
异步接收
bool PostRecv(SOCKET s) { /*每一个异步请求必须一个PER_IO_DATA结构*/ LPPER_IO_DATA ppiod = new PER_IO_DATA; ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED)); ppiod->operationType = OP_RECV; //请求类型是接收 memset(ppiod->buf, 0, BUF_LEN); //清空接收缓存 WSABUF databuf; databuf.buf = ppiod->buf; //接收缓冲区首地址 databuf.len = BUF_LEN; //接收缓冲区长度 /*********************** 投递异步接收请求: 函数:WSARecv 参数: 一参接收Socket 二参WSABUF指针,接收缓冲数组 三参说二参数组元素个数、接收缓冲个数 四五六七不用管 ***************************************/ DWORD dwRecv = 0; DWORD dwFlags = 0; int ret = WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL); if(SOCKET_ERROR == ret && WSA_IO_PENDING != GetLastError()) return false; return true; }
异步发送
bool PostSend(SOCKET s, const char *buf, int len) { /*每一个异步请求必须一个PER_IO_DATA结构*/ LPPER_IO_DATA ppiod = new PER_IO_DATA; ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED)); ppiod->operationType = OP_SEND; memset(ppiod->buf, 0, BUF_LEN); memcpy(ppiod->buf, buf, len); WSABUF databuf; databuf.buf = ppiod->buf; //发送缓冲首地址 databuf.len = len; //发送数据长度,定多少就发多少:多定多发,少定少发,不定不发 /*同WSARecv*/ DWORD dwRecv = 0; DWORD dwFlags = 0; WSASend(s, &databuf, 1, &dwRecv, dwFlags, &ppiod->overlapped, NULL); return true; }
处理完成的请求
bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);
全体worker线程循环调用GetQueuedCompletionStatus,阻塞在该函数调用中,等待从IOCP传来请求完成的通知。没有任何请求完成时,IOCP让worker沉睡;当请求到来时,IOCP唤醒最后入睡的worker线程起来执行处理。
GetQueuedCompletionStatus成功返回true,失败返回false。
实验结果如下:
1.连接到来, ret = true && dwNum = 0 && ppiod->operationType = OP_ACCEPT
2.连接断开:
A.Client调用closesocket,ret = true && dwNum = 0
B.Client直接退出,ret = false && dwNum = 0
C.Client暴力中断,如断电,2014.7.23 18:49 用两台电脑测试结果表明对于暴力断电,IOCP无任何反应,故心跳检测 必不可少
3.投递请求的线程退出,ret = false && dwNum = 0 && GetLastError() = 995(995:由于线程退出或应用程序请求,已放弃 I/O 操作)
附加:Client关闭连接或直接退出,在对应socket上投递的所有请求均返回。假设在socket 1上投递了三个WSARecv, 当Client关闭连接时,会有三个连接断开返回,不要重复释放空间。
用户控制退出
PostQueuedCompletionStatus(hcp, -1, NULL, NULL);
向IOCP投一个IO完成包,会有一个worker线程从GetQueuedCompletionStatus返回,同时它得到的三参数依次是dwNum = -1, pphd = NULL, ppiod = NULL,正是我们传入PostQueuedCompletionStatus的后三个参数。
PostQueuedCompletionStatus(hcp, X, Y, Z),则返回的worker线程得到的三参数依次是dwNum = X, pphd = Y, ppiod = Z
正常情况下dwNum只有两种取值:正整数和0,所以worker线程返回后通过查看dwNum == -1是否成立就可以判断是否到了结束的时间了。
理想情况下,以上述格式每投递一个完成包就有一个worker退出,所以我们应该投递workerNum个完成包来使所有worker退出。在非理想情况下我们应该防止还有worker没有退出,完美方式
void NetModule::Stop() { /*std::set<HANDLE> setWorkers存储所有worker线程句柄*/ for(size_t i = 0; i < setWorkers.size(); i++) PostQueuedCompletionStatus(hcp, -1, NULL, NULL); for(auto iter = setWorkers.begin(); iter != setWorkers.end();) { int ret = WaitForSingleObject(*iter, 100); if(WAIT_OBJECT_0 == ret) setWorkers.erase(iter++); else { PostQueuedCompletionStatus(hcp, -1, NULL, NULL); continue; } } }
所以检查GetQueuedCompletionStatus的返回值的标准方式
bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE); //线程退出控制,没有释放申请的堆空间,还不完善 if(-1 == dwNum) { std::cout<<"Thread Exit"<<std::endl; _endthreadex(0); return 0; } int type = ppiod->operationType; if(0 == dwNum && OP_ACCEPT != type) { //投递请求的线程退出 if(GetLastError() == 995) { std::cout<<"Thread Exit Cause Request Returned"<<std::endl; } //连接断开 else { std::cout<<"Peer Close The Connection"<<std::endl; //在一个socket上投递多个WSARecv需要考虑连接被Client断开时所有异步WSARecv均返回不会重复delete PER_HANDLE_DATA AutoLock lock(pphd->mutex); if(pphd->flag == true) { closesocket(pphd->s); delete pphd; pphd->flag = false; } } delete ppiod; continue; } //错误发生 if(false == ret) { std::cout<<"An Error Occurs : "<<GetLastError()<<std::endl; delete ppiod; continue; }
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/155531.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...