任务池管理与执行器

1 背景

一个后台实时处理的业务平台,通常我们会根据数据的输入与输出,依据时间轴进行分解成不同阶段或不同粒度的逻辑任务,而每一个待处理的数据我们称为任务或者消息。任务之间的关系可以分为两类:a 上下游父子关系,b 可以并行运行的兄弟关系。具有上下游关系的任务集合具有逻辑或数据依赖关系,即上游任务执行完后,才能执行下游任务;具有兄弟关系的任务间逻辑上互不影响,可以并行运行。

无论是上面任一情况的业务场景,我们需要一种管理类,其职责:管理着一堆线程及其待执行的同类型任务集合。在jdk里面有现成的管理类ThreadPoolExecutor,那么在c++里面看看类似的实现吧:

2 任务与任务池

2.1任务

无论是消息或业务数据,可以抽象地表达为:

      struct data_pair
      {

         char *data;

         int len;

      }

2.2 任务池

任务的缓存用队列表达:

std::queue<data_pair*> _queue;

2.3 任务提交入口

  int CQueueThread::writeData(void *data, int len)

    {

        if (data == NULL || len <= 0) {

            return EXIT_FAILURE;

        }

        data_pair *item = new data_pair();
        item->data = (char*) malloc(len);
        assert(item->data != NULL);
        memcpy(item->data, data, len);
        item->len = len;
        _mutex.lock();
        _queue.push(item);
        _mutex.signal();

        _mutex.unlock();

        return EXIT_SUCCESS;

    }

3线程池   

3.1 线程封装

c++里面类似jdk里面Thread类的封装CThread

{

class CThread {

public:

    /**

     * 构造函数

     */

    CThread() {

        tid = 0;

        pid = 0;

    }

    /**

     * 起一个线程,开始运行

     */

    bool start(Runnable *r, void *a) {

        runnable = r;

        args = a;

        return 0 == pthread_create(&tid, NULL, CThread::hook, this);

    }

    /**

     * 等待线程退出

     */

    void join() {

        if (tid) {

            pthread_join(tid, NULL);

            tid = 0;

            pid = 0;

        }

    }

    /**

     * 得到Runnable对象

     *

     * @return Runnable

     */

    Runnable *getRunnable() {

        return runnable;

    }

    /**

     * 得到回调参数

     *

     * @return args

     */

    void *getArgs() {

        return args;

    }

    /***

     * 得到线程的进程ID

     */

    int getpid() {

        return pid;

    }

    /**

     * 线程的回调函数

     *

     */

    static void *hook(void *arg) {

        CThread *thread = (CThread*) arg;

        thread->pid = gettid();

        if (thread->getRunnable()) {

            thread->getRunnable()->run(thread, thread->getArgs());

        }

        return (void*) NULL;

    }

private:   

    /**

     * 得到tid号

     */

    #ifdef _syscall0

    static _syscall0(pid_t,gettid)

    #else

    static pid_t gettid() { return static_cast<pid_t>(syscall(__NR_gettid));}

    #endif

private:

    pthread_t tid;      // pthread_self() id

    int pid;            // 线程的进程ID

    Runnable *runnable;

    void *args;

}; 

}

3.2 线程池

并行处理的能力有线程池的个数决定,定义如下:


CThread *_thread;

int _threadCount;

4 执行器

4.1 执行启动

int CDefaultRunnable::start() {
    if (_thread != NULL || _threadCount < 1) {
        TBSYS_LOG(ERROR, "start failure, _thread: %p, threadCount: %d", _thread, _threadCount);
        return 0;
    }

    _thread = new CThread[_threadCount];
    if (NULL == _thread)
    {
        TBSYS_LOG(ERROR, "create _thread object failed, threadCount: %d", _threadCount);
        return 0;
    }

    int i = 0;
    for (; i<_threadCount; i++)
    {
        if (!_thread[i].start(this, (void*)((long)i)))
        {
          return i;
        }
    }

  return i;
}

4.2 执行

