一种基于Qt的可伸缩的全异步C/S架构server实现(二) 网络传输

一种基于Qt的可伸缩的全异步C/S架构server实现(二) 网络传输

大家好,又见面了,我是全栈君。

二、网络传输模块

模块相应代码命名空间    (namespace ZPNetwork)

模块相应代码存储目录    (\ZoomPipeline_FuncSvr\network)

2.1 模块结构

网络传输模块负责管理监听器,并依据各个传输线程眼下的负荷,把新申请接入的客户套接字描写叙述符引导到最空暇的传输线程中运行“接受连接(Accept)”操作。该模块由例如以下几个类组成。

1、zp_net_Engine类,派生自Qobject。模块的外部接口类。同一时候也是功能管理者。提供了设置监听器、配置线程池的功能。

2、zp_netListenThread类:派生自QObject。用于绑定在各个监听线程的事件循环中,不断的接受client连接请求。该类会在信号中把套接字描写叙述符(socketdescriptor)泵出,由zp_net_Engine类进行负荷均衡,选取当前负荷最小的传输线程(zp_netTransThread)接受该接入申请。

3、zp_netTransThread类:派生自QObject。用于绑定在各个传输线程的事件循环中,详细承担传输数据。一个zp_netTransThread线程能够承担多个client的收发请求。

4、ZP_TcpServer类:派生自QtcpServer。

重载了ZP_TcpServer::incomingConnection,不在监听线程中进行Accept操作,而是直接发出evt_NewClientArrived信号,把套接字描写叙述符(socketdescriptor)泵出。由zp_net_Engine类进行负荷均衡,选取当前负荷最小的传输线程(zp_netTransThread)接受该接入申请。

这四个类的合作关系图例如以下

一种基于Qt的可伸缩的全异步C/S架构server实现(二) 网络传输

2.2 系统原理

为了提供基于线程池的TCP服务。zp_net_engine类有几个重要成员。以下,依照一次client发起连接的过程,逆向的逐一来介绍这些类的合作原理.

2.2.1 监听器与监听线程

1、监听器ZP_TcpServer
系统执行时。负责监听工作的是 QtcpServer 派生类,名称叫ZP_TcpServer。该类重载了 QtcpServer的incomingConnection()方法1。当网络中一个client发起连接时,这个函数会被立马调用。在本派生类中。并没有直接产生套接字。它只触发了一个称为“evt_NewClientArrived”的信号2。这个信号把套接字描写叙述符泵出给接受者,用于在其它的线程中创建套接字所用。其流程见2.2.2节所述。

2、监听器线程对象zp_netListenThread
ZP_TcpServer类的实例详细是由zp_netListenThread类中一个指针 m_tcpServer操作的。

m_tcpServer是一个指向ZP_TcpServe类实例的指针(參见zp_netlistenthread.h )。

该实例在zp_netListenThread::startListen()中创建。StartListen是一个关键的函数。创建了ZP_TcpServer对象。核心代码例如以下:

				m_tcpServer = new ZP_TcpServer(this);
				connect (m_tcpServer,&ZP_TcpServer::evt_NewClientArrived,this,&zp_netListenThread::evt_NewClientArrived,Qt::QueuedConnection);

上面两行代码中,第一行创建一个监听服务。第二行。把监听服务的evt_NewClientArrived事件直接和zp_netListenThread 的 同名事件连接起来。

3、操作监听器的模块接口类zp_net_Engine
    zp_netListenThread类本身是从Qobject派生。它本身不是一个线程对象,而是被“绑定”在一个线程对象中执行的。

一个进程能够拥有若干监听port,这些监听port相应了不同的zp_netListenThread对象。这些监听线程对象由zp_net_Engine类管理,存储在这个类的成员变量中。以下两个成员变量

		//This map stores listenThreadObjects
		QMap<QString,zp_netListenThread *> m_map_netListenThreads;
		//Internal Threads to hold each listenThreadObjects' message Queue
		QMap<QString,QThread *> m_map_netInternalListenThreads;

第一个存储了各个port的线程对象,第二个存储了各个port的线程。

因为详细下达监听任务的线程是主线程(UI),但运行任务的线程是工作线程,所以,全部的指令均不是通过直接的函数调用来实现。取而代之的是使用Qt的信号与槽。比方。UIbutton被点击。则触发了startListen 信号。转而由zp_netListenThread的startListen槽来响应。

