Cpp Concurrency In Action(读书笔记3)——同步并发操作

等待一个事件或其他条件

第一,它可以持续的检查共享数据标志(用于做保护工作的互斥量),直到另一线程完成工作时对这个标志进行重设。

第二个选择是在等待线程在检查间隙,使用 std::this_thread::sleep_for() 进行周期性的间歇:

#include <iostream>
#include <thread>
#include <mutex>
class wait_test {
	bool flag;
	std::mutex m;
public:
	wait_test(bool _flag):flag(_flag){}
	void setFlag(bool _flag)
	{
		std::unique_lock<std::mutex> lk(m);
		flag = _flag;
	}
	bool getFlag()
	{
		std::unique_lock<std::mutex> lk(m);
		return flag;
	}
	void wait_for_flag()
	{
		std::unique_lock<std::mutex> lk(m);
		while (!flag)
		{
			lk.unlock(); // 1 解锁互斥量
			std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
			lk.lock(); // 3 再锁互斥量
		}
	}
};
void funA(wait_test &wt, int &i)
{
	while (!wt.getFlag())
	{
		++i;
	}
}
void funB(wait_test &wt, int &i)
{
	std::cout << "begin\t" << i << std::endl;
	wt.wait_for_flag();//等待主线程set
	std::cout << "end\t" << i << std::endl;
}
int main()
{
	wait_test wt{ false };
	int i{ 0 };
	std::thread t1{ funA,std::ref(wt),std::ref(i) }, t2{ funB,std::ref(wt),std::ref(i) };
	t1.detach();
	t2.detach();
	wt.setFlag(true);
	system("pause");
	return 0;
}

第三个选择(也是优先的选择)是,使用C++标准库提供的工具去等待事件的发生。通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时),这种机制就称为“条件变量”(condition variable)。从概念上来说,一个条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行)终止的线程将会向等待着的线程广播“条件达成”的信息。

等待条件达成

C++标准库对条件变量有两套实现: std::condition_variable 和 std::condition_variable_any 。这两个实现都包含在 <condition_variable> 头文件的声明中。两者都需要与一个互斥量一起才能工作(互斥量是为了同步);前者仅限于与 std::mutex 一起工作,而后者可以和任何满足最低标准的互斥量一起工作,从而加上了_any的后缀。因为 std::condition_variable_any
更加通用,这就可能从体积、性能,以及系统资源的使用方面产生额外的开销,所以 std::condition_variable 一般作为首选的类型,当对灵活性有硬性要求时,我们才会去考虑 std::condition_variable_any 。

使用 std::condition_variable 处理数据等待

#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
struct data_chunk {
	int m;
};
struct A {
	std::mutex mut;
	std::queue<data_chunk> data_queue; // 1
	std::condition_variable data_cond;
	bool more_data_to_prepare()
	{
		return data_queue.size() < 10;
	}
	bool is_last_chunk()
	{
		return data_queue.size() == 3;
	}
};
int i = 0;
data_chunk prepare_data()
{
	data_chunk r;
	r.m = ++i;
	return r;
}
void data_preparation_thread(A &a)
{
	std::cout << "preparation  begin"<< std::endl;
	while (a.more_data_to_prepare())
	{
		const data_chunk data = prepare_data();
		std::lock_guard<std::mutex> lk(a.mut);
		a.data_queue.push(data); // 2
		std::cout << "preparation notify" << std::endl;
		a.data_cond.notify_one(); // 3
	}
	std::cout << "preparation  end" << std::endl;
}
void process(const data_chunk &d)
{
	std::cout << d.m << std::endl;
}
void data_processing_thread(A &a)
{
	while (true)
	{
		std::unique_lock<std::mutex> lk(a.mut); // 4
		a.data_cond.wait(
			lk, [&a] {return !a.data_queue.empty();}); // 5
		std::cout << "process  wait end" << std::endl;
		data_chunk data = a.data_queue.front();
		a.data_queue.pop();
		lk.unlock(); // 6
		process(data);
		if (a.is_last_chunk())
			break;
	}
}
int main()
{
	A a;
	std::thread t1{ data_preparation_thread,std::ref(a) },
		t2{ data_processing_thread,std::ref(a) };
	t1.join();
	t2.join();
	system("pause");
	return 0;
}

