C++实现一个多线程同步方式的协同工作程序示例

多线程并发程序与协同程序其实是不同的概念。多线程并发是多个执行序同时运行,而协同程序是多个执行序列相互协作,同一时刻只有一个执行序列。今天想到的是将两者结合起来,拿现实生活中的例子来说,假设一个班级有100个学生,一个老师要批改100个学生的作业,有时老师太忙或者赶时间会叫几个同学帮忙批改,等所有同学都批改完后都交到老师手中,老师在下次上课的时候将作业本一起发给班上的学生。。。。其实在并发编程的时候也可以借鉴这一个思想和模式,特别是网络服务器开发的过程中,并发与协同经常出现,于是今天写了一个简单的程序模拟了这种情形,当然这个程序本身并没有任何意义,只是记录下这种思想,个人一直都觉得,程序开发中,思想是最为重要的,用什么语言来实现只是表现上不同,今天记录下来,日后的开发过程中,在适当地方以此思想为基础,根据项目需要进行拓展!

  1 //--------------------------------------------------------------
  2   开发工具:Visual Studio 2012
  3 //---------------------------------------------------------------
  4 //C++
  5 #include <iostream>
  6 #include <memory>
  7 #include <thread>
  8 #include <mutex>
  9 #include <condition_variable>
 10 #include <queue>
 11 #include <vector>
 12
 13 using namespace std;
 14
 15 //windows
 16 #include <windows.h>
 17
 18
 19 /************************************************
 20     [示例]实现一个多线程方式下的协同工作程序
 21
 22     当一个线程(相对的主线程)在完成一个任务的时
 23     候,有时候为了提高效率,可以充分利用多核CPU的
 24     优势可以将手中的任务分成多个部分,分发给比较
 25     空闲的辅助线程来帮助处理,并且主线程要等待所
 26     有的辅助线程都处理完成后,对所有任务进行一次
 27     汇总,才能进行下一步操作,此时就需要一个同步的
 28     多线程协同工作类。
 29 *************************************************/
 30
 31
 32 //定义一个求累积和的任务类
 33 class CSumTask
 34 {
 35 public:
 36     CSumTask(double dStart,double dEnd);
 37     ~CSumTask();
 38     double DoTask();
 39     double GetResult();
 40 private:
 41     double m_dMin;
 42     double m_dMax;
 43     double m_dResult;
 44 };
 45
 46 CSumTask::CSumTask(double dStart,double dEnd):m_dMin(dStart),m_dMax(dEnd),m_dResult(0.0)
 47 {
 48
 49 }
 50 CSumTask::~CSumTask()
 51 {
 52
 53 }
 54 double CSumTask::DoTask()
 55 {
 56
 57     for(double dNum = m_dMin;dNum <= m_dMax;++dNum)
 58     {
 59         m_dResult += dNum;
 60     }
 61     return m_dResult;
 62 }
 63
 64 double CSumTask::GetResult()
 65 {
 66     return m_dResult;
 67 }
 68
 69
 70 //定义一个任务管理者
 71 class CTaskManager
 72 {
 73 public:
 74     CTaskManager();
 75     ~CTaskManager();
 76     size_t Size();
 77     void AddTask(const std::shared_ptr<CSumTask> TaskPtr);
 78     std::shared_ptr<CSumTask> PopTask();
 79 protected:
 80     std::queue<std::shared_ptr<CSumTask>> m_queTask;
 81 };
 82
 83 CTaskManager::CTaskManager()
 84 {
 85
 86 }
 87
 88 CTaskManager::~CTaskManager()
 89 {
 90
 91 }
 92
 93 size_t CTaskManager::Size()
 94 {
 95     return m_queTask.size();
 96 }
 97
 98 void CTaskManager::AddTask(const std::shared_ptr<CSumTask> TaskPtr)
 99 {
100     m_queTask.push(std::move(TaskPtr));
101 }
102
103 std::shared_ptr<CSumTask> CTaskManager::PopTask()
104 {
105     std::shared_ptr<CSumTask> tmPtr = m_queTask.front();
106     m_queTask.pop();
107     return tmPtr;
108 }
109
110
111 //协同工作线程管理类,负责创建协同工作线程并接受来自主线程委托的任务进行处理
112 class CWorkThreadManager
113 {
114 public:
115     CWorkThreadManager(unsigned int uiThreadSum );
116     ~CWorkThreadManager();
117     bool AcceptTask(std::shared_ptr<CSumTask> TaskPtr);
118     bool StopAll(bool bStop);
119     unsigned int ThreadNum();
120 protected:
121     std::queue<std::shared_ptr<CSumTask>> m_queTask;
122     std::mutex m_muTask;
123     int m_iWorkingThread;
124     int m_iWorkThreadSum;
125     std::vector<std::shared_ptr<std::thread>> m_vecWorkers;
126
127     void WorkThread(int iWorkerID);
128     bool m_bStop;
129     std::condition_variable_any m_condPop;
130     std::condition_variable_any m_stopVar;
131 };
132
133 CWorkThreadManager::~CWorkThreadManager()
134 {
135
136 }
137 unsigned int CWorkThreadManager::ThreadNum()
138 {
139     return m_iWorkThreadSum;
140 }
141
142 CWorkThreadManager::CWorkThreadManager(unsigned int uiThreadSum ):m_bStop(false),m_iWorkingThread(0),m_iWorkThreadSum(uiThreadSum)
143 {
144     //创建工作线程
145     for(int i = 0; i < m_iWorkThreadSum;++i)
146     {
147         std::shared_ptr<std::thread> WorkPtr(new std::thread(&CWorkThreadManager::WorkThread,this,i+1));
148         m_vecWorkers.push_back(WorkPtr);
149     }
150
151 }
152
153 bool CWorkThreadManager::AcceptTask(std::shared_ptr<CSumTask> TaskPtr)
154 {
155     std::unique_lock<std::mutex>    muLock(m_muTask);
156     if(m_iWorkingThread >= m_iWorkThreadSum)
157     {
158         return false;            //当前已没有多余的空闲的线程处理任务
159     }
160     m_queTask.push(TaskPtr);
161     m_condPop.notify_all();
162     return true;
163 }
164
165  void CWorkThreadManager::WorkThread(int iWorkerID)
166  {
167      while(!m_bStop)
168      {
169          std::shared_ptr<CSumTask> TaskPtr;
170          bool bDoTask = false;
171          {
172             std::unique_lock<std::mutex>    muLock(m_muTask);
173             while(m_queTask.empty() && !m_bStop)
174             {
175                 m_condPop.wait(m_muTask);
176             }
177             if(!m_queTask.empty())
178             {
179                 TaskPtr = m_queTask.front();
180                 m_queTask.pop();
181                 m_iWorkingThread++;
182                 bDoTask = true;
183             }
184
185          }
186         //处理任务
187          if(bDoTask)
188          {
189              TaskPtr->DoTask();
190              {
191                  std::unique_lock<std::mutex>    muLock(m_muTask);
192                  m_iWorkingThread--;
193                  cout<<">>>DoTask in thread ["<<iWorkerID<<"]\n";
194              }
195          }
196          m_stopVar.notify_all();
197      }
198  }
199
200  bool CWorkThreadManager::StopAll(bool bStop)
201  {
202      {
203          std::unique_lock<std::mutex>    muLock(m_muTask);
204          while(m_queTask.size()>0 || m_iWorkingThread>0)
205          {
206              m_stopVar.wait(m_muTask);
207              cout<<">>>Waiting finish....\n";
208          }
209         cout<<">>>All task finished!\n";
210
211      }
212
213      m_bStop = true;
214      m_condPop.notify_all();
215      //等待所有线程关闭
216      for(std::vector<std::shared_ptr<std::thread>>::iterator itTask = m_vecWorkers.begin();itTask != m_vecWorkers.end();++itTask)
217      {
218         (*itTask)->join();
219      }
220      return true;
221  }
222
223
224  /**************************************
225   [示例程序说明]
226
227       每个任务对象表示求1+2+....+1000的累
228   积和,现在有2000个这样的任务,需要将每个
229   任务进行计算,然后将所有的结果汇总求和。
230       利用多线程协同工作类对象辅助完成每
231   个任务结果计算,主线程等待所有结果完成
232   后将所有结果汇总求和。
233  ****************************************/
234
235
236 int main(int arg,char *arv[])
237 {
238
239     std::cout.sync_with_stdio(true);
240     CTaskManager TaskMgr;
241     CWorkThreadManager WorkerMgr(5);
242     std::vector<std::shared_ptr<CSumTask>> vecResultTask;
243
244     for(int i = 0; i < 2000; ++i)
245     {
246         std::shared_ptr<CSumTask> TaskPtr(new CSumTask(1.0,1000.0));
247         TaskMgr.AddTask(TaskPtr);
248         vecResultTask.push_back(TaskPtr);
249     }
250
251     //
252     DWORD dStartTime = ::GetTickCount();
253     while(TaskMgr.Size()>0)
254     {
255         std::shared_ptr<CSumTask> WorkPtr = TaskMgr.PopTask();
256         if(!WorkerMgr.AcceptTask(WorkPtr))
257         {
258             //辅助线程此刻处于忙碌状态(没有空闲帮忙),自己处理该任务
259             WorkPtr->DoTask();
260             cout<<">>>DoTask in thread [0]\n";
261         }
262     }
263     WorkerMgr.StopAll(true);                    //等待所有的任务完成
264
265     //对所有结果求和
266     double dSumResult = 0.0;
267     for(std::vector<std::shared_ptr<CSumTask>>::iterator itTask = vecResultTask.begin();itTask != vecResultTask.end();++itTask)
268     {
269         dSumResult += (*itTask)->GetResult();
270     }
271
272     DWORD dEndTime = ::GetTickCount();
273     cout<<"\n[Status]"<<endl;
274     cout<<"\tEvery task result:"<<vecResultTask[0]->GetResult()<<endl;
275     cout<<"\tTask num:"<<vecResultTask.size()<<endl;
276     cout<<"\tAll result sum:"<<dSumResult;
277     cout<<"\tCast to int,result:"<<static_cast<long long>(dSumResult)<<endl;
278     cout<<"\tWorkthread num:"<<WorkerMgr.ThreadNum()<<endl;
279     cout<<"\tTime of used:"<<dEndTime-dStartTime<<" ms"<<endl;
280     getchar();
281     return 0;
282 }