执行器包含了具体业务的执行:

    void CQueueThread::run(CThread *thread, void *args)
    {
        int threadIndex = (int)((long)(args));
        _mutex.lock();
        while(!_stop) {
            while(_stop == 0 && _queue.empty()) {
                _mutex.wait();
            }
            if (_stop) {
                break;
            }

            data_pair *item = _queue.front();
            _queue.pop();
            _mutex.unlock();
            if (item != NULL) {
                if (_handler) {
                    _handler->handleQueue(item->data, item->len, threadIndex, _args);
                }

                if (item->data) {
                    free(item->data);
                }
                free(item);
            }
            _mutex.lock();
        }

        _mutex.unlock();   

5 样例代码

 CMyHandler handler;

    CQueueThread queueThread(3, &handler, NULL);

    queueThread.start();

    char data[1024];

    for(int i=1; i<=mWriteCount; i++) {

        int len = sprintf(data, "data_%05d", i);

        queueThread.writeData(data, len+1);

    }

    queueThread.wait();

参考:

http://code.taobao.org/p/tfs/src/

时间: 2024-10-11 05:34:15

任务池管理与执行器的相关文章

Python3标准库:concurrent.futures管理并发任务池

1. concurrent.futures管理并发任务池 concurrent.futures模块提供了使用工作线程或进程池运行任务的接口.线程和进程池的API是一样的,所以应用只做最小的修改就可以在线程和进程之间顺利地切换. 这个模块提供了两种类型的类与这些池交互.执行器(executor)用来管理工作线程或进程池,future用来管理工作线程或进程计算的结果.要使用一个工作线程或进程池,应用要创建适当的执行器类的一个实例,然后向它提交任务来运行.每个任务启动时,会返回一个Future实例.需

Python的并发并行[4] -&gt; 并发 -&gt; 利用线程池启动线程

利用线程池启动线程 submit与map启动线程 利用两种方式分别启动线程,同时利用with上下文管理来对线程池进行控制 1 from concurrent.futures import ThreadPoolExecutor as tpe 2 from concurrent.futures import ProcessPoolExecutor as ppe 3 from time import ctime, sleep 4 from random import randint 5 6 def f

javascript基础修炼(7)——Promise,异步,可靠性

开发者的javascript造诣取决于对[动态]和[异步]这两个词的理解水平. 一. 别人是开发者,你也是 Promise技术是[javascript异步编程]这个话题中非常重要的,它一度让我感到熟悉又陌生,我熟悉其所有的API并能够在编程中相对熟练地运用,却对其中原理和软件设计思想感到陌生,即便我读了很多源码分析和教程也一度很难理解为什么Promise这样一个普通的类能够实现异步,也曾尝试着去按照Promise/A+规范来编写Promise,但很快便陷入了一种更大的混乱之中.直到我接触到一些软

java使用Executor(执行器)管理线程

一.一个实现了Runnable接口的类 class MyThread implements Runnable{ private static int num = 0; @Override public void run() { while(true){ synchronized(MyThread.class){ ++num; try{ Thread.sleep(500); } catch(Exception e){ System.out.println(e.toString()); } Syst

SSM框架中使用Spring的@Transactional注解进行事务管理

一 介绍 在企业级应用中,保护数据的完整性是非常重要的一件事.因此不管应用的性能是多么的高.界面是多么的好看,如果在转账的过程中出现了意外导致用户的账号金额发生错误,那么这样的应用程序也是不可接受的 数据库的事务管理可以有效地保护数据的完整性(PS:关于数据库的事务管理基础可以参考我以前写过的这篇文章:http://www.zifangsky.cn/385.html),但是原生态的事务操作需要写不少的代码,无疑是非常麻烦的.在使用了Spring框架的应用中,我们可以使用@Transactiona

RDD关键性能考量之 内存管理

<Spark快速大数据分析> 8.4.2 关键性能考量   内存管理 内存对Spark来说哟几个不同的用途,理解并调优Spark的内存使用方法 可以帮助优化Spark应用.在各个执行器进程中,内存有一下所列集中用途. RDD存储 当调用RDD的persist()或cache()方法时,这个RDD的分区会被存储到缓存区中. Spark会根据spark.stroage.memoryFraction限制用来缓存的内存占整个JVM堆空间的比例大小. 如果超出限制,旧的分区数据会被移出内存. 数据混洗与

使用Executor管理Thread对象详解

java SE5的java.util.concurrent包中的执行器(Executor)是管理Thread对象的优选方法.使用Executor管理Thread对象可以简化并发编程. Executor是在客户端和任务执行之间提供了一个间接层,与客户端直接执行任务不同,我们将使用Executor来执行任务.Executor允许你管理异步任务的执行,而无须显示地管理线程的生命周期. 线程对象知道如何运行具体的任务,它暴露了要执行的单一方法.ExecutorService(具有生命周期的Executo

java并发之线程执行器(Executor)

线程执行器和不使用线程执行器的对比(优缺点) 1.线程执行器分离了任务的创建和执行,通过使用执行器,只需要实现Runnable接口的对象,然后把这些对象发送给执行器即可. 2.使用线程池来提高程序的性能.当发送一个任务给执行器时,执行器会尝试使用线程池中的线程来执行这个任务.避免了不断创建和销毁线程导致的性能开销. 3.执行器可以处理实现了Callable接口的任务.Callable接口类似于Runnable接口,却提供了两方面的增强: a.Callable主方法名称为call(),可以返回结果

Mesos+Zookeeper+Marathon+Docker分布式集群管理最佳实践

参考赵班长的unixhot以及马亮blog 笔者QQ:572891887 Linux架构交流群:471443208 1.1Mesos简介 Mesos是Apache下的开源分布式资源管理框架,它被称为分布式系统的内核.Mesos最初是由加州大学伯克利分校的AMPLab开发,后在Twitter得到广泛使用. Mesos-Master:主要负责管理各个framework和slave,并将slave上的资源分配给各个framework. Mesos-Slave:负责管理本节点上的各个mesos-task