Qt 任务调度器

近期刚刚完成的C/S端远程监控软件,架构为1个TcpClt对应N个TcpSvr。Clt可向Svr发送指令,Svr接收到指令执行,并通过报文反馈进度。

过去,软件管理的个体是每个Svr;现在需要将若干Svr合并为一个群体,各Svr之间执行指令有先后顺序。

因此过去只需单线程一次性向各Svr发送报文,只需在界面所在线程处理。现在需要有一个调度(等待)的过程,要在子线程里处理。

需要解决的问题如下:

  1. 定时调度
  2. 跨线程更新界面。

首先看问题2:跨线程更新界面。

Qt可以用signal/slot机制,跨线程传递信号。

1 Schedler *pSchd;
2 QDialog *pDlg;
3 //...
4 connect(pSchd, SIGNAL(notify(const QString &, const QString &)), pDlg, SLOT(updateUI(const QString &, const QString &)), Qt::QueuedConnection);

其中最后一个参数Qt::QueuedConnection适用于跨线程调用,将信号传递到消息队列中,由QAppliction.exec()或QThread.exec()等函数调用。

该参数可缺省,缺省为Qt::AutoConnection。Qt::AutoConnection根据emitter/receiver是否在同一线程内,自动决定连接方式:当位于同一线程时,采用Qt::DirectConnection(直连方式,立即执行);位于不同线程时,采用Qt::QueuedConnection。

有关Qt::connectionType,QAssistant解释如下:

enum Qt::ConnectionType

This enum describes the types of connection that can be used between signals and slots. In particular, it determines whether a particular signal is delivered to a slot immediately or queued for delivery at a later time.

Constant Value Description
Qt::AutoConnection 0 (default) Same as DirectConnection, if the emitter and receiver are in the same thread. Same as QueuedConnection, if the emitter and receiver are in different threads.
Qt::DirectConnection 1 The slot is invoked immediately, when the signal is emitted.
Qt::QueuedConnection 2 The slot is invoked when control returns to the event loop of the receiver‘s thread. The slot is executed in the receiver‘s thread.
Qt::BlockingQueuedConnection 4 Same as QueuedConnection, except the current thread blocks until the slot returns. This connection type should only be used where the emitter and receiver are in different threads.Note: Violating this rule can cause your application to deadlock.
Qt::UniqueConnection 0x80 Same as AutoConnection, but the connection is made only if it does not duplicate an existing connection. i.e., if the same signal is already connected to the same slot for the same pair of objects, then the connection will fail. This connection type was introduced in Qt 4.6.
Qt::AutoCompatConnection 3 The default type when Qt 3 support is enabled. Same as AutoConnection but will also cause warnings to be output in certain situations. See Compatibility Signals and Slots for further information.

With queued connections, the parameters must be of types that are known to Qt‘s meta-object system, because Qt needs to copy the arguments to store them in an event behind the scenes. If you try to use a queued connection and get the error message:

 QObject::connect: Cannot queue arguments of type ‘MyType‘

Call qRegisterMetaType() to register the data type before you establish the connection.

When using signals and slots with multiple threads, see Signals and Slots Across Threads.

See also Thread Support in QtQObject::connect(), and qRegisterMetaType().

回到问题1:调度器需要满足以下条件:

  1. 前一个任务未执行到关键点,禁止下一个任务开始。
  2. 前一个任务执行失败,中断剩余任务。
  3. 所有任务都开始执行后,需等待任务执行结束。

根据以上原则,定义调度器如下:

class AberScheduler : public QObject
{
    Q_OBJECT
public:
    AberScheduler();
    ~AberScheduler();

    int process();    //执行调度
    int init(QList<AberInfo> infoList, const QString &cmdType);

protected:
    void sortMachine();
    int goFirst();
    int goNext();
    bool end();

    void TryGoNext();
    bool IsAllDone();

protected:
    void OnBegin();
    void OnEnd();
    void UpdateAllStatus();    //从通讯线程获取状态
    void UpdateAllUI();    //更新界面线程

signals:
   int sendAberOrder(const QString &machineName, const QString &orderType);
   int notify(const QString &machineName, const QString &progressInfo);
   int updateSingleStatus(const QString &machineName, int *next, int *status, double *progress);
   int setUIDisabled(const QString &machineName, bool bDisabled); 

private:
    QList<AberInfo> m_aberInfoList;
    int m_index;
    QString m_cmdType;
};

机器信息结构体定义如下:

struct AberInfo
{
    QString machineName;
    int machineType;
    int next;    //下一台机器是否可以开始: -1:叵;0:可;1:中断所有
    int status;    //命令执行状态: 0:未开始;1:正在执行;2:成功;3:失败
    double progress;    //进度,范围: [0.00, 1.00]
};

核心调度代码如下:

int AberScheduler::process()
{
    int ret_next = -1;
    bool bAllDone = false;

    if(this->m_infoList.count() <= 0)
    {
        return 0;
    }

    OnBegin();

    goFirst();
    UpdateAllUI();

    const AberInfo &info = this.m_infoList[this->m_index];
    emit sendOrder(info.machineName, this->m_cmdType);
    Sleep(5000);

    while(!end())
    {
        Sleep(1000);
        UpdateAllStatus();
        UpdateAllUI();

        ret_next = TryGoNext();
        if(ret_next == 0)  //可以进行下一个
        {
            int ret_go_next = goNext();
            if(ret_go_next == -1)
            {
                onEnd();
                return -1;
            }
            if(!end())
            {
                const AberInfo &info = this.m_infoList[this->m_index];
                emit sendOrder(info.machineName, this->cmd_type);
                Sleep(5000);
            }
            else
            {
                break;
            }
        }
        else if(ret_next == -1)  //中断全部
        {
            UpdateAllStatus();
            UpdateAllUI();
            onEnd();
            return 1;
        }
        bAllDone = IsAllDone();
        if(bAllDone)
        {
            break;
        }
    }

    while(!bAllDone)
    {
        Sleep(1000);
        UpdateAllStatus();
        UpdateAllUI();
        bAllDone = IsAllDone();
    }

    UpdateAllStatus();
    UpdateAllUI();
    onEnd();

}

与发送信号有关的函数:

void AberScheduler::UpdateAllStatus()
{
    //只更新index及之前的机器
    for(int i=0; i <= this->m_index && i < this->m_infoList.count(); i++)
    {
        AberInfo &info = this->m_infoList[i];
        emit UpdateStatus(this->m_cmd_type, info.machineName, &info.next, &info.status, &info.progress);
    }
}

void AberScheduler::UpdateAllUI()
{
    foreach(AberInfo info, this->m_infoList)
    {
        emit notify(info.machineInfo, info.progress, info.status);
    }
}
void AberScheduler::OnBegin() {   foreach(AberInfo info, this->m_infoList)  {    emit setDisabled(info.machineInfo, true);   } }

void AberScheduler::OnEnd() {   foreach(AberInfo info, this->m_infoList)  {    emit setDisabled(info.machineInfo, false);   } }
int AberScheduler::TryGoNext()
{
    AberInfo curInfo = this->m_infoList[m_index]; //???
    return curInfo.next;
}

使用线程包装AberScheduler,使其能适用于跨线程更新界面。

class AberThread : public QRunnable
{
public:
    AberThread();
    ~AberThread();
    void init(const QList<AberInfo> &infoList, const QString &cmdType);

public:
    void run();
    AberScheduler *schd();

private:
    AberScheduler *m_shcd;
};

void void AberThread::run(){  m_schd->process();}

界面连接signal/slot如下:

void AberDialog::doSth()
{
    QList<AberInfo> infoList;
    //...
    AberThread *bThread = new AberThread;
    bThread->init(infoList, "cmdType1");

    AberScheduler *p_schd = bThread->schd();
    connect(p_schd, SIGNAL(sendAberOrder(const QString &, const QString &)), this, SLOT(sendAberOrder(const QString &, const QString &)));
    connect(p_schd, SIGNAL(notify(const QString &, const QString &)), this, SLOT(updateSingleUI(const QString &, const QString &)));
    connect(p_schd, SIGNAL(updateSingleStatus(const QString &, int *, int *, double *)), this, SLOT(getSingleStatus(const QString &, int *, int *, double *)));    connect(p_schd, SIGNAL(setUIDisabled(const QString &, bool)), this, SLOT(setUIDisabled(const QString &, bool)));

    QThreadPool::globalInstance()->start(bThread);
}

QTheadPool是Qt自带的线程池,只能处理QRunnable,而不能处理QThread,可设置同时运行任务的数量,超过数量的线程会在队列中排队。并且无法强制终止QRunnable。如果要实现调度的强制中断,可以采用类似全局变量的方式,在process循环中读取g_flag,若为false,则终止调度(代码就不贴了)。



总结:

1. QRunnable包装AberScheduler,利用多线程防止界面卡死。

2. AberScheduler继承QObject,并使用signal/slot机制,对处理逻辑与界面解耦。

3. 利用QThreadPool实现多个任务同时调度。

P.S.

1. VS2008无法对QObject::connect()函数、SIGNAL宏、SLOT宏做语法检查。即使写了不存在的函数也能编译通过,但信号无法传递,此处极易出错。

时间: 2024-11-09 02:54:39

Qt 任务调度器的相关文章

《CLR via C#》之线程处理——任务调度器

<CLR via C#>之线程基础--任务调度器 <CLR via C#>之线程基础--任务调度器线程池任务调度器设置线程池限制如何管理工作者线程同步上下文任务调度器自定义TaskScheduler派生类 FCL提供了两个派生子TaskScheduler的类型:线程池任务调度器(thread pool task scheduler),和同步上下文任务调度器(synchronization context task scheduler).默认情况下都使用线程池任务调度器. 线程池任务

