QT 线程池 + TCP 小试(一)线程池的简单实现

*免分资源链接点击打开链接http://download.csdn.net/detail/goldenhawking/4492378

很久以前做过ACE + MFC/QT 的中轻量级线程池应用,大概就是利用线程池执行客户机上的运算需求,将结果返回。ACE是跨平台重量级的通信中间件,与常见的应用程序框架需要精心契合,才能不出问题。最近想到既然QT框架本身就已经具有各类功能,何不玩一玩呢,那就开搞!这个实验的代码可以从我的资源内下载。

第一步打算实现的模式,我们需要一个设置为CPU核心数的线程池,这个线程池可以异步接受N个数据生产者传入的数据,均衡的分配处理任务,处理后的数据返回给某1个或者几个消费者。有两种均衡方法。一种是生产者粒度的均衡。同一个生产者的各批数据FIFO顺序不被打破,这需要判断,当处理线程队列中还有该生产者的数据时,不改变当前处理线程。第二种是数据粒度的并行,某个生产者传来的数据被分配到不同的线程,不保证后到的数据后被处理(也可能先到的处理的慢,后到的快)。

这种异步队列机制如果在MFC、WinAPI中,需要手工使用 Mutex 同步队列,更可恶的是分配的数据对象的生存期非常微妙,一不小心就会出红叉叉。QT首先为我们提供了信号和槽的机制,且该机制原生支持跨线程。假设我们在16核心服务器上,则使用 15个 QThread对象管理15组工作线程(留一个给主界面)。但是,如果仔细看了QT的文档,就会发现QThread的信号事件循环默认是在创建者中(很多时候就是主线程!),所以,要想让槽在子线程运行,一般是派生一个QObject的类,并把对象MoveToThread到某个QThread管理的线程上去。这样,信号和槽就是全异步FIFO了。其次,QT提供了引用计数的QByteArray封装,这个东西在参数传递的时候,速度很快,很少出现memcpy,生存期也特别容易控制。虽然C++11里有 shared_ptr<T>,但是那个东西还是需要在一开始new 一个int8型的存储区,很讨厌。

说了这么多,上关键代码。

先是线程池的封装qghthreadengine.h

[cpp] view plain copy

  1. #ifndef QGHTHREADENGINE_H
  2. #define QGHTHREADENGINE_H
  3. #include <QObject>
  4. #include <QThread>
  5. #include <QVector>
  6. #include <QList>
  7. #include <QMap>
  8. #include <QMutex>
  9. #include "qghthreadtaskitem.h"
  10. #include "qghthreadobject.h"
  11. //线程池引擎,帮助用户进行动态平衡
  12. class QGHThreadEngine : public QObject
  13. {
  14. Q_OBJECT
  15. public:
  16. QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads = 2,bool bFIFOKeep = true);
  17. ~QGHThreadEngine();
  18. protected:
  19. QVector<QThread *> m_ThreadPool;
  20. QVector<QGHThreadObject *> m_ThreadObjs;
  21. QGHThreadTaskItem * m_pThreadTaskItem;
  22. int m_nThreads;
  23. bool m_bFIFOKeep;
  24. private:
  25. //各个m_ThreadPool\m_ThreadObjs的任务数
  26. QMap<QObject *,qint32> m_map_Tasks;
  27. //m_bFIFOKeep == true 时,下面两个成员将保证非空闲的单个 data_source 将始终在单一线程处理
  28. //各个data_source 目前的处理线程
  29. QMap<QObject *,QObject *> m_map_busy_source_task;
  30. //各个data_source 目前的排队数目
  31. QMap<QObject *,int> m_map_busy_source_counter;
  32. public:
  33. void SetThreadTaskItem(QGHThreadTaskItem * pTaskItem);
  34. QList<qint32> CurrentLoad()
  35. {
  36. return m_map_Tasks.values();
  37. }
  38. public slots:
  39. void append_new(QObject * data_source, const QByteArray & data);
  40. //捕获QGHThreadObject::sig_process_finished, 以便管理data_source的 FIFO 顺序
  41. void on_sig_process_finished(QObject * data_source);
  42. signals:
  43. //************************************
  44. // Method:    do_task
  45. // FullName:  QGHThreadEngine::do_task
  46. // Access:    public
  47. // Returns:   void
  48. // Qualifier:
  49. // Parameter: QObject *     任务来源 (相同任务源的任务,在队列非空时会被安排到同一个线程处理,以确保对相同源的FIFO)
  50. // Parameter: QByteArray    任务体
  51. // Parameter: QObject *     处理任务的线程对象(QGHThreadObject)
  52. //************************************
  53. void do_task(QObject *, const QByteArray &,QObject *);
  54. };
  55. #endif // QGHTHREADENGINE_H