这里须要注意的是,因为Qt的信号与槽系统是一种广播系统。意味着一个zp_net_Engine类管理多个zp_netListenThread对象时。zp_net_Engine发出的信号会被全部zp_netListenThread对象接收。因此,信号与槽中含有一个唯一标示,用于指示本次信号触发是为了操作详细哪个对象。这样的技术在类似的场合被多次使用。

void zp_net_Engine::AddListeningAddress(QString  id,const QHostAddress & address , quint16 nPort,bool bSSLConn /*= true*/)
	{
		if (m_map_netListenThreads.find(id)==m_map_netListenThreads.end())
		{
			//Start Thread
			QThread * pThread = new QThread(this);
			zp_netListenThread * pListenObj = new zp_netListenThread(id,address,nPort,bSSLConn);
			pThread->start();
			//m_mutex_listen.lock();
			m_map_netInternalListenThreads[id] = pThread;
			m_map_netListenThreads[id] = pListenObj;
			//m_mutex_listen.unlock();
			//Bind Object to New thread
			connect(this,&zp_net_Engine::startListen,pListenObj,&zp_netListenThread::startListen,Qt::QueuedConnection);
			connect(this,&zp_net_Engine::stopListen,pListenObj,&zp_netListenThread::stopListen,Qt::QueuedConnection);
			connect(pListenObj,&zp_netListenThread::evt_Message,this,&zp_net_Engine::evt_Message,Qt::QueuedConnection);
			connect(pListenObj,&zp_netListenThread::evt_ListenClosed,this,&zp_net_Engine::on_ListenClosed,Qt::QueuedConnection);
			connect(pListenObj,&zp_netListenThread::evt_NewClientArrived,this,&zp_net_Engine::on_New_Arrived_Client,Qt::QueuedConnection);

			pListenObj->moveToThread(pThread);
			//Start Listen Immediately
			emit startListen(id);
		}
		else
			emit evt_Message(this,"Warning>"+QString(tr("This ID has been used.")));
	}

2.2.2 接受连接过程

client发起接入请求后,首先触发了ZP_TcpServer的incomingConnection方法。

在以下这种方法中,套接字的描写叙述符作为事件的參数被泵出。

	void ZP_TcpServer::incomingConnection(qintptr socketDescriptor)
	{
		emit evt_NewClientArrived(socketDescriptor);
	}

    上面的信号相应的槽为zp_net_Engine::on_New_Arrived_Client槽函数。在这个函数中,网络模块首先从当前可用的传输线程中确定最空暇的那个线程,而后把套接字描写叙述符转交给传输线程。这个部分的核心代码:

	void zp_net_Engine::on_New_Arrived_Client(qintptr socketDescriptor)
	{
		zp_netListenThread * pSource = qobject_cast<zp_netListenThread *>(sender());
		if (!pSource)
		{
			emit evt_Message(this,"Warning>"+QString(tr("Non-zp_netListenThread type detected.")));
			return;
		}

		emit evt_Message(this,"Info>" +  QString(tr("Incomming client arriverd.")));
		int nsz = m_vec_NetTransThreads.size();
		int nMinPay = 0x7fffffff;
		int nMinIdx = -1;

		for (int i=0;i<nsz && nMinPay!=0;i++)
		{
			if (m_vec_NetTransThreads[i]->isActive()==false ||
					m_vec_NetTransThreads[i]->SSLConnection()!=pSource->bSSLConn()
					)
				continue;
			int nPat = m_vec_NetTransThreads[i]->CurrentClients();

			if (nPat<nMinPay)
			{
				nMinPay = nPat;
				nMinIdx = i;
			}
			//qDebug()<<i<<" "<<nPat<<" "<<nMinIdx;
		}
//...
		if (nMinIdx>=0 && nMinIdx<nsz)
			emit evt_EstablishConnection(m_vec_NetTransThreads[nMinIdx],socketDescriptor);
		else
		{
			emit evt_Message(this,"Warning>"+QString(tr("Need Trans Thread Object for clients.")));
		}
	}

上面的代码中, evt_EstablishConnection 事件携带了由均衡策略确定的承接线程、socketDescriptor 描写叙述符。

