完成端口与线程池的关系_端口触发

完成端口与线程池的关系_端口触发关于IOCP网上到处都是资料,说的也很详细。我在这里就不再多说了,这只是本人在学习IOCP时的笔记,和配合AcceptEx写的一个极小的服务端程序。由于刚刚接触ICOP加上本人刚毕业不到一年,所以里面的理解或观点可能有误,还请大家多多批评!        VC6.0开发,旨在体现IOCP的架构,忽略细节,服务程序的功能只是接收客户连接,接着接收到客户数据,然后原封不动的返回给客户! 

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

关于IOCP网上到处都是资料,说的也很详细。我在这里就不再多说了,这只是本人在学习IOCP时的笔记,和配合AcceptEx写的一个极小的服务端程序。由于刚刚接触ICOP加上本人刚毕业不到一年,所以里面的理解或观点可能有误,还请大家多多批评!

         VC6.0开发,旨在体现IOCP的架构,忽略细节,服务程序的功能只是接收客户连接,接着接收到客户数据,然后原封不动的返回给客户!

 

下面这段话,如果不感兴趣,可以跳过去,直接看代码…

先说IOCP,其实思路很清晰:

         1.声明一个数据结构,用来存放客户套接字和客户信息

         2.声明一个包含OVERLAPPED字段的I/O结构

         3.创建完成端口

         4.创建服务线程

         5.接收客户端连接请求

         6.关联这个套接字到完成端口中

         7.服务线程中不断的等待I/O结果,在结果中提供服务和根据需要发起另一个异步操作。

 

         按照这个思路很快的写出了一个服务程序,但是遇到了下面的问题:

         1.WSAGetLastError()返回10045,找了半天才发现发起重叠操作时候,WSARecv中flag参数没有初始化,需要初始化赋值为0。

         2.在GetQueuedCompletionStatus中,没有错误,但总是返回读取的字数为0。I/O重叠结构中也收不到任何字符。这时候我就在这里用了一下recv()函数,在recv中却可以收到来自客户端发送的数据。难道每次都要自己recv()?肯定不是!如果那样还用扩展的I/O结果何用。一定是哪里指定了接收的数目,而自己不小心指定为0了,所以没有接收数据。找了半天果然如此。在发起重叠操作时候,扩展的I/O中WSABUF的赋值有问题。

         我的错误:wsaBuf.len = (I/O结构).len;

         改为:       wsaBuf.len = (I/O结构).len = DATABUF_SIZE;

         修改之后终于可以接收和发送数据了。

 

 

为什么要用AcceptEx?

         在学习IOCP时,看到一位大神写的文章,他用客户端开了3W个线程同时连接服务端和发送数据,我好奇就也开了3W个线程去同时连接服务端,结果很多都printf连接失败的信息!再看看大神的文章,再搜一下AcceptEx。对比accept,觉得AcceptEx确实很强大。AcceptEx和accept主要的区别就在于接收套接字:

         accept函数是等待客户连接进来之后才创建套接字,虽然在我们看到的就是一个socket函数,但是在函数背后,系统应该会消耗不少资源,因为它要打通一个和外界通讯的路。如果大量套接字并发接入,难免有的套接字不能及时创建和接收。

         AcceptEx则是事先创建好套接字,坐等客户端的连接就行了。

 

         但是,AcceptEx相比accept确实复杂了很多。原来一句accept就可以解决的,现在却要为AcceptEx做很多服务,但是只要理清思路,这个做起来也是很从容的。

         1.创建一个监听套接字

         2.将监听套接字关联到完成端口中

         3.对监听套接字调用bind()、listen()

         4.通过WSAIoctl获取AcceptEx、GetAcceptExSockaddrs函数的指针

         5.创建一个用于接收客户连接的套接字

         6.用获取到的AcceptEx函数指针发起用于接收连接的异步操作

         7.服务器接收到连接的套接字,设置一下它的属性(有人说没有必要)。用这个接收到的套接字去发起重叠的I/O操作。

         8.多次重复5,6就是多次发起接收连接的异步操作的过程。

         对于第4步,为什么要获取AcceptEx的指针,而不是直接就调用AcceptEx这个函数呢?网上找到的资料是这么说的:

         Winsock2的其他供应商不一定会实现AcceptEx函数。同样情况也包括的其他Microsoft的特定APIs如TransmitFile,GetAcceptExSockAddrs以及其他Microsoft将在以后版本的windows里。

