C++ 安全并发访问容器元素

C++ 安全并发访问容器元素

2014-9-24 flyfish

标准库STL的vector, deque, list等等不是线程安全的

例如

线程1正在使用迭代器(iterator)读vector

线程2正在对该vector进行插入操作,使vector重新分配内存,这样就造成线程1中的迭代器失效

STL的容器

多个线程读是安全的,在读的过程中,不能对容器有任何写入操作

多个线程可以同时对不同的容器做写入操作。

不能指望任何STL实现来解决线程难题,必须手动做同步控制.

方案1 对vector进行加锁处理

effective STL给出的Lock框架

template<typename Container> //一个为容器获取和释放互斥体的模板
class Lock
{ //框架;其中的很多细节被省略了
public:
	Lock(const Container& container) :c(container)
	{
		getMutexFor(c);
		//在构造函数中获取互斥体
	}
	~Lock()
	{
		releaseMutexFor(c);
		//在析构函数中释放它
	}
private: const Container& c;
};  

如果需要实现工业强度,需要做更多的工作。

方案2

微软的Parallel Patterns Library (PPL)

看MSDN

PPL 提供的功能

1 Task Parallelism: a mechanism to execute several work items (tasks) in parallel

任务并行:一种并行执行若干工作项(任务)的机制

2 Parallel algorithms: generic algorithms that act on collections of data in parallel

并行算法:并行作用于数据集合的泛型算法

3 Parallel containers and objects: generic container types that provide safe concurrent access to their

elements

并行容器和对象:提供对其元素的安全并发访问的泛型容器类型

示例是对斐波那契数列(Fibonacci)的顺序计算和并行计算的比较

顺序计算是

使用 STL std::for_each 算法

结果存储在 std::vector 对象中。

并行计算是

使用 PPL Concurrency::parallel_for_each 算法

结果存储在 Concurrency::concurrent_vector 对象中。

// parallel-fibonacci.cpp
// compile with: /EHsc
#include <windows.h>
#include <ppl.h>
#include <concurrent_vector.h>
#include <array>
#include <vector>
#include <tuple>
#include <algorithm>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Computes the nth Fibonacci number.
int fibonacci(int n)
{
   if(n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   __int64 elapsed;

   // An array of Fibonacci numbers to compute.
   array<int, 4> a = { 24, 26, 41, 42 };

   // The results of the serial computation.
   vector<tuple<int,int>> results1;

   // The results of the parallel computation.
   concurrent_vector<tuple<int,int>> results2;

   // Use the for_each algorithm to compute the results serially.
   elapsed = time_call([&]
   {
      for_each (a.begin(), a.end(), [&](int n) {
         results1.push_back(make_tuple(n, fibonacci(n)));
      });
   });
   wcout << L"serial time: " << elapsed << L" ms" << endl;

   // Use the parallel_for_each algorithm to perform the same task.
   elapsed = time_call([&]
   {
      parallel_for_each (a.begin(), a.end(), [&](int n) {
         results2.push_back(make_tuple(n, fibonacci(n)));
      });

      // Because parallel_for_each acts concurrently, the results do not
      // have a pre-determined order. Sort the concurrent_vector object
      // so that the results match the serial version.
      sort(results2.begin(), results2.end());
   });
   wcout << L"parallel time: " << elapsed << L" ms" << endl << endl;

   // Print the results.
   for_each (results2.begin(), results2.end(), [](tuple<int,int>& pair) {
      wcout << L"fib(" << get<0>(pair) << L"): " << get<1>(pair) << endl;
   });
}

命名空间Concurrency首字母大写,一般命名空间全是小写。

贴一个简单的示例代码

使用parallel_for_each 算法计算std::array 对象中每个元素的平方

参数分别是lambda 函数、函数对象和函数指针。

#include "stdafx.h"
#include <ppl.h>
#include <array>
#include <iostream>
using namespace Concurrency;
using namespace std;
using namespace std::tr1;

// Function object (functor) class that computes the square of its input.
template<class Ty>
class SquareFunctor
{
public:
	void operator()(Ty& n) const
	{
		n *= n;
	}
};

// Function that computes the square of its input.
template<class Ty>
void square_function(Ty& n)
{
	n *= n;
}
int _tmain(int argc, _TCHAR* argv[])
{
	// Create an array object that contains 5 values.
	array<int, 5> values = { 1, 2, 3, 4, 5 };

	// Use a lambda function, a function object, and a function pointer to
	// compute the square of each element of the array in parallel.

	// Use a lambda function to square each element.
	parallel_for_each(values.begin(), values.end(), [](int& n){n *= n;});

	// Use a function object (functor) to square each element.
	parallel_for_each(values.begin(), values.end(), SquareFunctor<int>());

	// Use a function pointer to square each element.
	parallel_for_each(values.begin(), values.end(), &square_function<int>);

	// Print each element of the array to the console.
	for_each(values.begin(), values.end(), [](int& n) {
		wcout << n << endl;
	});
	return 0;
}

在微软的concurrent_vector.h文件中有这样一句

Microsoft would like to acknowledge that this concurrency data structure implementation

is based on Intel implementation in its Threading Building Blocks ("Intel Material").

也就是微软的concurrent_vector是在Intel 的Threading Building Blocks基础上实现的。

方案3 Intel TBB(Threading Building Blocks)

Intel TBB 提供的功能

1 直接使用的线程安全容器,比如 concurrent_vector 和 concurrent_queue。

2 通用的并行算法,如 parallel_for 和 parallel_reduce。

3 模板类 atomic 中提供了无锁(Lock-free或者mutex-free)并发编程支持。

方案4

无锁数据结构支持库Concurrent Data Structures (libcds).

地址

http://sourceforge.net/projects/libcds/

下载以后里面直接有从VC2008到VC2013的编译环境,依赖于boost库

方案5 Boost

使用boost.lockfree

boost.lockfree实现了三种无锁数据结构:

1 boost::lockfree::queue

2 boost::lockfree::stack

3 boost::lockfree::spsc_queue

生产者-消费者

下面的代码实现的是

实现了一个多写生成,多消费 队列。

产生整数,并被4个线程消费

#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <iostream>

#include <boost/atomic.hpp>

boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);