这个事件广播给全部的传输线程对象。在各个对象的incomingConnection槽中,详细生成用于传输的套接字对象.注意, 这个槽函数是执行在各个传输线程的事件循环中的,因此,创建的套接字直接属于特定线程.

	/**
	 * @brief This slot dealing with multi-thread client socket accept.
	 * accepy works start from zp_netListenThread::m_tcpserver, end with this method.
	 * the socketDescriptor is delivered from zp_netListenThread(a Listening thread)
	 *  to zp_net_Engine(Normally in main-gui thread), and then zp_netTransThread.
	 *
	 * @param threadid if threadid is not equal to this object, this message is just omitted.
	 * @param socketDescriptor socketDescriptor for incomming client.
	 */
	void zp_netTransThread::incomingConnection(QObject * threadid,qintptr socketDescriptor)
	{
		if (threadid!=this)
			return;
		QTcpSocket * sock_client = 0;
		if (m_bSSLConnection)
			sock_client =  new QSslSocket(this);
		else
			sock_client =  new QTcpSocket(this);
		if (sock_client)
		{
			//Initial content
			if (true ==sock_client->setSocketDescriptor(socketDescriptor))
			{
				connect(sock_client, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved,Qt::QueuedConnection);
				connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection);
				connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection);
				connect(sock_client, &QTcpSocket::bytesWritten, this, &zp_netTransThread::some_data_sended,Qt::QueuedConnection);
				m_mutex_protect.lock();
				m_clientList[sock_client] = 0;
				m_mutex_protect.unlock();
				if (m_bSSLConnection)
				{
					QSslSocket * psslsock = qobject_cast<QSslSocket *>(sock_client);
					assert(psslsock!=NULL);
					QString strCerPath =  QCoreApplication::applicationDirPath() + "/svr_cert.pem";
					QString strPkPath =  QCoreApplication::applicationDirPath() + "/svr_privkey.pem";
					psslsock->setLocalCertificate(strCerPath);
					psslsock->setPrivateKey(strPkPath);
					connect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection);
					psslsock->startServerEncryption();
				}
				emit evt_NewClientConnected(sock_client);
				emit evt_Message(sock_client,"Info>" +  QString(tr("Client Accepted.")));
			}
			else
				sock_client->deleteLater();
		}

	}

2.2.3 数据接收

在成功创建了套接字后, 数据的收发都在传输线程中执行了.当套接字收到数据后,简单的触发事件

evt_Data_recieved

	void zp_netTransThread::new_data_recieved()
	{
		QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
		if (pSock)
		{
			QByteArray array = pSock->readAll();
			int sz = array.size();
			g_mutex_sta.lock();
			g_bytesRecieved +=sz;
			g_secRecieved += sz;
			g_mutex_sta.unlock();
			emit evt_Data_recieved(pSock,array);
		}
	}

2.2.4数据发送

虽然Qt的套接字本身具备缓存,塞入多大的数据都会成功, 可是本实现仍旧使用额外的队列, 每次缓存一个固定长度的片段并顺序发送. 这种优点,是能够给代码使用者一个机会,来增加代码检查缓冲区的大小,并作一些持久化的工作. 比方,队列超过100MB后,就把兴许的数据缓存在磁盘上, 而不是继续放在内存中,

实现这个策略的变量是两个缓存.

		//sending buffer, hold byteArraies.
		QMap<QObject *,QList<QByteArray> > m_buffer_sending;

		QMap<QObject *,QList<qint64> > m_buffer_sending_offset;

第一个缓存存储各个套接字的队列.还有一个存储各个数据块的发送偏移. 这样做是有性能缺陷的, 更好的办法是从 QTcpSocket 派生自己的类,并把各个套接字的缓存直接存储在派生类实例中去. 在本实现中, 直接使用了 QTcpSocket和QSSLSocket类, 因而有一定的性能损失.

一个槽方法  SendDataToClient 负责接受发送数据的请求.

	void zp_netTransThread::SendDataToClient(QObject * objClient,QByteArray   dtarray)
	{
		m_mutex_protect.lock();
		if (m_clientList.find(objClient)==m_clientList.end())
		{
			m_mutex_protect.unlock();
			return;
		}
		m_mutex_protect.unlock();
		QTcpSocket * pSock = qobject_cast<QTcpSocket*>(objClient);
		if (pSock&&dtarray.size())
		{
			QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];
			QList<qint64> & list_offset = m_buffer_sending_offset[pSock];
			if (list_sock_data.empty()==true)
			{
				qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));
				if (bytesWritten < dtarray.size())
				{
					list_sock_data.push_back(dtarray);
					list_offset.push_back(bytesWritten);
				}
			}
			else
			{
				list_sock_data.push_back(dtarray);
				list_offset.push_back(0);
			}
		}
	}

