c++并发抽象

在线程的基础上,以任务的形式将并发抽象。设计task类,用于完成一个任务或者说动作。process类用于存储task队列,可以向process投递一个task。再设计一个consumer类,它包含一个线程以及多个process,它的proce函数循环从所有process取出一个task并执行。如果执行返回false则将该process移除。consumer有一个负载均衡的函数balancing可以在繁忙时将process转移给其它闲consumer。从用户层面来看,每个process都是一个逻辑线程,可以以较小的代价创建大量的process。在同一个process里面的操作都是同步的,不同的process可能是异步的,所以如果多个process处理共享数据则需要加锁。

  1 #ifndef PROCESS_20160503_H
  2 #define PROCESS_20160503_H
  3
  4 #include <functional>
  5 #include <vector>
  6 #include <thread>
  7 #include <string>
  8 #include <memory>
  9 #include <atomic>
 10 #include "safe_queue.h"
 11 #include "log.h"
 12
 13 struct task
 14 {
 15     task(){}
 16     task(const std::function<bool()>& t, const std::string task_name)
 17         : job(t)
 18         , name(task_name)
 19     {
 20
 21     }
 22     task(const task& t)
 23     {
 24         job = t.job;
 25         name = t.name;
 26     }
 27
 28     std::function<bool()> job;
 29     std::string name;
 30 };
 31
 32
 33 typedef safe_queue<task> taskqueue;
 34 class process;
 35 typedef std::shared_ptr<process>    ptr_process;
 36
 37 class consumer
 38 {
 39 public:
 40     const static int max_busy = 10000;
 41     const static int min_busy = -10000;
 42 public:
 43     consumer();
 44     ~consumer();
 45 public:
 46     void profile();
 47     std::string name() { return _name; }
 48     void push(ptr_process ptr);
 49     int busy() { return _busy; }
 50
 51 private:
 52     void proc();
 53     void proc_i();
 54     void balancing();
 55     bool init_busy();
 56 private:
 57     std::string _name;
 58     std::vector<ptr_process> _processes;
 59     ptr_process _chan;
 60     std::atomic_int _busy;
 61     bool _idle;
 62     task _tmptask;
 63     std::thread _thread;
 64
 65 };
 66
 67 typedef std::shared_ptr<consumer>    ptr_consumer;
 68
 69 class process
 70 {
 71 public:
 72     friend class consumer;
 73     friend class process_manager;
 74 public:
 75     process(const std::string& process_name);
 76
 77 private:
 78     process(const process&);
 79     process& operator = (const process&);
 80
 81 public:
 82     void profile();
 83
 84 public:
 85     const std::string& process_name() { return _process_name; }
 86     void run(const std::function<bool()>& t, const std::string& name);
 87     void stop();
 88
 89 private:
 90     taskqueue _tasks;
 91     std::string _process_name;
 92 };
 93
 94 class process_manager
 95 {
 96 public:
 97     friend class consumer;
 98 private:
 99     process_manager();
100     process_manager(const process_manager&);
101     process_manager& operator = (const process_manager&);
102
103 public:
104     static process_manager& instance();
105     ptr_process create_process(const std::string name);
106
107 public:
108     void profile();
109
110 private:
111     ptr_consumer get_consumer();
112     std::vector<ptr_consumer>& get_consumers();
113
114 private:
115     std::vector<ptr_consumer> _consumers;
116     unsigned int _no;
117 };
118
119
120
121
122
123
124 #endif
#include "process.h"
#include <chrono>
#include "util.h"
#include <sstream>

#if defined(WIN32)
#include <windows.h>
#else
#include<unistd.h>
#endif

int cpu_num()
{
#if defined(WIN32)
    SYSTEM_INFO info;
    ::GetSystemInfo(&info);
    return info.dwNumberOfProcessors;
#else
    return (int)sysconf(_SC_NPROCESSORS_ONLN);
#endif
}

bool consumer::init_busy()
{
    _busy = 0;
    return true;
}

consumer::consumer()
    : _idle(init_busy())
    , _chan(new process("chan"))
    , _thread([this]{
        this->proc();
    })
{
    std::ostringstream ss;
    ss<<std::this_thread::get_id();
    _name = ss.str();
    logdebug("consumer:%s constructor", _name.c_str());
}

consumer::~consumer()
{
    _thread.join();
}

