《C++ Concurrency in Action》读书笔记三 同步并发操作

本章要点

*等待事件

*使用futures等待一次性事件(waiting for one-off events with futures)

*等待时间限制

*使用同步操作来简化代码

这章主要描述了如何使用条件变量和futures来等待事件,以及如何使用他们来使线程同步操作更加简化.

CP4

1. 等待事件或者其他条件

a.如果一个线程需要等待另外一个线程处理的结果,可以采取不停的检测共享资源,等另外一个线程完成特定操作以后会修改共享资源。该线程检测到修改以后执行后续的操作,但这种方法需要不停的检测共享资源效率低下会浪费大量的系统资源。

b.使用std::this_thread::sleep_for()让等待线程sleep一会儿

bool flag;
std::mutex m;

void wait_for_flag()
{
    std::unique_lock<std::mutex> lk(m);
    while(!flag)
    {
        lk.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        lk.lock();
    }
}

但是这种方法有弊端,太短的sleep时间会浪费系统资源,太长的sleep时间会导致目标线程完成了工作以后等待线程依然在sleep.

c. 使用条件变量(condition variable)完成特定条件以后唤醒sleep中的线程

1) std::condition_variable  std::condition_variable_any.  需要#include <condition_variable>

前者需要配合std::mutex来使用,后者可以配合任意类mutex对象。

#include <queue>
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <sstream>

std::mutex mut;
std::queue<int> data_queue;
std::condition_variable data_cond;

//bool more_data_to_prepared();

void data_preparation_thread()
{
    for(int i = 0; i<= 99; i++)
    {
        int data = i;
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_cond.notify_one();
    }
}

void data_processing_thread()
{
    while(true)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(
            lk, []{return !data_queue.empty();});
        int data = data_queue.front();
        data_queue.pop();
        lk.unlock();
        std::cout<<"data poped! "<<data<<std::endl;
        if(data == 99)
            break;
    }
}

int main()
{
    std::thread t1(data_preparation_thread);
    std::thread t2(data_processing_thread);
    t1.join();
    t2.join();
    return 0;
}

使用队列(std::deque)来进行线程之间传递数据也是一种常用的方法,能减少许多同步问题和资源竞争问题。

2)使用条件变量的线程安全的队列

#include <memory>
#include <queue>
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut; //the mutex must be mutable because it will be  modified by the const function.
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {
    }
    threadsafe_queue(const threadsafe_queue& other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }
    threadsafe_queue& operator=(const threadsafe_queue&) = delete;

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return false;
        value = data_queue.pop();
        return true;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return false;
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty();});
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

一个使用条件变量来控制三个线程按照固定的顺序循环输出A,B,C的例子

#include <memory>
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>

class threads_rolling
{
private:
    std::mutex mut;
    std::condition_variable m_cond;
    char m_flag;
    bool m_firstrun;
public:
    threads_rolling()
        :m_flag('A'),
        m_firstrun(true)
    {
    }
    void thread_A()
    {
        while(true)
        {
            std::unique_lock<std::mutex> lk(mut);
            if(m_firstrun)
            {
                m_firstrun = false;
            }else
            {
                m_cond.wait(lk, [=]{return m_flag == 'A';});
            }
            m_flag = 'B';
            std::cout<<"Output from thread A!"<<std::endl;
            m_cond.notify_all();
        }

    }

    void thread_B()
    {
        while(true)
        {
            std::unique_lock<std::mutex> lk(mut);
            m_cond.wait(lk, [=]{return m_flag == 'B';});
            m_flag = 'C';
            std::cout<<"Output from thread B!"<<std::endl;
            m_cond.notify_all();
        }
    }

    void thread_C()
    {
        while(true)
        {
            std::unique_lock<std::mutex> lk(mut);
            m_cond.wait(lk, [=]{return m_flag == 'C';});
            m_flag = 'A';
            lk.unlock();
            std::cout<<"Output from thread C!"<<std::endl;
            m_cond.notify_all();
        }
    }

};