时间: 2024-10-03 22:15:47

C++实现一个多线程同步方式的协同工作程序示例的相关文章

Linux多线程同步方式

当多个线程共享相同的内存时,需要确保每个线程看到一致的数据视图,当多个线程同时去修改这片内存时,就可能出现偏差,得到与预期不符合的值.为啥需要同步,一件事情逻辑上一定是有序的,即使在并发环境下:而操作系统对于多线程不会自动帮我们串行化,所以需要我们通过操作系统提供的同步方式api,结合自己的业务逻辑,利用多线程提高性能的同时,保证业务逻辑的正确性.一般而言,linux下同步方式主要有4种,原子锁,互斥量,读写锁和条件变量.下面一一介绍几种同步方式. 1. spinlock 1)  概念 spin

试着用c写了一个多线程的同步

在Java中写多线程相关的程序简单很多,在多线程中需要同步的时候,使用synchronized就行了. 最近学习c的多线程与同步,感觉实现起来,要写的代码比较多一些,这也许是因为java封装的比较好吧. 下面是今天写的一个例子,主要参考:http://hi.baidu.com/personnel/blog/item/ae87720e8b2f5aea7acbe1c6.html #include <stdio.h> #include <windows.h> #include <p

多线程编程之Windows同步方式

在Windows环境下针对多线程同步与互斥操作的支持,主要包括四种方式:临界区(CriticalSection).互斥对象(Mutex).信号量(Semaphore).事件对象(Event).下面分别针对这四种方式作说明: (1)临界区(CriticalSection) 每个进程中访问临界资源的那段代码称为临界区(临界资源是一次仅允许一个进程使用的共享资源).每次只准许一个进程进入临界区,进入后不允许其他进程进入.不论是硬件临界资源,还是软件临界资源,多个进程必须互斥地对它进行访问.Window

