大家好,又见面了,我是全栈君。
參考这里用到了线程管理。參考:http://blog.csdn.net/calmreason/article/details/36399697
以下的两个线程共享一个消息队列,一个用来放整数到队列,一个从队列里取消息出来。
此程序在控制台不停的输出递增数字,主要是内存不会泄露
用到了多线程、ACE_Message_Queue、ACE_Message_Block、ACE_Thread_Manager::instance()->spawn等
#include <iostream> using namespace std; #include "boost/lexical_cast.hpp" using namespace boost; #include "ace/Thread_Manager.h" #include "ace/Message_Queue.h" void* create_vairous_record(void* ace_message_queue); void* get_vairous_record(void* ace_message_queue); int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { ACE_Message_Queue<ACE_MT_SYNCH>* various_record_queue = new ACE_Message_Queue<ACE_MT_SYNCH>; ACE_Thread_Manager::instance()->spawn( ACE_THR_FUNC(create_vairous_record), various_record_queue, THR_NEW_LWP | THR_DETACHED); ACE_Thread_Manager::instance()->spawn( ACE_THR_FUNC(get_vairous_record), various_record_queue, THR_NEW_LWP | THR_DETACHED); ACE_Thread_Manager::instance()->wait(); return 0; } void* create_vairous_record(void* ace_message_queue) { ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue; int i=0; while (i<10000000) { ACE_Message_Block* mbl = new ACE_Message_Block(10);//在这里创建消息 string temp = lexical_cast<string>(++i); mbl->copy(temp.c_str()); p_queue->enqueue_tail(mbl);//消息被放到队列中(用指针引用消息实体) } return nullptr; } void* get_vairous_record(void* ace_message_queue) { ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue; while (true) { ACE_Message_Block* mbl =nullptr; p_queue->dequeue_head(mbl);//消息出队,出队的消息应该在用完之后被释放 if (mbl) { cout<<mbl->rd_ptr()<<endl; mbl->release();//消息已经用完。释放消息 } } return nullptr; }
以下的程序实现:多个线程将连续整数分批放到ACE_Message_Queue中,一个消费者线程负责从中取出,并验证数据是否完整无误
#include <iostream> #include <bitset> #include <vector> #include <memory> using namespace std; #include "ace/Thread_Manager.h" #include "ace/Message_Queue.h" #include "ace/Message_Block.h" #include "ace/Task.h" #include "ace/OS.h" namespace global { const int total_number = 1000000; int task_number = 2; typedef int number_type; } class Generator_Number : public ACE_Task<ACE_MT_SYNCH> { public: Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq,const int i); virtual int open(void *args = 0 ); ~Generator_Number(void); protected: Generator_Number(const Generator_Number&); Generator_Number& operator=(const Generator_Number&); private: int svc(void); int mod_i_; }; Generator_Number::Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq,const int i):mod_i_(i) { this->msg_queue(msgq); std::cout<<"Generator_Number(const int "<<i<<")"<<std::endl; } int Generator_Number::open(void *args ) { return this->activate(THR_NEW_LWP | THR_DETACHED); } int Generator_Number::svc(void) { std::cout<<"Generator_Number("<<this->mod_i_<<")::svc()"<<std::endl; for (size_t i = this->mod_i_ ; i<global::total_number;i+=global::task_number) { ACE_Message_Block * blk = new ACE_Message_Block(20); blk->copy(reinterpret_cast<const char*>(&i),sizeof(global::number_type)); this->msg_queue()->enqueue_tail(blk); } return 0; } Generator_Number::~Generator_Number(void) { std::cout<<"~Generator_Number("<<this->mod_i_<<")"<<std::endl; } void* out_put_queue(void* all_numbers_queue1) { ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)all_numbers_queue1; bitset<global::total_number> all_number_bitset; size_t count_got_message=0; while(true) { if(!all_numbers_queue->is_empty()) { ACE_Message_Block* blk = 0; all_numbers_queue->dequeue_head(blk); all_number_bitset.set(*reinterpret_cast<global::number_type*>(blk->rd_ptr())); blk->release(); if(++count_got_message == global::total_number) { break; } } else { std::cout<<"now sleep 1"<<std::endl; ACE_Time_Value t(0,3000); ACE_OS::sleep(t); } } global::number_type check =0; bool wright_flag = true; for (size_t j=0; j!= global::total_number;++j) { if (0 == all_number_bitset[j]) { wright_flag = false; break; } } std::cout<<std::endl; std::cout<<"check result:"<<wright_flag<<std::endl; return 0; } #include "boost/timer.hpp" using namespace boost; int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { cout<<"total_number:"<<global::total_number<<endl; timer t; ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = new ACE_Message_Queue<ACE_MT_SYNCH>; vector<shared_ptr<Generator_Number>> gener_array; for (int i=0;i<global::task_number;++i) { gener_array.push_back(shared_ptr<Generator_Number>(new Generator_Number(all_numbers_queue,i))); } for (vector<shared_ptr<Generator_Number>>::const_iterator citer = gener_array.cbegin(); citer!=gener_array.cend(); ++citer) { (*citer)->open(); } ACE_Thread_Manager::instance()->spawn( ACE_THR_FUNC(out_put_queue), all_numbers_queue, THR_NEW_LWP | THR_DETACHED); ACE_Thread_Manager::instance()->wait(); cout<<t.elapsed()<<"s"<<endl; return 0; }
输出例如以下:
total_number:1000000
Generator_Number(const int 0)
Generator_Number(const int 1)
Generator_Number(0)::svc()
Generator_Number(1now sleep 1
)::svc()
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
check result:1
0.944s
~Generator_Number(0)
~Generator_Number(1)
请按随意键继续. . .
ACE_Message_Queue
高水位低水位
http://blog.163.com/ecy_fu/blog/static/4445126200964115620862/
注意事项
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/115237.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...