当等待线程重新获取互斥量并检查条件时,如果它并非直接响应另一个线程的通知,这就是所谓的“伪唤醒”(spurious wakeup)。因为任何伪唤醒的数量和频率都是不确定的,这里不建议使用一个有副作用的函数做条件检查。当你这样做了,就必须做好多次产生副作用的心理准备。

用 std::unique_lock 而不使用 std::lock_guard ——等待中的线程必须在等待期间解锁互斥量,并在这这之后对互斥量再次上锁,而 std::lock_guard 没有这么灵活。

解锁 std::unique_lock 的灵活性,不仅适用于对wait()的调用;它还可以用于有待处理但还未处理的数据。

使用条件变量构建线程安全队列

使用条件变量的线程安全队列(完整版),并且融入上一段程序:

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>//头文件
template<typename T>
class threadsafe_queue
{
private:
	mutable std::mutex mut; // 1 互斥量必须是可变的
	std::queue<T> data_queue;
	std::condition_variable data_cond;
public:
	threadsafe_queue()
	{}
	threadsafe_queue(threadsafe_queue const& other)
	{
		std::lock_guard<std::mutex> lk(other.mut);
		data_queue = other.data_queue;
	}
	void push(T new_value)
	{
		std::lock_guard<std::mutex> lk(mut);
		data_queue.push(new_value);
		data_cond.notify_one();
	}
	void wait_and_pop(T& value)
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty();});
		value = data_queue.front();
		data_queue.pop();
	}
	std::shared_ptr<T> wait_and_pop()
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty();});
		std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
		data_queue.pop();
		return res;
	}
	bool try_pop(T& value)
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return false;
		value = data_queue.front();
		data_queue.pop();
		return true;
	}
	std::shared_ptr<T> try_pop()
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return std::shared_ptr<T>();
		std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
		data_queue.pop();
		return res;
	}
	bool empty() const
	{
		std::lock_guard<std::mutex> lk(mut);
		return data_queue.empty();
	}
};
threadsafe_queue<data_chunk> data_queue; // 1
void data_preparation_thread()
{
	while (more_data_to_prepare())
	{
		data_chunk const data = prepare_data();
		data_queue.push(data); // 2
	}
}
void data_processing_thread()
{
	while (true)
	{
		data_chunk data;
		data_queue.wait_and_pop(data); // 3
		process(data);
		if (is_last_chunk(data))
			break;
	}
}

使用期望等待一次性事件

C++标准库模型将这种一次性事件称为“期望” (future)。

当事件发生时(并且期望状态为就绪),这个“期望”就不能被重置。

在C++标准库中,有两种“期望”,使用两种类型模板实现,声明在头文件中: 唯一期望(uniquefutures)( std::future<> )和共享期望(shared futures)( std::shared_future<> )。后者的实现中,所有实例会在同时变为就绪状态,并且他们可以访问与事件相关的任何数据。这种数据关联与模板有关,比如 std::unique_ptr 和 std::shared_ptr 的模板参数就是相关联的数据类型。在与数据无关的

地方,可以使用 std::future<void> 与 std::shared_future<void> 的特化模板。

带返回值的后台任务

当任务的结果你不着急要时,你可以使用 std::async 启动一个异步任务。与 std::thread 对象等待运行方式的不同, std::async 会返回一个 std::future 对象,这个对象持有最终计算出来的结果。当你需要这个值时,你只需要调用这个对象的get()成员函数;并且直到“期望”状态为就绪的情况下,线程才会阻塞;之后,返回计算结果。

使用 std::future 从异步任务中获取返回值:

#include <future>
#include <iostream>
int find_the_answer_to_ltuae()
{
	std::this_thread::sleep_for(std::chrono::milliseconds(100));
	return 10;
}
void do_other_stuff()
{
	std::this_thread::sleep_for(std::chrono::milliseconds(120));
}
int main()
{
	std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
	do_other_stuff();
	std::cout << "The answer is " << the_answer.get() << std::endl;
	system("pause");
	return 0;
}

std::async 允许你通过添加额外的调用参数,向函数传递额外的参数。当第一个参数是一个指向成员函数的指针,第二个参数提供有这个函数成员类的具体对象(不是直接的,就是通过指针,还可以包装在 std::ref 中),剩余的参数可作为成员函数的参数传入。否则,第二个和随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。

使用 std::async 向函数传递参数