在运行WinNT和Win2000的系统上,这些APIs在Microsoft提供的DLL(mswsock.dll)里实现,可以通过链接mswsock.lib或者通过WSAioctl的SIO_GET_EXTENSION_FUNCTION_POINTER操作动态调用这些扩展APIs.

未获取函数指针就调用函数(如直接连接mswsock.lib并直接调用AcceptEx)的消耗是很大的,因为AcceptEx实际上是存在于Winsock2结构体系之外的。每次应用程序常试在服务提供层上(mswsock之上)调用AcceptEx时,都要先通过WSAIoctl获取该函数指针。如果要避免这个很影响性能的操作,应用程序最好是直接从服务提供层通过WSAIoctl先获取这些APIs的指针。

这样一来,大家就不觉得这个复杂的函数WSAloctl那么让人心烦了吧!至于调用失败后所返回的错误代码,百度百科中介绍的很详细!

 

使用AcceptEx后:

         在使用AcceptEx后,并发2000个套接字去连接客户端,不再出现连接失败的消息了。

         但是,你肯定会说人家3W个,你这2000个不能说明问题。开始我也一直在尝试同时并发3W个线程,可是发现公司机器最多时候也就1573个连接,家里笔记本差不多2000个。这是怎么会事呢?于是搜资料查到一个进程最多可以开启的理论线程数是2048个线程,而且实际情况下通常小于这个值,这样在一个进程里面怎么可能有3W个连接啊!忍不住好奇就下了http://blog.csdn.net/piggyxp/article/details/6922277大神的IOCP客户端demo,发现并不是同时并发3W个,用任务管理器看并发最多时候线程数并没有超过1K(无意冒犯大神,只是个人的愚见,我学习IOCP也是大部分都是从大神的文章中学习到的,所以先要感谢大神的奉献,同时如果(不是如果,是肯定)我的理解有错误,希望大家不吝赐教,多多批评,鄙人一定感激万分)。

         为了验证IOCP是否有那么强的能力,我的客户端没有做成连接到服务端一个套接字,再创建一个线程,传递套接字到线程的方式。而是,主线程直接创建2000个线程,在每个线程中去连接服务器(觉得这样更能体现并发连接),多开几个客户端,每个客户端的连接数为最大线程数,服务端同时处理的连接数为12562(开更多的线程连接数更多,有兴趣的可以试一下)。下面是360的流量管理下面的截图:完成端口与线程池的关系_端口触发

我注释掉了接收数据后printf接收到的数据,因为发现如果连接过多,一直printf服务器就挂掉了,不知道改成mfc会不会好点…

         下面是服务器代码:

#include "stdafx.h"
#include <Afx.h>
#include <Windows.h>
#include <Winsock2.h>
#pragma comment(lib, "WS2_32.lib")
#include <mswsock.h>    //微软扩展的类库
#define DATA_BUFSIZE 100
#define READ   0
#define WRITE  1
#define ACCEPT 2
DWORD g_count = 0;
//扩展的输入输出结构
typedef struct _io_operation_data
{
OVERLAPPED	overlapped;
WSABUF		databuf;
CHAR		buffer[DATA_BUFSIZE];
BYTE		type;
DWORD		len;
SOCKET		sock;
}IO_OPERATION_DATA, *LP_IO_OPERATION_DATA;
//完成键
typedef struct _completion_key
{
SOCKET sock;
char   sIP[30];		//本机测试,IP都是127.0.0.1,没啥意思,实际写时候这个值填的是端口号
}COMPLETION_KEY, *LP_COMPLETION_KEY;
///
//完成端口句柄
HANDLE g_hComPort = NULL;
BOOL   g_bRun = FALSE;
BOOL AcceptClient(SOCKET sListen);		//发起接收连接操作
BOOL Recv(COMPLETION_KEY *pComKey, IO_OPERATION_DATA *pIO);	//发起接收操作
BOOL Send(COMPLETION_KEY *pComKey, IO_OPERATION_DATA *pIO);	//发起发送操作
//处理重叠结果
BOOL ProcessIO(IO_OPERATION_DATA *pIOdata, COMPLETION_KEY *pComKey);
//
//服务线程
DWORD WINAPI ServerWorkerThread( LPVOID pParam );
//
LPFN_ACCEPTEX lpfnAcceptEx = NULL;					 //AcceptEx函数指针
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;  //加载GetAcceptExSockaddrs函数指针
///
//监听套接字,其实也不一定要是全局的。用于接收到连接后继续发起等待连接操作。
SOCKET g_sListen;
int main(int argc, char* argv[])
{
g_bRun = TRUE;
//创建完成端口
g_hComPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
if( g_hComPort == NULL )
{
printf("Create completionport error!   %d\n", WSAGetLastError() );
return 0;
}
//创建服务线程
SYSTEM_INFO sysInfor;
GetSystemInfo( &sysInfor );
int i=0;
for(i = 0; i < sysInfor.dwNumberOfProcessors * 2; i++)
//	if(true)
{
HANDLE hThread;
DWORD  dwThreadID;
hThread = CreateThread( NULL, 0, ServerWorkerThread, g_hComPort, 0, &dwThreadID );
CloseHandle( hThread );
}
//加载套接字库
WSADATA wsData;
if( 0 != WSAStartup( 0x0202, &wsData ) )
{
printf("加载套接字库失败!   %d\n", WSAGetLastError() );
g_bRun = FALSE;
return 0;
}
//等待客户端连接
//先创建一个套接字用于监听
SOCKET sListen = WSASocket( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED );
g_sListen = sListen;
//将监听套接字与完成端口绑定
LP_COMPLETION_KEY pComKey;		//完成键
pComKey = (LP_COMPLETION_KEY) GlobalAlloc ( GPTR, sizeof(COMPLETION_KEY) );
pComKey->sock = sListen;
CreateIoCompletionPort( (HANDLE)sListen, g_hComPort, (DWORD)pComKey, 0 );
//监听套接字绑定监听
SOCKADDR_IN serAdd;
serAdd.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
serAdd.sin_family = AF_INET;
serAdd.sin_port = htons( 6000 );
bind( sListen, (SOCKADDR*)&serAdd, sizeof(SOCKADDR) );
listen( sListen, 5 );
if( sListen == SOCKET_ERROR )
{
goto STOP_SERVER;
}
/
//使用WSAIoctl获取AcceptEx函数指针
if( true )
{
DWORD dwbytes = 0;
//Accept function GUID
GUID guidAcceptEx = WSAID_ACCEPTEX;
if( 0 != WSAIoctl( sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, 
&guidAcceptEx, sizeof(guidAcceptEx), 
&lpfnAcceptEx, sizeof(lpfnAcceptEx), 
&dwbytes, NULL, NULL) )
{
//百度百科,有关该函数的所有返回值都有!
}
// 获取GetAcceptExSockAddrs函数指针,也是同理
GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
if( 0 != WSAIoctl( sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, 
&guidGetAcceptExSockaddrs,
sizeof(guidGetAcceptExSockaddrs), 
&lpfnGetAcceptExSockaddrs, 
sizeof(lpfnGetAcceptExSockaddrs),   
&dwbytes, NULL, NULL) )  
{  
}  
}
//发起接收的异步操作
for(i=0; i<2000; i++ )
{
AcceptClient(sListen);	
}
//不让主线程退出
while( g_bRun )
{
Sleep(1000);
}
STOP_SERVER:
closesocket( sListen );
g_bRun = FALSE;
WSACleanup();
return 0;
}
/
//服务线程
DWORD WINAPI ServerWorkerThread( LPVOID pParam )
{
HANDLE  completionPort = (HANDLE)pParam;
DWORD	dwIoSize;
COMPLETION_KEY		  *pComKey;		//完成键
LP_IO_OPERATION_DATA  lpIOoperData;		//I/O数据
//用于发起接收重叠操作
BOOL bRet;
while( g_bRun )
{
bRet = FALSE;
dwIoSize = -1;
pComKey = NULL;
lpIOoperData = NULL;
bRet = GetQueuedCompletionStatus( g_hComPort, &dwIoSize, (LPDWORD)&pComKey, (LPOVERLAPPED*)&lpIOoperData,INFINITE );
if( !bRet )
{
DWORD dwIOError = GetLastError();
if( WAIT_TIMEOUT == dwIOError )
{
continue;
}
else if( NULL != lpIOoperData )
{
CancelIo( (HANDLE)pComKey->sock );	//取消等待执行的异步操作
closesocket(pComKey->sock);				
GlobalFree( pComKey );		
}
else
{
g_bRun = FALSE;
break;
}
}
else
{		
if( 0 == dwIoSize  && (READ==lpIOoperData->type || WRITE==lpIOoperData->type) )
{
printf("客户断开了连接!\n");
CancelIo( (HANDLE)pComKey->sock );	//取消等待执行的异步操作
closesocket(pComKey->sock);				
GlobalFree( pComKey );
GlobalFree( lpIOoperData );
continue;
}
else 
{
ProcessIO( lpIOoperData, pComKey );
}
}
}
return 0;
}
BOOL ProcessIO(IO_OPERATION_DATA *pIOoperData, COMPLETION_KEY *pComKey)
{
if( pIOoperData->type == READ )
{
//打印接收到的内容
//	char ch[100] = { 0 };
//	sprintf(ch, "%s  :  %s", pComKey->sIP, pIOoperData->buffer);
//	printf( ch );
Send( pComKey, pIOoperData );	//将接收到的内容原封不动的发送回去
}
else if( pIOoperData->type == WRITE )
{
Recv( pComKey, pIOoperData );	//发起接收操作
}
else if( pIOoperData->type == ACCEPT )
{	//使用GetAcceptExSockaddrs函数 获得具体的各个地址参数.
printf("accept sucess!\n");
setsockopt( pIOoperData->sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&(pComKey->sock), sizeof(pComKey->sock) );
LP_COMPLETION_KEY pClientComKey = (LP_COMPLETION_KEY) GlobalAlloc ( GPTR, sizeof(COMPLETION_KEY) );
pClientComKey->sock = pIOoperData->sock;
SOCKADDR_IN *addrClient = NULL, *addrLocal = NULL;
int nClientLen = sizeof(SOCKADDR_IN), nLocalLen = sizeof(SOCKADDR_IN);
lpfnGetAcceptExSockaddrs(pIOoperData->buffer, 0, 
sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, 
(LPSOCKADDR*)&addrLocal, &nLocalLen,
(LPSOCKADDR*)&addrClient, &nClientLen);
sprintf(pClientComKey->sIP, "%d", addrClient->sin_port );	//cliAdd.sin_port );
printf(pClientComKey->sIP );
CreateIoCompletionPort( (HANDLE)pClientComKey->sock, g_hComPort, (DWORD)pClientComKey, 0 );	//将监听到的套接字关联到完成端口
Recv( pClientComKey, pIOoperData );
//	char s[30] = {0};
//	sprintf( s, "%d\n", g_count++ );
//	printf(s);
//接收到一个连接,就再发起一个异步操作!
AcceptClient( g_sListen );	
}
return TRUE;
}
BOOL AcceptClient(SOCKET sListen)
{	
DWORD dwBytes;
LP_IO_OPERATION_DATA pIO;
pIO = (LP_IO_OPERATION_DATA) GlobalAlloc (GPTR, sizeof(IO_OPERATION_DATA));	
pIO->databuf.buf = pIO->buffer;
pIO->databuf.len = pIO->len = DATA_BUFSIZE;
pIO->type = ACCEPT;
//先创建一个套接字(相比accept有点就在此,accept是接收到连接才创建出来套接字,浪费时间. 这里先准备一个,用于接收连接)
pIO->sock = WSASocket( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED );
//调用AcceptEx函数,地址长度需要在原有的上面加上16个字节
//向服务线程投递一个接收连接的的请求
BOOL rc = lpfnAcceptEx( sListen, pIO->sock,
pIO->buffer, 0, 
sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, 
&dwBytes, &(pIO->overlapped) );
if( FALSE == rc )
{
if( WSAGetLastError() != ERROR_IO_PENDING )
{
printf("%d", WSAGetLastError() );
return false;
}
}
return true;
}
BOOL Recv(COMPLETION_KEY *pComKey, IO_OPERATION_DATA *pIOoperData)
{
DWORD flags = 0;
DWORD recvBytes = 0;
ZeroMemory( &pIOoperData->overlapped, sizeof(OVERLAPPED) );
pIOoperData->type = READ;	
pIOoperData->databuf.buf = pIOoperData->buffer;
pIOoperData->databuf.len = pIOoperData->len = DATA_BUFSIZE;
if( SOCKET_ERROR == WSARecv( pComKey->sock, &pIOoperData->databuf, 1, &recvBytes, &flags,  &pIOoperData->overlapped, NULL) )
{
if( ERROR_IO_PENDING != WSAGetLastError() )
{
printf("发起重叠接收失败!   %d\n", GetLastError() );
return FALSE;
}
}
return TRUE;
}
BOOL Send(COMPLETION_KEY *pComKey, IO_OPERATION_DATA *pIOoperData)
{
DWORD flags = 0;
DWORD recvBytes = 0;
ZeroMemory( &pIOoperData->overlapped, sizeof(OVERLAPPED) );
pIOoperData->type = WRITE;
pIOoperData->databuf.len = 100;		
if( SOCKET_ERROR == WSASend( pComKey->sock, &pIOoperData->databuf, 1, &recvBytes, flags,  &pIOoperData->overlapped , NULL) )
{
if( ERROR_IO_PENDING != WSAGetLastError() )
{
printf("发起发送重叠接收失败!\n");
return FALSE;
}
}
return TRUE;
}