void consumer::push(ptr_process ptr)
{
    _chan->run([ptr, this]{
        _processes.push_back(ptr);
        return true;
    }, std::string("push process:") + ptr->process_name()
    );
}

void consumer::proc()
{
    while (!global_exit::is_exit())
    {
        proc_i();
        if(_idle)
        {
            _busy = (_busy<min_busy)?min_busy:(_busy-1);
            std::this_thread::sleep_for(std::chrono::microseconds(10));
        }
        else
        {
            _busy = (_busy>max_busy)?max_busy:(_busy+1);
        }
        if(_busy == max_busy)
        {
            balancing();
        }
    }
}

// 每一个process都执行一次
void consumer::proc_i()
{
    _idle = true;
    if(_chan->_tasks.try_pop(_tmptask))
    {
        _tmptask.job();
    }
    for(unsigned int i = 0; i<_processes.size(); ++i)
    {
        if(_processes[i]->_tasks.try_pop(_tmptask) == taskqueue::SUCCESS)
        {
            if(!_tmptask.job())
            {
                logdebug("process task:‘%s‘ return false", _tmptask.name.c_str());
                if(i == _processes.size()-1)
                {
                    _processes.pop_back();
                }
                else
                {
                    _processes[i] = _processes[_processes.size()-1];
                    _processes.pop_back();
                }
                break;
            }
            _idle = false;
        }
    }
}

// 负载均衡 取出最忙process投递给最闲线程
void consumer::balancing()
{
    unsigned int max = 0;
    unsigned int tmp = 0;
    unsigned int index = -1;
    for(unsigned int i = 0; i<_processes.size(); ++i)
    {
        tmp = _processes[i]->_tasks.size();
        if(max<tmp)
        {
            max = tmp;
            index = i;
        }
    }
    ptr_process ptr = _processes[index];
    if(index != _processes.size()-1)
    {
        _processes[index] = _processes[_processes.size()-1];
    }
    std::vector<ptr_consumer>& css = process_manager::instance().get_consumers();
    index = -1;
    int minbusy = consumer::max_busy - 1;
    int tmpbusy = 0;
    for(unsigned int i = 0; i<css.size(); ++i)
    {
        tmpbusy = css[i]->busy();
        if(minbusy>tmpbusy)
        {
            index = i;
            minbusy = tmpbusy;
        }
    }
    if(css[index].get()!=this)
    {
        _processes.pop_back();
        css[index]->push(ptr);
    }
}

process::process(const std::string& process_name)
    : _process_name(process_name)
    , _tasks(1024*1024, 0)
{

}

void process::run(const std::function<bool()>& t, const std::string& name)
{
    if(_tasks.try_push(task(t, name)) == taskqueue::QUEUE_FULL)
    {
        logerror("taskqueue full.process_name:%s task_name:%s", _process_name.c_str(), name.c_str());
    }
}

void process::profile()
{
    loginfo("process name:%s, task size:%d", _process_name.c_str(), _tasks.size());
}

process_manager::process_manager()
{
    int num = cpu_num() + 4;
    for(int i = 0; i<num; ++i)
    {
        _consumers.push_back(ptr_consumer(new consumer));
    }
    _no = 0;
}

process_manager& process_manager::instance()
{
    static process_manager cm;
    return cm;
}

ptr_consumer process_manager::get_consumer()
{
    return _consumers[(_no++)%_consumers.size()];
}

std::vector<ptr_consumer>& process_manager::get_consumers()
{
    return _consumers;
}

void process_manager::profile()
{
    for(unsigned int i = 0; i<_consumers.size(); ++i)
    {
        _consumers[i]->profile();
    }
}

ptr_process process_manager::create_process(const std::string name)
{
    ptr_process ptr(new process(name));
    process_manager::instance().get_consumer()->push(ptr);
    return ptr;
}
时间: 2024-10-31 11:36:37

c++并发抽象的相关文章

Java并发编程学习路线

一年前由于工作需要从微软技术栈入坑Java,并陆陆续续做了一个Java后台项目,目前在搞Scala+Java混合的后台开发,一直觉得并发编程是所有后台工程师的基本功,所以也学习了小一年Java的并发工具,对整体的并发理解乃至分布式都有一定的提高,所以想和大家分享一下. 我的学习路线 首先说说学习路线,我一开始是直接上手JCIP(Java Concurrency in Practice),发现不是很好懂,把握不了那本书的主线,所以思索着从国内的作者开始先,所以便读了下方腾飞的<Java并发编程的艺