int main()
{
    threads_rolling threads;
    std::thread t1(&threads_rolling::thread_A, &threads);
    std::thread t2(&threads_rolling::thread_B, &threads);
    std::thread t3(&threads_rolling::thread_C, &threads);
    t1.join();
    t2.join();
    t3.join();
    //use the ctrl+c to exit the test program!
    return 0;
}

2.使用futures来等待一次性的事件

std::furute<> 仅有一个future关联一个event

std::shared_future<>  多个future可以关联同一个event

1)从后台任务返回值

a.从一个异步任务返回值std::async() 语法同std::thread()

#include <memory>
#include <queue>
#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <condition_variable>

int find_the_answer_to_ltuae();

void do_other_stuff();

int main()
{
    std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
    do_other_stuff();
    std::cout<<"The answer is "<<the_answer.get()<<std::endl;
    return 0;
}

std:async()的基本用法

struct X
{
    void foo(int, std::string const&);
    std::string bar(std::string const&);
};

X x;
auto f1 = std::async(&X::foo, &x, 42, "Hello");
auto f2 = std::async(&X::bar, x, "goodbye");

struct Y
{
    double operator()(double);
};
Y y;
auto f3 = std::async(Y(), 3.141);
auto f4 = std::async(std::ref(y), 2.718);
X baz(X&);
std::async(baz, std::ref(X));

class move_only
{
public:
    move_only();
    move_only(move_only&&);
    move_only(move_only const&) = delete;
    move_only& operator = (move_only&&);
    move_only& operator = (move_only const&) = delete;

    void operator()();
};
auto f5 = std::async(move_only());
auto f6 = std::async(std::launch::async, Y(), 1.2); // run in new thread.
auto f7 = std::async(std::launch::deferred, baz, std::ref(x));  //run in wait() or get()
auto f8 = std::async(
        std::launch::deferred | std::launch::async,
        baz, std::ref(x));  //Implementation chooses
auto f9 = std::async(baz, std::ref(x));
f7.wait();  //invoke deferred function.

2)使用future来联合任务

std::packaged_task 包装一个可调用的对象,并且允许异步获取该可调用对象产生的结果,从包装可调用对象意义上来讲,std::packaged_task 与 std::function 类似,只不过 std::packaged_task 将其包装的可调用对象的执行结果传递给一个 std::future 对象(该对象通常在另外一个线程中获取 std::packaged_task 任务的执行结果)。

std::packaged_task 对象内部包含了两个最基本元素,一、被包装的任务(stored task),任务(task)是一个可调用的对象,如函数指针、成员函数指针或者函数对象,二、共享状态(shared state),用于保存任务的返回值,可以通过 std::future 对象来达到异步访问共享状态的效果。

可以通过 std::packged_task::get_future 来获取与共享状态相关联的 std::future 对象。在调用该函数之后,两个对象共享相同的共享状态,具体解释如下:

std::packaged_task 对象是异步 Provider,它在某一时刻通过调用被包装的任务来设置共享状态的值。

std::future 对象是一个异步返回对象,通过它可以获得共享状态的值,当然在必要的时候需要等待共享状态标志变为 ready.

#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <chrono>