实现qghthreadengine.cpp:

[cpp] view plain copy

  1. #include "qghthreadengine.h"
  2. #include <assert.h>
  3. QGHThreadEngine::QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads,bool bFIFOKeep)
  4. : QObject(parent),
  5. m_nThreads(nThreads),
  6. m_pThreadTaskItem(pTaskItem),
  7. m_bFIFOKeep(bFIFOKeep)
  8. {
  9. assert(nThreads>0 && nThreads<512 && pTaskItem!=NULL);
  10. //创建固定数目的线程
  11. for (int i=0;i<nThreads;i++)
  12. {
  13. QThread * pNewThread = new QThread(this);
  14. QGHThreadObject * pNewObject = new QGHThreadObject(0,pTaskItem);
  15. //记录下来
  16. m_ThreadPool.push_back(pNewThread);
  17. m_ThreadObjs.push_back(pNewObject);
  18. m_map_Tasks[pNewObject] = 0;
  19. pNewThread->start();
  20. //把QGHThreadObject的信号、曹处理搬移到子线程内
  21. pNewObject->moveToThread(pNewThread);
  22. //连接处理完成消息
  23. connect(pNewObject,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *)));
  24. //连接处理新任务消息
  25. connect(this,SIGNAL(do_task(QObject *, const QByteArray &,QObject *)),pNewObject,SLOT(process(QObject *, const QByteArray &,QObject *)));
  26. }
  27. }
  28. QGHThreadEngine::~QGHThreadEngine()
  29. {
  30. foreach(QGHThreadObject * obj,m_ThreadObjs)
  31. {
  32. disconnect(obj,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *)));
  33. obj->deleteLater();
  34. }
  35. foreach(QThread * th ,m_ThreadPool)
  36. {
  37. disconnect(this,SIGNAL(do_task(QObject *, QByteArray,QObject *)),th,SLOT(process(QObject *, QByteArray,QObject *)));
  38. th->exit(0);
  39. th->wait();
  40. }
  41. }
  42. //负载均衡添加任务,生产者的信号要挂接到这个槽上
  43. void QGHThreadEngine::append_new(QObject * data_source, const QByteArray &  data)
  44. {
  45. QObject * pMinObj = 0;
  46. //对一批来自同一数据源的数据,使用同样的数据源处理,以免发生多线程扰乱FIFO对单个data_source的完整性
  47. if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end()&& m_bFIFOKeep==true)
  48. {
  49. m_map_busy_source_counter[data_source]++;
  50. pMinObj = m_map_busy_source_task[data_source];
  51. }
  52. else
  53. {
  54. qint32 nMinCost = 0x7fffffff;
  55. //寻找现在最空闲的一个线程
  56. for (QMap<QObject *,qint32>::iterator p = m_map_Tasks.begin();p!=m_map_Tasks.end();p++)
  57. {
  58. if (p.value()< nMinCost)
  59. {
  60. nMinCost = p.value();
  61. pMinObj = p.key();
  62. }
  63. }
  64. if (pMinObj)
  65. {
  66. m_map_busy_source_counter[data_source] = 1;
  67. m_map_busy_source_task[data_source] = pMinObj;
  68. }
  69. }
  70. if (pMinObj)
  71. {
  72. m_map_Tasks[pMinObj]++;
  73. emit do_task(data_source,data,pMinObj);
  74. }
  75. }
  76. void QGHThreadEngine::on_sig_process_finished(QObject * data_source)
  77. {
  78. if (m_map_Tasks.find(sender())!=m_map_Tasks.end())
  79. {
  80. m_map_Tasks[sender()]--;
  81. }
  82. if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end())
  83. {
  84. m_map_busy_source_counter[data_source]--;
  85. if (m_map_busy_source_counter[data_source]<=0)
  86. {
  87. m_map_busy_source_counter.remove(data_source);
  88. m_map_busy_source_task.remove(data_source);
  89. }
  90. }
  91. }