从FindBugs中学Java【一】

findbug 这里[中文列表]: http://svn.codehaus.org/sonar-plugins/tags/sonar-l10n-zh-plugin-1.1/src/main/resources/org/sonar/l10n/findbugs_zh.properties rule.findbugs.IMSE_DONT_CATCH_IMSE.name=不良实践 - 捕获可疑IllegalMonitorStateException rule.findbugs.BX_BOXING_IMM

[Effective Java]第八章 通用程序设计

第八章      通用程序设计 45.      将局部变量的作用域最小化 将局部变量的作用域最小化,可以增强代码的可读性和可维护性,并降低出错的可能性. 要使用局部变量的作用域最小化,最有力的方法就是在第一次使用它的地方才声明,不要过早的声明. 局部变量的作用域从它被声明的点开始扩展,一直到外围块的结束外.如果变量是在“使用它的块”之外被声明有,当程序退出该块之后,该变量仍是可见的,如果它在目标使用区之前或之后意外使用,将可能引发意外错误. 几乎每个局部变量的声明都应该包含一个初始化表达式,如

Scala具体解释---------Scala是什么?可伸展的语言!

Scala是什么 Scala语言的名称来自于"可伸展的语言". 之所以这样命名,是由于他被设计成随着使用者的需求而成长.你能够把Scala应用在非常大范围的编程任务上.从写个小脚本到建立个大系统. 51CTO编辑推荐:Scala编程语言专题 Scala是非常easy进入的语言. 它跑在标准的Java平台上.可以与全部的Java库实现无缝交互. 它也是用来编写脚本把Java控件链在一起的非常好的语言.可是用它来建立大系统和可重用控件的架构将更可以发挥它的力量. 从技术层面上来说,Scal

Scala详解---------Scala是什么?可伸展的语言!

Scala是什么 Scala语言的名称来自于"可伸展的语言".之所以这样命名,是因为他被设计成随着使用者的需求而成长.你可以把Scala应用在很大范围的编程任务上,从写个小脚本到建立个大系统. 51CTO编辑推荐:Scala编程语言专题 Scala是很容易进入的语言.它跑在标准的Java平台上,可以与所有的Java库实现无缝交互.它也是用来编写脚本把Java控件链在一起的很好的语言.但是用它来建立大系统和可重用控件的架构将更能够发挥它的力量. 从技术层面上来说,Scala是一种把面向对

scala语言与java的区别 (1)

scala支持关联映射,如可以用(key -> value)表示一个键值对 scala中的所有类型都是对象,包括基本数据类型 scala中的case语句用来判断接收的消息,比java中的switch...case...更专注 receive{    case msg => action()    } scala行动类实现了线程之上的并发抽象,通过发送消息相互通信,每个行动类都自动维护一个消息队列,并且实现发送消息和接收消息的基本操作. receriver ! msg scala是静态类型的,也

findBugs英文代号的对照表

findBugs错误英文翻译rule.findbugs.IMSE_DONT_CATCH_IMSE.name=不良实践 - 捕获可疑IllegalMonitorStateException rule.findbugs.BX_BOXING_IMMEDIATELY_UNBOXED.name=性能 - 基本类型包装之后立刻解包 rule.findbugs.IJU_SETUP_NO_SUPER.name=使用错误 - TestCase定义的setUp没有调用super.setUp() rule.findb

详解FindBugs的各项检测器 .

FindBugs是一个静态分析工具,在程序不需运行的情况下,分析class文件,将字节码与一组缺陷模式进行对比,试图寻找真正的缺陷或者潜在的性能问题.本文档主要详细说明FindBugs 2.0.3版本中各项检测器的作用,该版本共有156个缺陷检测器,分为11个类别. 1.       No Category(无类别) 1.1 BuildInterproceduralCallGraph 模式 - 速度 快 缺陷类别 - 类型 edu.umd.cs.findbugs.detect.BuildInte

es7,es8

ES7新特性 ES7在ES6的基础上添加了三项内容:求幂运算符(**).Array.prototype.includes()方法.函数作用域中严格模式的变更. Array.prototype.includes()方法 includes()的作用,是查找一个值在不在数组里,若在,则返回 true,反之返回 false. 基本用法: ['a', 'b', 'c'].includes('a')     // true ['a', 'b', 'c'].includes('d')     // false