一个经典的多线程同步问题

上一篇<秒杀多线程第三篇原子操作 Interlocked系列函数>中介绍了原子操作在多进程中的作用,现在来个复杂点的.这个问题涉及到线程的同步和互斥,是一道非常有代表性的多线程同步问题,如果能将这个问题搞清楚,那么对多线程同步也就打下了良好的基础. 程序描述: 主线程启动10个子线程并将表示子线程序号的变量地址作为参数传递给子线程.子线程接收参数 -> sleep(50) -> 全局变量++ -> sleep(0) -> 输出参数和全局变量. 要求: 1.子线程输出的线

[一个经典的多线程同步问题]总结

针对一个经典的线程同步互斥问题,前面几篇文章提出了四种解决方案:关键段.事件.互斥量.信号量. 下面对这四种解决方案做一个总结,梳理一下知识点: 首先来看下关于线程同步互斥的概念性的知识,相信大家通过前面的文章,已经对线程同步互斥有一定的认识了,也能模糊的说出线程同步互斥的各种概念性知识,下面再列出从<计算机操作系统>一书中选取的一些关于线程同步互斥的描述.相信先有个初步而模糊的印象再看下权威的定义,应该会记忆的特别深刻. 1.线程(进程)同步的主要任务 答:在引入多线程后,由于线程执行的异步