#include <string>
#include <future>
#include <iostream>
struct X
{
	int m;
	void foo(int i, std::string const& s)
	{
		std::cout << s << "\t" << i << std::endl;
	}
	std::string bar(std::string const &s)
	{
		return "bar("+s+")";
	}
};
struct Y
{
	double operator()(double d)
	{
		return d + 1.1;
	}
};
X baz(X& _x)
{
	++_x.m;
	return _x;
}
class move_only
{
public:
	move_only() = default;
	move_only(move_only&&) = default;
	move_only(move_only const&) = delete;
	move_only& operator=(move_only&&) = default;
	move_only& operator=(move_only const&) = delete;
	void operator()()
	{
		std::cout << "move_only()" << std::endl;
	}
};
void fun()
{
	X x;
	auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用p->foo(42, "hello"),p是指向x的指针,指针
	auto f2 = std::async(&X::bar, x, "goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本,具体对象
	std::cout << f2.get() << std::endl;
	Y y;
	auto f3 = std::async(Y(), 3.141); // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
	std::cout << f3.get() << std::endl;
	auto f4 = std::async(std::ref(y), 2.718); // 调用y(2.718)
	std::cout << f4.get() << std::endl;
	x.m = 1;
	auto f5 = std::async(baz, std::ref(x)); // 调用baz(x)
	std::cout << f5.get().m << std::endl;
	auto f6 = std::async(move_only()); // 调用tmp(),tmp是通过std::move(move_only())构造得到
}
int main()
{
	fun();
	system("pause");
	return 0;
}

在默认情况下,这取决于 std::async 是否启动一个线程,或是否在期望等待时同步任务。在大多数情况下(估计这就是你想要的结果),但是你也可以在函数调用之前,向 std::async 传递一个额外参数。这个参数的类型是 std::launch ,还可以是std::launch::defered ,用来表明函数调用被延迟到wait()或get()函数调用时才执行, std::launch::async 表明函数必须在其所在的独立线程上执行, std::launch::deferred
| std::launch::async 表明实现可以选择这两种方式的一种。最后一个选项是默认的。当函数调用被延迟,它可能不会在运行了。

baz函数更改,方便观察效果:

X baz(X& _x,int i)
{
	_x.m=i;
	std::cout << _x.m<<"调用" << std::endl;
	return _x;
}

调用:

	auto f7 = std::async(std::launch::async, Y(), 1.2); // 在新线程上执行
	std::cout <<"f7\t"<< f7.get() << std::endl;
	auto f8 = std::async(std::launch::deferred, baz, std::ref(x),2); // 在wait()或get()调用时执行
	auto f9 = std::async(
		std::launch::deferred | std::launch::async,
		baz, std::ref(x),3); // 实现选择执行方式

	std::cout << "f9\t"<<f9.get().m << std::endl;
	auto f10 = std::async(baz, std::ref(x),4);
	f8.wait(); // 调用延迟函数,后台运行,此时如果有结果就前行,否则阻塞
	std::cout << "f8\t" << f8.get().m << std::endl;
	std::cout <<"f10\t"<< f10.get().m << std::endl;	

这不是让一个 std::future 与一个任务实例相关联的唯一方式;你也可以将任务包装入一个 std::packaged_task<> 实例中,或通过编写代码的方式,使用 std::promise<> 类型模板显示设置值。与 std::promise<> 对比, std::packaged_task<> 具有更高层的抽象,所以我们从“高抽象”的模板说起。

任务与期望

时间: 2024-08-08 05:35:16

Cpp Concurrency In Action(读书笔记3)——同步并发操作的相关文章

《C++ Concurrency in Action》读书笔记三 同步并发操作

本章要点 *等待事件 *使用futures等待一次性事件(waiting for one-off events with futures) *等待时间限制 *使用同步操作来简化代码 这章主要描述了如何使用条件变量和futures来等待事件,以及如何使用他们来使线程同步操作更加简化. CP4 1. 等待事件或者其他条件 a.如果一个线程需要等待另外一个线程处理的结果,可以采取不停的检测共享资源,等另外一个线程完成特定操作以后会修改共享资源.该线程检测到修改以后执行后续的操作,但这种方法需要不停的检

R in action读书笔记(6)-第七章:基本统计分析(中)

7.2 频数表和列联表 > library(vcd) > head(Arthritis) ID Treatment Sex Age Improved 1 57 Treated Male 27 Some 2 46 Treated Male 29 None 3 77 Treated Male 30 None 4 17 Treated Male 32 Marked 5 36 Treated Male 46 Marked 6 23 Treated Male 58 Marked 7.2.1 生成频数表

AngularJS in Action读书笔记5(实战篇)——在directive中引入D3饼状图显示

前言: "宁肯像种子一样等待  也不愿像疲惫的陀螺  旋转得那样勉强" 这是前几天在查资料无意间看到的一位园友的签名,看完后又读了两遍,觉得很有味道.后来一寻根究底才知这是出资大诗人汪国真之口,出处<她>.且抛开上下文,单从这短短几句,正恰如其分的折射出有一群人,他们穿着不那么fashion,言辞不那么犀利,但是内心某一块地方像是躁动的火山,拥有无尽的动力和激情,矢志不渝种子般投身到技术研究和心得分享当中. 或许每一次的生长都是那么悄无声息,但是无数次的坚持只是为了破土那日

R in action读书笔记(22)第十六章 高级图形进阶(下)

16.2.4 图形参数 在lattice图形中,lattice函数默认的图形参数包含在一个很大的列表对象中,你可通过trellis.par.get()函数来获取,并用trellis.par.set()函数来修改.show.settings()函数可展示当前的图形参数设置情况.查看当前的默认设置,并将它们存储到一个mysettings列表中: > show.settings() > mysettings<-trellis.par.get() 查看叠加点的默认设置值: > mysett

R in action读书笔记(19)第十四章 主成分和因子分析

第十四章:主成分和因子分析 本章内容 主成分分析 探索性因子分析 其他潜变量模型 主成分分析(PCA)是一种数据降维技巧,它能将大量相关变量转化为一组很少的不相关变量,这些无关变量称为主成分.探索性因子分析(EFA)是一系列用来发现一组变量的潜在结构的方法.它通过寻找一组更小的.潜在的或隐藏的结构来解释已观测到的.显式的变量间的关系. PCA与EFA模型间的区别 主成分(PC1和PC2)是观测变量(X1到X5)的线性组合.形成线性组合的权重都是通过最大化各主成分所解释的方差来获得,同时还要保证个

R in action读书笔记(17)第十二章 重抽样与自助法

12.4 置换检验点评 除coin和lmPerm包外,R还提供了其他可做置换检验的包.perm包能实现coin包中的部分功能,因此可作为coin包所得结果的验证.corrperm包提供了有重复测量的相关性的置换检验. logregperm包提供了Logistic回归的置换检验.另外一个非常重要的包是glmperm,它涵盖了广义线性模型的置换检验依靠基础的抽样分布理论知识,置换检验提供了另外一个十分强大的可选检验思路.对于上面描述的每一种置换检验,我们完全可以在做统计假设检验时不理会正态分布.t分

R in action读书笔记(16)第十二章 重抽样与自助法之 置换检验

第十二章:重抽样与自助法 本章,我们将探究两种应用广泛的依据随机化思想的统计方法:置换检验和自助法 12.1 置换检验 置换检验,也称随机化检验或重随机化检验. 有两种处理条件的实验,十个受试者已经被随机分配到其中一种条件(A或B)中,相应的结果变量(score)也已经被记录.实验结果如下: 如果两种处理方式真的等价,那么分配给观测得分的标签(A处理或B处理)便是任意的.为检验两种处理方式的差异,我们可遵循如下步骤: (1) 与参数方法类似,计算观测数据的t统计量,称为t0: (2) 将10个得

R in action读书笔记(10)-第八章:回归-- 异常观测值 改进措施

8.4 异常观测值 8.4.1 离群点 car包也提供了一种离群点的统计检验方法.outlierTest()函数可以求得最大标准化残差绝对值Bonferroni调整后的p值: > library(car) > outlierTest(fit) rstudent unadjusted p-value Bonferonni p Nevada 3.542929 0.00095088 0.047544 可以看到Nevada被判定为离群点(p=0.048).注意,该函数只是根据单个最大(或正或负)残差值

R in action读书笔记(6)-第七章:基本统计分析(下)

7.3相关 相关系数可以用来描述定量变量之间的关系.相关系数的符号(±)表明关系的方向(正相关或负相关),其值的大小表示关系的强弱程度(完全不相关时为0,完全相关时为1).除了基础安装以外,我们还将使用psych和ggm包. 7.3.1 相关的类型 1.Pearson.Spearman和Kendall相关 Pearson积差相关系数衡量了两个定量变量之间的线性相关程度.Spearman等级相关系数则衡 量分级定序变量之间的相关程度.Kendall’s Tau相关系数也是一种非参数的等级相关度量.