int countdown(int start, int end)
{
    for(int i = start; i!= end; --i)
    {
        std::cout<<i<<std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout<<"Finished!\n";
    return start - end;
}

int main()
{
    std::packaged_task<int(int, int)> task(countdown);
    std::future<int> ret = task.get_future();

    std::thread th(std::move(task), 10, 0);

    int value = ret.get();

    std::cout<< "The countdown lasted for " <<value <<" seconds.\n";
    th.join();

    return 0;
}

std::packaged_task<> 的模板参数是函数签名 类似 int(std::string&, double*) 标示参数类型和返回类型。例子

template<>
class packaged_task<std::string(std::vector<char>*, int)>
{
public:
    template<typename Callable>
    explicit packaged_task(Callable&& f);
    std::future<std::string> get_future();
    void operator()(std::vector<char>*, int);
};

std::packaged_task成为一个函数对象,std::function, 可以作为参数传递给std::thread

一个在线程之间传递任务的例子

#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()>> tasks;

bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread()
{
    while(!gui_shutdown_message_received())
    {
        get_and_process_gui_message();
        std::packaged_task<void()> task;
        {
            std::lock_guard<std::mutex> lk(m);
            if(tasks.empty())
                continue;
            task = std::move(tasks.front());
            tasks.pop_front();
        }
        task();
    }
}

std::thread gui_bg_thread(gui_thread);

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
    std::packaged_task<void()> task(f);
    std::future<void> res = task.get_future();
    std::lock_guard<std::mutex> lk(m);
    tasks.push_back(std::move(task));
    return res;
}

3)使用std::promise

promise 对象可以保存某一类型 T 的值,该值可被 future 对象读取(可能在另外一个线程中),因此 promise 也提供了一种线程同步的手段。在 promise 对象构造时可以和一个共享状态(通常是std::future)相关联,并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值。

可以通过 get_future 来获取与该 promise 对象相关联的 future 对象,调用该函数之后,两个对象共享相同的共享状态(shared state)

promise 对象是异步 Provider,它可以在某一时刻设置共享状态的值。

future 对象可以异步返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标志变为 ready,然后才能获取共享状态的值。

#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <utility>

void print_string(std::future<std::string>& fut)
{
    std::string str = fut.get();
    std::cout<<"The string is: "<<str<<std::endl;
}

int main()
{
    std::promise<std::string> prom;
    std::future<std::string> fut = prom.get_future(); // associate with the future object.
    std::thread t(print_string, std::ref(fut));
    prom.set_value("Hello world!");
    t.join();

    return 0;
}

4)在future中保存异常

some_promise.set_exception(std::current_exception());

一个使用std::promised 单线程来管理多连接的例子

#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <utility>

void process_connections(connection_set & connections)
{
    while(!done(connections))
    {
        for(connection_iterator connection = connections.begin(), end = connections.end();
            connection! = end;
            ++connection)
        {
            if(connection->has_incoming_data())
            {
                data_packet data = connection->incoming();
                std::promise<payload_type>& p = connection->get_promise(data.id);
                p.set_value(data.payload);
            }
            if(connection->has_outgoing_data())
            {
                outgoing_packet data = connection->top_of_outgoing_queue();
                connection->send(data.payload);
                data.promise.set_value(true);
            }

        }
    }
}

5) 等待多个线程

多个线程同时访问一个std::future 会产生资源竞争的问题因为future的get()函数只能被调用一次,只后不会再有返回对象。

如果多个线程需要等待同一个事件的话,使用std::shared_future

std::promise<int> p;
std::future<int> f(p.get_future());
std::shared_future<int> sf(std::move(f));

4. 等待一个时间限制

有的时候客户不想等待,需要设定一个时间上限。

condition variable的两个成员

wait_for()

wait_until()

1)clocks

clock是一个类并且具有以下特诊

* 当前的时间  std::chrono::system_clock::now()

* 一个用来表示时间的值 some_clock::time_point

* 时钟的触发 std::ratio<1, 25>  表示1秒触发25次

* steady clock (稳固时钟)触发频率是稳定不变不可调的

std::chrono::high_resolution_clock

2)Durations

std::chrono::duration<60,1>    // 1minute

std::chrono::duration<1, 1000>  //  1ms

std::chrono::milliseconds ms(54802);

std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms); // convert from ms to seconds.

std::chrono::milliseconds(1234).count() = 1234

std::future<int> f = std::async(some_task);

if(f.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready)

do_something_with(f.get());

3) time point 时间点

std::chrono::time_point<> 第一个参数是clock ,第二个参数是duration