用于绑定的 qghthreadobject.h

[cpp] view plain copy

  1. #ifndef QGHTHREADOBJECT_H
  2. #define QGHTHREADOBJECT_H
  3. #include <QObject>
  4. #include "qghthreadtaskitem.h"
  5. //用于在子线程内具体承担事件循环的类,用户无需重载
  6. class QGHThreadObject:public QObject
  7. {
  8. Q_OBJECT
  9. public:
  10. QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem);
  11. ~QGHThreadObject();
  12. public:
  13. void SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem);
  14. public slots:
  15. //************************************
  16. // Method:    process
  17. // FullName:  QGHThreadObject::process
  18. // Access:    public
  19. // Returns:   void
  20. // Qualifier:
  21. // Parameter: QObject *     任务来源 (相同任务源的任务,在队列非空时会被安排到同一个线程处理,以确保对相同源的FIFO)
  22. // Parameter: QByteArray    任务体
  23. // Parameter: QObject *     处理任务的线程对象(QGHThreadObject)
  24. //************************************
  25. void process(QObject * data_source, const QByteArray &data,QObject * target);
  26. private:
  27. QGHThreadTaskItem * m_pThreadTaskItem;
  28. signals:
  29. //信号,表示一次处理已经完成。QGHThreadEngine捕获该信号,管理data_source的 FIFO 顺序
  30. void sig_process_finished(QObject * data_source);
  31. };
  32. #endif

相应实现qghthreadobject.cpp

[cpp] view plain copy

  1. #include "qghthreadobject.h"
  2. #include <assert.h>
  3. QGHThreadObject::QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem)
  4. : QObject(parent),
  5. m_pThreadTaskItem(pThreadTaskItem)
  6. {
  7. assert(pThreadTaskItem!=NULL);
  8. }
  9. QGHThreadObject::~QGHThreadObject()
  10. {
  11. }
  12. void QGHThreadObject::process(QObject * data_source, const QByteArray &data,QObject * target)
  13. {
  14. if (target==this)
  15. {
  16. m_pThreadTaskItem->run(data_source,data);
  17. emit sig_process_finished(data_source);
  18. }
  19. }
  20. void QGHThreadObject::SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem)
  21. {
  22. assert(pThreadTaskItem!=NULL);
  23. m_pThreadTaskItem = pThreadTaskItem;
  24. }

最后,是供用户重载的实际处理方法的纯虚基类qghthreadtaskitem.h

[cpp] view plain copy

  1. #ifndef QGHTHREADTASKITEM_H
  2. #define QGHTHREADTASKITEM_H
  3. #include <QObject>
  4. //用户重载该类,实现自定义方法的线程池调用
  5. class QGHThreadTaskItem:public QObject
  6. {
  7. Q_OBJECT
  8. public:
  9. QGHThreadTaskItem(QObject *parent);
  10. ~QGHThreadTaskItem();
  11. public:
  12. virtual void run(QObject * task_source, const QByteArray & data_array) = 0;
  13. };
  14. #endif

下次,继续写如何实现一个TCP链路,让这个线程池活起来

http://blog.csdn.net/goldenhawking/article/details/7854413

时间: 2024-12-18 00:12:54

QT 线程池 + TCP 小试(一)线程池的简单实现的相关文章

线程池 异步I/O线程 &lt;第三篇&gt;

在学习异步之前先来说说异步的好处,例如对于不需要CPU参数的输入输出操作,可以将实际的处理步骤分为以下三步: 启动处理: 实际的处理,此时不需要CPU参数: 任务完成后的处理: 以上步骤如果仅仅使用一个线程,当线程正在处理UI操作时就会出现“卡”的现象. 如果使用异步的处理方式,则这三步处理过程涉及到两个线程,主线程中启动第一步:第一步启动后,主线程结束(如果不结束,只会让该线程处于无作为的等待状态):第二步不需要CPU参与:第二步完成之后,在第二个线程上启动第三步:完成之后第二个线程结束.这样