boost::lockfree::queue<int> queue(128);

const int iterations = 10000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;

void producer(void)
{
    for (int i = 0; i != iterations; ++i) {
        int value = ++producer_count;
        while (!queue.push(value))
            ;
    }
}

boost::atomic<bool> done (false);
void consumer(void)
{
    int value;
    while (!done) {
        while (queue.pop(value))
            ++consumer_count;
    }

    while (queue.pop(value))
        ++consumer_count;
}

int main(int argc, char* argv[])
{
    using namespace std;
    cout << "boost::lockfree::queue is ";
    if (!queue.is_lock_free())
        cout << "not ";
    cout << "lockfree" << endl;

    boost::thread_group producer_threads, consumer_threads;

    for (int i = 0; i != producer_thread_count; ++i)
        producer_threads.create_thread(producer);

    for (int i = 0; i != consumer_thread_count; ++i)
        consumer_threads.create_thread(consumer);

    producer_threads.join_all();
    done = true;

    consumer_threads.join_all();

    cout << "produced " << producer_count << " objects." << endl;
    cout << "consumed " << consumer_count << " objects." << endl;
}

时间: 2024-10-12 06:14:35

C++ 安全并发访问容器元素的相关文章

deque双端队列容器(对象创建,数组、迭代器方式访问,元素的赋值、插入、删除等)

deque与vector非常相似,不仅可以在尾部插入和删除元素,还可以在头部插入和删除.不过当考虑到容器元素的内存分配策略和操作性能时,deque相对vector较为有优势. 头文件 #include<deque> 创建deque对象 1)deque();//创建一个没有任何元素的deque对象. deque<int> d 2)deque(size_typen);//创建一个具有n个元素的deque对象,每个元素采用它的类型下的默认值. deque<int> d(10)

架构师养成--7.同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

并发编程(9):同步类容器与并发类容器

1.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作. 复合操作,如: 迭代(反复访问元素,遍历完容器中所有的元素) 跳转(根据指定的顺序找到当前元素的下一个元素) 条件运算 这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的就是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector.HashTble.这

线程学习--(七)单例和多线程、同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

17.并发类容器设计

并发类容器设计 1.ConcurrentHashMap:代替散列普通的hashTable,添加了复合操作支持. private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); for (Map.Entry<String, Object> m : resultMap.entrySet()) { count += (Long) m.getValue(

Struts2学习之路(五)—— 访问web元素

以下是Action类中访问web元素的示例,共有四种方法.其中第一种和第三种依赖容器,第二种和第四种利用IOC思想.前面两种取得Map类型的request.session.application:后面两种是真实类型HttpServletRequest.HttpServletSession.ServletContext的引用. 1.第一种方法: 1 package com.user.action; 2 3 import java.util.Map; 4 5 import com.opensymph

日志系统数据采集客户端的实现--并发编程容器选型

一个集中的日志系统,第三方应用每次写日志,都需要发送一个远程的rpc或者http请求,造成写日志的延时比较大. 改进的做法是:提供一个写日志调用包,第三方应用写日志时,先把日志缓存到一个线程安全的容器里,然后后台线程实时消费容器内的日志,如果有持久化的需求,就可以实时的把日志flush到文件中,然后再用另外一个线程消费文件真正把日志发送到日志服务器. 所以我们需要一个线程安全的容器,以下是常见的并发编程容器. CopyOnWriteArrayList是线程安全的, 且处理读操作不需要进行同步和加

大数据量高并发访问的数据库优化方法

一.数据库结构的设计 如果不能设计一个合理的数据库模型,不仅会增加客户端和服务器段程序的编程和维护的难度,而且将会影响系统实际运行的性能.所以,在一个系统开始实施之前,完备的数据库模型的设计是必须的. 在一个系统分析.设计阶段,因为数据量较小,负荷较低.我们往往只注意到功能的实现,而很难注意到性能的薄弱之处,等到系统投入实际运行一段时间后,才发现系统的性能在降低,这时再来考虑提高系统性能则要花费更多的人力物力,而整个系统也不可避免的形成了一个打补丁工程. 所以在考虑整个系统的流程的时候,我们必须