std::chrono::time_point<std::chrono::system_clock, std::chrono::minutes> 参考系统时间,根据分钟来衡量

使用时间限制来等待条件变量的例子

#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <chrono>
#include <condition_variable>

std::condition_variable cv;
bool done;
std::mutex m;

bool wait_loop()
{
    auto const timeout = std::chrono::steady_clock::now() +
        std::chrono::milliseconds(500);
    std::unique_lock<std::mutex> lk(m);
    while(!done)
    {
        if(cv.wait_until(lk, timeout) == std::cv_status::timeout)
            break;
    }
    return done;
}

4)支持timeout的函数

4. 使用同步操作来简化代码

1)使用futures的编程

函数的返回值只和参数的类型和参数值有关,与其他任何状态无关。不发生共享资源的修改,不会参数资源竞争条件

一个快速排序的例子

template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
    if(input.empty())
    {
        return input;
    }
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const* pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(),
        [&](T const& t){return t<pivot;});

    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

    auto new_lower(sequential_quick_sort(std::move(lower_part)));
    auto new_higher(sequential_quick_sort(std::move(input)));

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower);
    return result;
}

使用future改成了并行排序

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if(input.empty())
    {
        return input;
    }
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const* pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(),
        [&](T const& t){return t<pivot;});

    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

    std::future<std::list<T>> new_lower(std::async(¶llel_quick_sort<T>, std::move(lower_part)));
    //auto new_lower(parallel_quick_sort(std::move(lower_part)));
    auto new_higher(parallel_quick_sort(std::move(input)));

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}
template<typename F, typename A>
std::future<std::result_of<F(A&&)::type> spawn_task(F&& f, A&& a)
{
    typedef std::result_of<F(A&&)>::type result_type;
    std::packaged_task<result_type(A&&)> task(std::move(f));
    std::future<result_type> res(task.get_future());
    std::thread  t(std::move(task), std::move(a));
    t.detach();
    return res;
}

2)使用同步来传递消息

时间: 2024-11-09 04:13:37

《C++ Concurrency in Action》读书笔记三 同步并发操作的相关文章

《R实战》读书笔记三

第二章  创建数据集 本章概要 1探索R数据结构 2使用数据编辑器 3数据导入 4数据集标注 本章所介绍内容概括如下. 两个方面的内容. 方面一:R数据结构 方面二:进入数据或者导入数据到数据结构 理解数据集 一个数据集通常由一个表格组合而成,行表示观测,列表示变量.病人的数据集如表1所示. 表1 病人数据集 数据集能够反映数据结构.数据类型和内容. 数据结构 R数据结构如图2所示. 图2:R数据结构 数据结构即数据的组织方式,R数据结构包括向量.矩阵.数组.数据框和列表等. R向量 R向量是一

《你必须知道的.NET》读书笔记三:体验OO之美

一.依赖也是哲学 (1)本质诠释:"不要调用我们,我们会调用你" (2)依赖和耦合: ①无依赖,无耦合: ②单向依赖,耦合度不高: ③双向依赖,耦合度较高: (3)设计的目标:高内聚,低耦合. ①低耦合:实现最简单的依赖关系,尽可能地减少类与类.模块与模块.层次与层次.系统与系统之间的联系: ②高内聚:一方面代表了职责的统一管理,一方面又代表了关系的有效隔离: (4)控制反转(IoC):代码的控制器交由系统控制而不是在代码内部,消除组件或模块间的直接依赖: (5)依赖注入(DI): ①

《世界是数字的》读书笔记 三

<世界是数字的>读书笔记 三 第六章 软件系统 操作系统是软件中的基础层,他负责管理计算机硬件,并为其他被称作应用程序的程序运行提供支持. 6.1操作系统 操作系统控制和分配计算机资源.首先,他负责管理CPU,调度和协调当前运行的程序.操作系统通常都需要管理数十个同时运行的进程或任务. 其次,操作系统管理RAM.他把程序加载到内存中以便执行指令. 最后,操作系统管理和协调外接设备的活动. 6.2操作系统怎么工作 计算机启动时首先要加载代码,加载的过程中还要检查硬件,比如哪些设备已经接入电脑,,