[Java Performance] 线程及同步的性能 - 线程池/ThreadPoolExecutors/ForkJoinPool

线程池和ThreadPoolExecutors 虽然在程序中可以直接使用Thread类型来进行线程操作,但是更多的情况是使用线程池,尤其是在Java EE应用服务器中,一般会使用若干个线程池来处理来自客户端的请求.Java中对于线程池的支持,来自ThreadPoolExecutor.一些应用服务器也确实是使用的ThreadPoolExecutor来实现线程池. 对于线程池的性能调优,最重要的参数就是线程池的大小. 对于任何线程池而言,它们的工作方式几乎都是相同的: 任务被投放到一个队列中(队列的

线程(Thread)、线程池(ThreadPool)技术

线程:是Windows任务调度的最小单位.线程是程序中的一个执行流,每个线程都有自己的专有寄存器(栈指针.程序计数器等),但代码区是共享的,即不同的线程可以执行同样的函数,在一个应用程序中,常常需要使用多个线程来处理不同的事情,这样可以提高程序的运行效率,也不会使主界面出现无响应的情况.在这里主要介绍线程(Thread).线程池(ThreadPool)两种不同创建线程的区别 在通常的情况下,当我们需要开启一个新的线程时,我们直接通过Thread(继承自 System.Threading;)去创建

C#如何判断线程池中所有的线程是否已经完成之Demo

1 start: 2 3 System.Threading.RegisteredWaitHandle rhw = null; 4 new Action(() => 5 { 6 for (var i = 0; i < 30; i++) { 7 new Action<int>((index) => 8 { 9 System.Threading.Thread.Sleep(1000); 10 Console.WriteLine(System.Threading.Thread.Curr

java多线程系类:JUC线程池:05之线程池原理(四)(转)

概要 本章介绍线程池的拒绝策略.内容包括:拒绝策略介绍拒绝策略对比和示例 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3512947.html 拒绝策略介绍 线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施.当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭.第二,任务数量超过线程池的最大限制. 线程池共包括4种拒绝策略,它们分别是:AbortPolicy, CallerRunsPolicy, DiscardOld

C#如何判断线程池中所有的线程是否已经完成(转)

其 实很简单用ThreadPool.RegisterWaitForSingleObject方法注册一个定时检查线程池的方法,在检查线程的方法内调用 ThreadPool.GetAvailableThreads与ThreadPool.GetMaxThreads并比较两个方法返回的值是不是相等, 相等表示线池内所有的线程已经完成. //每秒检次一次线程池的状态 RegisteredWaitHandle rhw = ThreadPool.RegisterWaitForSingleObject(Auto

java多线程系类:JUC线程池:01之线程池架构

概要 前面分别介绍了"Java多线程基础"."JUC原子类"和"JUC锁".本章介绍JUC的最后一部分的内容--线程池.内容包括:线程池架构图线程池示例 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509903.html 线程池架构图 线程池的架构图如下: 1. Executor 它是"执行者"接口,它是来执行任务的.准确的说,Executor提供了execute()接口来执行

java多线程系类:JUC线程池:03之线程池原理(二)(转)

概要 在前面一章"Java多线程系列--"JUC线程池"02之 线程池原理(一)"中介绍了线程池的数据结构,本章会通过分析线程池的源码,对线程池进行说明.内容包括:线程池示例参考代码(基于JDK1.7.0_40)线程池源码分析(一) 创建"线程池"(二) 添加任务到"线程池"(三) 关闭"线程池" 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509954.h

线程小酌之理解线程池

一.引言 在学习JAVASE部分中,我们都学习到了基本的线程创建继承THREAD类或实现Runnable接口,在正常负载情况下,为每个任务分配一个线程这种方法能够提升串行执行的性能.只要请求的导弹速率不超出服务器的请求处理能力,那么这种方法可以同时带来更快的响应性和更高的吞吐率.但是在实际开发过程中,开发环境和测试环境因数据流量并没有达到实际请求流量,并不能发现实际的问题,在生产环境中,为每个任务分配一个线程这种方法存在一些缺陷,尤其是当需要创建大量的线程时: 1.线程的生命周期的开销非常高 线