对于客户端就更简单了,只是创建线程,请求连接,发送数据,接收数据

#include "stdafx.h"
#include <Afx.h>
#include <Windows.h>
#include <Winsock2.h>
#pragma comment(lib, "WS2_32.lib")
DWORD WINAPI Thread(LPVOID lParam);
int main(int argc, char* argv[])
{
WSADATA dwData;
WSAStartup( 0x0202, &dwData );
for( int i = 0; i < 2000; i++ )
{
HANDLE hThread = NULL;
hThread = CreateThread(NULL, 0, Thread, NULL, 0, 0);
CloseHandle(hThread);
hThread = NULL;
}
while( true )
{
Sleep(100);
}
return 0;
}
DWORD WINAPI Thread(LPVOID lParam)
{
SOCKET sock = socket( AF_INET, SOCK_STREAM, 0 );
SOCKADDR_IN serAddr;
serAddr.sin_family = AF_INET;
serAddr.sin_port = htons(6000);
serAddr.sin_addr.S_un.S_addr = inet_addr(_T("127.0.0.1"));
int reVal = connect( sock, (SOCKADDR*)&serAddr, sizeof(SOCKADDR) );
if( reVal==SOCKET_ERROR )
{
printf("cannot client SERVER!   %d\n", WSAGetLastError());
return 0;
}	
int i=0;
char buf[100] = _T("光阴的故事!\n");
while( true )
{			
if( SOCKET_ERROR == send( sock, buf, 100, 0 ) )
{
printf("cannot SEND message to server!   %d\n", WSAGetLastError());
break;
}
memset( buf, 0, strlen(buf) );	//清空一下,体现是接收到的数据
if(  SOCKET_ERROR == recv( sock, buf, 100, 0 ) )
{
printf("cannot RECV message to server!   %d\n", WSAGetLastError());
break;
}
//	printf( buf );
Sleep(3000);		
}
closesocket(sock);
return 0;
}