在上面的函数中,将检查队列是否为空.为空的话,将触发 QTcpSocket::write方法发出m_nPayload大小的数据块.当这些数据块发送完成,将触发QTcpSocket::bytesWritten事件,由以下的槽响应.

	/**
	 * @brief this slot will be called when internal socket successfully
	 * sent some data. in this method, zp_netTransThread object will check
	 * the sending-queue, and send more data to buffer.
	 *
	 * @param wsended
	 */
	void zp_netTransThread::some_data_sended(qint64 wsended)
	{
		g_mutex_sta.lock();
		g_bytesSent +=wsended;
		g_secSent += wsended;
		g_mutex_sta.unlock();
		QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
		if (pSock)
		{
			emit evt_Data_transferred(pSock,wsended);
			QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];
			QList<qint64> & list_offset = m_buffer_sending_offset[pSock];
			while (list_sock_data.empty()==false)
			{
				QByteArray & arraySending = *list_sock_data.begin();
				qint64 & currentOffset = *list_offset.begin();
				qint64 nTotalBytes = arraySending.size();
				assert(nTotalBytes>=currentOffset);
				qint64 nBytesWritten = pSock->write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad));
				currentOffset += nBytesWritten;
				if (currentOffset>=nTotalBytes)
				{
					list_offset.pop_front();
					list_sock_data.pop_front();
				}
				else
					break;
			}
		}
	}

2.2.5 其它工作

     在传输终止后, 会进行一定的清理. 对于多线程的传输,最重要的是确保各个对象的生存期. 有兴趣的读者能够使用 sharedptr来管理动态分配的对象, 这样操作起来会非常方便. 在本范例中, 全部代码均进行了 7*24 调试.

    下一章,将介绍流水线线程池的原理和实现.

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/115839.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 项目章程的内容有哪些?[通俗易懂]

    项目章程的内容有哪些?[通俗易懂]项目章程的内容有:1.项目目的或者批准项目的原因;2.可测量的项目目标和相关的成功标准;3.项目的总体要求;4.概括性的项目描述;5.项目的主要风险;6.总体里程碑进度计划;

  • 操作系统实验:银行家算法C语言实现

    操作系统实验:银行家算法C语言实现银行家算法C语言实现#include<stdio.h>#include<stdlib.h>#defineok1#definetrue1#definefalse0intclaim[100][100]={0};//各个进程需要的最大资源数量intalloc[100][100]={0};//各个进程已分配的资源数量intneed[100][100]={0};//各个进程还需要的资源数量intresource[100]={

  • html遮罩层动画制作,flash简单制作遮罩动画效果[通俗易懂]

    html遮罩层动画制作,flash简单制作遮罩动画效果[通俗易懂]flash简单制作遮罩动画效果QQ空间的开机动画大家应该都有,从最初的出现的一点到后面全部出现,如此神奇的效果到底是怎么做的呢,一起来看看吧!遮罩特效:由于百度只能上传500k以内的照片,所以效果图片质量不是很好,当然,我们一般做的特效是.swf,这里是为了方便大家观看,所以做成了gif.步骤:1、打开flash面板,创建新项目。2、点击文件——导入——导入一张图片。3、窗口——库,这里我们可以…

  • python anaconda和pycharm的区别_质量度三者关系

    python anaconda和pycharm的区别_质量度三者关系哈喽~大家好呀Python作为深度学习和人工智能学习的热门语言,你们知道Python、Pycharm、Anaconda三者之间的关系吗?学习一门语言,除了学会其简单的语法之外还需要对其进行运行和实现,才能实现和发挥其功能和作用。下面来介绍运行Python代码常用到的工具总结。一.Python、Pycharm、Anaconda关系介绍1.PythonPython是一种跨平台的计算机程序语言。是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言。最初被设计用于编写自动…

  • 括号匹配问题 栈c语言(c语言栈实现括号匹配)

    例如:{}[()]、{[()]}、()[]{}这种大中小括号成对出现(位置不限)则为括号匹配,反之则不匹配,如{()[接下来看一下实现方式栈的定义以及相关操作//栈的定义typedefstruct{ charelem[stack_size]; inttop;}seqStack;//栈的初始化voidinitStack(seqStack*s){ s->top=-…

  • Softmax classifier[通俗易懂]

    Softmax classifier[通俗易懂]Softmaxclassifier原文链接SVM是两个常见的分类器之一。另一个比较常见的是Softmax分类器,它具有不同的损失函数。如果你听说过二分类的Logistic回归分类器,那么Softmax分类器就是将其推广到多个类。不同于SVM将 f(xi,W) 的输出结果 (为校准,可能难以解释)作为每个分类的评判标准,Softmax分类器给出了一个稍直观的输出(归一化的类概率),并且也有

    2022年10月21日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号