在线程的基础上,以任务的形式将并发抽象。设计task类,用于完成一个任务或者说动作。process类用于存储task队列,可以向process投递一个task。再设计一个consumer类,它包含一个线程以及多个process,它的proce函数循环从所有process取出一个task并执行。如果执行返回false则将该process移除。consumer有一个负载均衡的函数balancing可以在繁忙时将process转移给其它闲consumer。从用户层面来看,每个process都是一个逻辑线程,可以以较小的代价创建大量的process。在同一个process里面的操作都是同步的,不同的process可能是异步的,所以如果多个process处理共享数据则需要加锁。
1 #ifndef PROCESS_20160503_H 2 #define PROCESS_20160503_H 3 4 #include <functional> 5 #include <vector> 6 #include <thread> 7 #include <string> 8 #include <memory> 9 #include <atomic> 10 #include "safe_queue.h" 11 #include "log.h" 12 13 struct task 14 { 15 task(){} 16 task(const std::function<bool()>& t, const std::string task_name) 17 : job(t) 18 , name(task_name) 19 { 20 21 } 22 task(const task& t) 23 { 24 job = t.job; 25 name = t.name; 26 } 27 28 std::function<bool()> job; 29 std::string name; 30 }; 31 32 33 typedef safe_queue<task> taskqueue; 34 class process; 35 typedef std::shared_ptr<process> ptr_process; 36 37 class consumer 38 { 39 public: 40 const static int max_busy = 10000; 41 const static int min_busy = -10000; 42 public: 43 consumer(); 44 ~consumer(); 45 public: 46 void profile(); 47 std::string name() { return _name; } 48 void push(ptr_process ptr); 49 int busy() { return _busy; } 50 51 private: 52 void proc(); 53 void proc_i(); 54 void balancing(); 55 bool init_busy(); 56 private: 57 std::string _name; 58 std::vector<ptr_process> _processes; 59 ptr_process _chan; 60 std::atomic_int _busy; 61 bool _idle; 62 task _tmptask; 63 std::thread _thread; 64 65 }; 66 67 typedef std::shared_ptr<consumer> ptr_consumer; 68 69 class process 70 { 71 public: 72 friend class consumer; 73 friend class process_manager; 74 public: 75 process(const std::string& process_name); 76 77 private: 78 process(const process&); 79 process& operator = (const process&); 80 81 public: 82 void profile(); 83 84 public: 85 const std::string& process_name() { return _process_name; } 86 void run(const std::function<bool()>& t, const std::string& name); 87 void stop(); 88 89 private: 90 taskqueue _tasks; 91 std::string _process_name; 92 }; 93 94 class process_manager 95 { 96 public: 97 friend class consumer; 98 private: 99 process_manager(); 100 process_manager(const process_manager&); 101 process_manager& operator = (const process_manager&); 102 103 public: 104 static process_manager& instance(); 105 ptr_process create_process(const std::string name); 106 107 public: 108 void profile(); 109 110 private: 111 ptr_consumer get_consumer(); 112 std::vector<ptr_consumer>& get_consumers(); 113 114 private: 115 std::vector<ptr_consumer> _consumers; 116 unsigned int _no; 117 }; 118 119 120 121 122 123 124 #endif
#include "process.h" #include <chrono> #include "util.h" #include <sstream> #if defined(WIN32) #include <windows.h> #else #include<unistd.h> #endif int cpu_num() { #if defined(WIN32) SYSTEM_INFO info; ::GetSystemInfo(&info); return info.dwNumberOfProcessors; #else return (int)sysconf(_SC_NPROCESSORS_ONLN); #endif } bool consumer::init_busy() { _busy = 0; return true; } consumer::consumer() : _idle(init_busy()) , _chan(new process("chan")) , _thread([this]{ this->proc(); }) { std::ostringstream ss; ss<<std::this_thread::get_id(); _name = ss.str(); logdebug("consumer:%s constructor", _name.c_str()); } consumer::~consumer() { _thread.join(); } void consumer::push(ptr_process ptr) { _chan->run([ptr, this]{ _processes.push_back(ptr); return true; }, std::string("push process:") + ptr->process_name() ); } void consumer::proc() { while (!global_exit::is_exit()) { proc_i(); if(_idle) { _busy = (_busy<min_busy)?min_busy:(_busy-1); std::this_thread::sleep_for(std::chrono::microseconds(10)); } else { _busy = (_busy>max_busy)?max_busy:(_busy+1); } if(_busy == max_busy) { balancing(); } } } // 每一个process都执行一次 void consumer::proc_i() { _idle = true; if(_chan->_tasks.try_pop(_tmptask)) { _tmptask.job(); } for(unsigned int i = 0; i<_processes.size(); ++i) { if(_processes[i]->_tasks.try_pop(_tmptask) == taskqueue::SUCCESS) { if(!_tmptask.job()) { logdebug("process task:‘%s‘ return false", _tmptask.name.c_str()); if(i == _processes.size()-1) { _processes.pop_back(); } else { _processes[i] = _processes[_processes.size()-1]; _processes.pop_back(); } break; } _idle = false; } } } // 负载均衡 取出最忙process投递给最闲线程 void consumer::balancing() { unsigned int max = 0; unsigned int tmp = 0; unsigned int index = -1; for(unsigned int i = 0; i<_processes.size(); ++i) { tmp = _processes[i]->_tasks.size(); if(max<tmp) { max = tmp; index = i; } } ptr_process ptr = _processes[index]; if(index != _processes.size()-1) { _processes[index] = _processes[_processes.size()-1]; } std::vector<ptr_consumer>& css = process_manager::instance().get_consumers(); index = -1; int minbusy = consumer::max_busy - 1; int tmpbusy = 0; for(unsigned int i = 0; i<css.size(); ++i) { tmpbusy = css[i]->busy(); if(minbusy>tmpbusy) { index = i; minbusy = tmpbusy; } } if(css[index].get()!=this) { _processes.pop_back(); css[index]->push(ptr); } } process::process(const std::string& process_name) : _process_name(process_name) , _tasks(1024*1024, 0) { } void process::run(const std::function<bool()>& t, const std::string& name) { if(_tasks.try_push(task(t, name)) == taskqueue::QUEUE_FULL) { logerror("taskqueue full.process_name:%s task_name:%s", _process_name.c_str(), name.c_str()); } } void process::profile() { loginfo("process name:%s, task size:%d", _process_name.c_str(), _tasks.size()); } process_manager::process_manager() { int num = cpu_num() + 4; for(int i = 0; i<num; ++i) { _consumers.push_back(ptr_consumer(new consumer)); } _no = 0; } process_manager& process_manager::instance() { static process_manager cm; return cm; } ptr_consumer process_manager::get_consumer() { return _consumers[(_no++)%_consumers.size()]; } std::vector<ptr_consumer>& process_manager::get_consumers() { return _consumers; } void process_manager::profile() { for(unsigned int i = 0; i<_consumers.size(); ++i) { _consumers[i]->profile(); } } ptr_process process_manager::create_process(const std::string name) { ptr_process ptr(new process(name)); process_manager::instance().get_consumer()->push(ptr); return ptr; }
时间: 2024-10-31 11:36:37