秒杀多线程第四篇 一个经典的多线程同步问题

上一篇<秒杀多线程第三篇原子操作 Interlocked系列函数>中介绍了原子操作在多进程中的作用,如今来个复杂点的.这个问题涉及到线程的同步和相互排斥,是一道很有代表性的多线程同步问题,假设能将这个问题搞清楚,那么对多线程同步也就打下了良好的基础. 程序描写叙述: 主线程启动10个子线程并将表示子线程序号的变量地址作为參数传递给子线程.子线程接收參数 -> sleep(50) -> 全局变量++ -> sleep(0) -> 输出參数和全局变量. 要求: 1.子线程输

多线程第四篇秒杀 一个经典的多线程同步问题

前<秒杀多线程第三篇原子操作 Interlocked系列函数>中介绍了原子操作在多进程中的作用,如今来个复杂点的.这个问题涉及到线程的同步和相互排斥,是一道很有代表性的多线程同步问题,假设能将这个问题搞清楚,那么对多线程同步也就打下了良好的基础. 程序描写叙述: 主线程启动10个子线程并将表示子线程序号的变量地址作为參数传递给子线程. 子线程接收參数 -> sleep(50) -> 全局变量++ -> sleep(0) -> 输出參数和全局变量. 要求: 1.子线程输出

转--秒杀多线程第四篇 一个经典的多线程同步问题

上一篇<秒杀多线程第三篇原子操作 Interlocked系列函数>中介绍了原子操作在多进程中的作用,现在来个复杂点的.这个问题涉及到线程的同步和互斥,是一道非常有代表性的多线程同步问题,如果能将这个问题搞清楚,那么对多线程同步也就打下了良好的基础. 程序描述: 主线程启动10个子线程并将表示子线程序号的变量地址作为参数传递给子线程.子线程接收参数 -> sleep(50) -> 全局变量++ -> sleep(0) -> 输出参数和全局变量. 要求: 1.子线程输出的线

ffmpeg+sdl教程----编写一个简单的播放器6(其他的时钟同步方式)

来源:http://blog.csdn.net/mu399/article/details/5818384 在理解上一个教程的基础上,这篇教程就稍微容易理解些了,不外乎多加了两种同步方式,同步音频到视频,同步音频视频到外部时钟. 这篇教程主要是新增了不少新变量,is->video_current_pts用于保存当前视频帧的时间戳(以秒为单位),只在 video_refresh_timer函数中播放一帧视频前改变,is->video_current_pts_time单位为毫秒,在 stream_