将代码贴到编译器中即可,也可以下载这个demo  http://download.csdn.net/detail/u010025913/7250965

代码漏洞百出,希望大家多多批评指教!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/187798.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 如何创建一个Java项目[通俗易懂]

    如何创建一个Java项目[通俗易懂]文章目录新建项目项目信息配置创建Java类编译和运行新建项目首先双击eclipse进入到eclipse页面。菜单“File”下的“New”里“JavaProject”,点击即可创建Java项目。项目信息配置1)给项目起名称。2)usedefaultlocation(使用默认位置),当然,你也可以使用Browse更改默认位置3)确保勾选Useprojectfolderasrootforsourcesandclassfiles,从而.java文件(源文件)和.clas

  • https加密原理(转)

    https加密原理(转)HTTP、HTTPS在我们日常开发中是经常会接触到的。我们也都知道,一般Android应用开发,在请求API网络接口的时候,很多使用的都是HTTP协议;使用浏览器打开网页,也是利用HTTP协议。看来HTTP真是使用广泛啊,但是,HTTP是不安全的。利用网络抓包工具就可以知道传输中的内容,一览无余。比如我经常会使用Fiddler来抓包,搜集一些有趣的API接口。那么…

  • 【ZigBee协议栈简介】

    【ZigBee协议栈简介】1、Zigbee协议栈简介  协议是一系列的通信标准,通信双方需要按照这一标准进行正常的数据发射和接收。协议栈是协议的具体实现形式,通俗讲协议栈就是协议和用户之间的一个接口,开发人员通过使用协议栈来使用这个协议,进而实现无线数据收发。  如图1所示:Zigbee协议分为两部分,IEEE802.15.4定义了PHY(物理层)和MAC(介质访问层)技术规范;Zigbee联盟定义了NWK(网络…

  • Ubuntu下VLC播放器的字幕乱码问题

    Ubuntu下VLC播放器的字幕乱码问题为了为可能进入的实验室实习做准备,今天重新装上了Ubuntu,今天的安装总的来说还是顺利多了。在播放软件上,这次我选择了VLC,因为感觉mplayer虽然强大,但是始终界面不是十分友好。而VLC也是灰常强大的。但是,在Linux下播放电影时,经常会遇到乱码的问题,下面就谈谈我的经验。造成字幕乱码的原因可能有两个:1.GB字符的解码:因为Linux下中文默认采取utf-

  • verilog hdl与fpga数字系统设计_简易交通信号灯控制系统

    verilog hdl与fpga数字系统设计_简易交通信号灯控制系统1、系统设计要求该交通灯控制器用于主干道与支道公路的交叉路口,要求是优先保证主干道的畅通,因此,设计要求如下。1、平时处于“主干道绿灯,支道红灯”状态,只有在支道有车辆要穿过主干道时,才将交通灯切向“主干道红灯,支道绿灯”,一旦支道无车辆通过路口,交通灯又回到“主干道绿灯,支道红灯”的状态。2、主干道每次通行的时间不得短于1min,支路每次通行的时间不得长于20s,而这两个状态交换过程中出现“主干道黄灯,支道红灯”和“主干道红灯,支道黄灯”的状态,持续时间都为4s。2、设计分析1、用状态

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号