悟道—位IT高管20年的职场心经(读书笔记三)

悟道--一位IT高管20年的职场心经 第三章 世事洞明皆学问 职场就是你的大半个世界 是你一辈子也读不完的一大本书 想明白一个道理, 看明白一件事儿, 你就向成功迈进了一步. 1.1  "四行"说 四行是指: 第一,  你自己得行.自己的基础的能力是必须的,得靠自己学习. 第二,  得有人说你行.需要有伯乐,实际上是你得有一个自己的圈子,并且这些人都人认同你. 第三,  说你行的人得行.自己周围的圈子,里面也必须有牛人,只有在牛人的范围内,才能突显你自己的才能. 第四,  你身子骨得行

《大型网站技术架构》读书笔记三:大型网站核心架构要素

一.性能—响应时间决定用户 (1)浏览器端: ①浏览器缓存: ②使用页面压缩: PS:Gzip压缩效率非常高,通常可以达到70%的压缩率,也就是说,如果你的网页有30K,压缩之后就变成了9K左右.想要启用Gzip压缩,提高浏览速度,可以浏览这篇文章:http://www.chinaz.com/web/2012/1017/278682.shtml ③合理布局页面: CSS:把样式表置于顶部:避免使用CSS表达式(expression_r):使用外部JavaScript和CSS:削减JavaScri

Struts2技术内幕 读书笔记三 表示层的困惑

表示层能有什么疑惑?很简单,我们暂时忘记所有的框架,就写一个注册的servlet来看看. index.jsp <form id="form1" name="form1" method="post" action="loginServlet"> <table width="357" border="0" align="center"> <t

《淘宝技术这十年》读书笔记 (三). 创造技术TFS和Tair

前面两篇文章介绍了淘宝的发展历程和Java时代的变迁: <淘宝技术这十年>读书笔记 (一).淘宝网技术简介及来源 <淘宝技术这十年>读书笔记 (二).Java时代的脱胎换骨和坚若磐石 马云说过"创新不是为了与对手竞争,而是跟明天竞争",所以这篇文章讲述淘宝的创新技术TFS和Tair及创新的产品. 该篇文章不仅仅对在读大学生非常有所帮助,因为你能从文章中看到很多你需要学习的知识,不仅仅包括数据库.计算机网络.操作系统.数据结构等基础课程:还根据时代的技术变迁讲述了

《算法导论》读书笔记(三)

本章介绍了快速排序及其算法分析,快速排序采用的是分治算法思想,对包含n个数的输入数组,最坏情况下运行时间为θ(n^2),但是平均性能相当好,期望的运行时间为θ(nlgn).另外快速排序能够就地排序(我理解是不需要引入额外的辅助空间,每次划分能确定一个元素的具体位置),在虚拟环境中能很好的工作. 1.快速排序的描述 快速排序算法采用的分治算法,因此对一个子数组A[p-r]进行快速排序的三个步骤为: (1)分解:数组A[p...r]被划分为两个(可能为空)子数组A[p...q-1]和A[q+1...

软件需求模式 读书笔记三

通过这一个月的阅读,我终于读完了<软件需求模模式>这本书,前两个读书笔记已经把这本书的几种模式介绍了,之前有基础需求模式,信息需求模式,数据实体需求模式,用户功能需求模式.这次介绍的是性能需求模式,适应性需求模式,访问控制需求模式和商业需求模式. 性能需求模式包括五种的性能的需求模式:影响时间(系统需要多少时间完成一个请求).动态容量(系统能够同时处理多少件事).吞吐量(系统处理时间的速率).静态容量(系统可以保存多少某种类型烦的实体)和可用性(什么时候系统对用户是可用的,以及多么可靠). 当