【Qt5开发及实例】23、Qt调试器的配置,这里以Qt5为例

Qt中安装断点调试 1.设置Qt的调试,首先我们得到windows的网站上下载 WDK:反正就是这个,我用的是8.1的那个 https://msdn.microsoft.com/en-us/windows/hardware/hh852365 这个是地址,下载第一个就可以了. 把这个下载下来,安装 2.Wdk安装步骤 1.双击 2.      等待一下 2.选择路径,我这里是已经安装好了,所以没得选. 3.随意 4.接受 5.下载 6.下载结束 得到的文件夹里面下载下来的东西,你们是没有Qt调试器

TaskScheduler一个.NET版任务调度器

TaskScheduler是一个.net版的任务调度器.概念少,简单易用. 支持SimpleTrigger触发器,指定固定时间间隔和执行次数: 支持CronTrigger触发器,用强大的Cron表达式满足日历形式的复杂触发规则: 支持动态添加和删除任务,可根据具体需求实现复杂逻辑: static void Main(string[] args) { Scheduler scheduler = new Scheduler(); SimpleTrigger simpleTrigger = new S

SST-超级简单任务调度器结构分析

SST(Super Simple Task) 是一个基于任务优先级.抢占式.事件驱动.RTC.单堆栈的超级简单任务调度器,它基于Rober Ward一篇论文的思想,Miro Samek用C重新编程实现的,它是QP中QK的的基本思想. QK加上状态机事件处理的方法QEP,再加上任务的注册与事件的保存与分发功能QF,再加上串口调试功能QSpy,再加上基于模型驱动的开发QM,就成了QP. 2015年,QP获得“嵌入式计算设计”顶级发明奖[1]. 1.SST层次结构 ? 2.SST总体结构 ? 发送事件

温故知新 任务调度器(定时器)

1,导入Quartz的依赖包 2,任务调度主要是三个关键点:调度器    任务    触发器 Schedule  Job   Strigger //创建任务类 1,实现Job接口 2,任务的自定义执行方法(重写) //创建调度器 ScheduleFactory scheduleFactory =new StdScheduleFactory(); Schedule schedule=scheduleFactory.getSchedule(); //创建任务 JobDetail jobDetail=

任务调度器quartz的使用

1.quartz的获取. 可參照:Quartz任务调度模型实例 2.开发思路: 要使用定时器quartz.先弄清楚三个概念:调度器.任务.触发器.开发也是依照这三个方面来开发, 1>写一个Job的实现类.里面是你自己要完毕的业务逻辑: 2>写Trigger的实现类,主要有SimpleTrigger和CronTrigger这两个子类.来决定调度方案: 当仅需触发一次或者以固定时间间隔周期运行,SimpleTrigger是最适合的选择: 而CronTrigger则能够通过Cron表达式定义出各种复

浅谈qt 布局器

在一个颜值当道的今天,无论买衣服,买车还是追星,颜值的高低已经变成了大家最看重的(不管男性女性都一样,千万别和我说你不是):而对于程序猿来说,开发一款软件,不再只注重逻辑和稳定性,美观和用户友好性也是我们不得不关注的一个重点了. 我们进入正题,今天主要和大家分享一下Qt方面关于布局管理器的使用: 一.基本概念   Qt 提供了几种在窗口部件上管理子窗口部件的基本方式.一共有3 种方法用于管理窗体上子窗口部件的布局:绝对位置法.人工布局法和布局管理器法.相比于使用固定尺寸和位置,布局提供了功能强大

Quartz任务调度器

背景:              近期项目中遇到跨区调拨商品的需求,比如A区和B区,需要判断A区或者B区某种sku是否需要从对方库调拨商品来补充货源,避免因缺失商品而出现订单延误,影响销售和对用户产生不良影响. 问题:             数据量庞大,如果当查看的时候去获取数据,那么会严重影响系统的性能,甚至导致数据库和应用服务器无法响应. 解决方案:            规定在某个时间点,最好是在晚上12点时系统自动获取需要调拨的数据,然后将数据存储到数据库中.晚上12点,用户访问量和系

ubuntu12.04下Qt调试器的使用

最近,我一直在用Qt编写C++程序,但在编写过程中遇到了问题,想用Qt Creator中的调试器调试一下,但调试时(在Qt Creator中已配置好相应的调试器)出现“ ptrace:Operation not permitted ”的错误,如下图所示: 经百度搜索,找到了问题的原因及其解决方案. 原来,在Ubuntu 11.04("Natty Narwhal")之后的版本中,一种叫做 ptrace scope 的安全机制被引入.这种机制防止用户访问当前正在运行的进程的内存和状态,这就