有一种单一写线程。多个读线程并发的场景,比方測量数据的读取与更新,消费者会比較多。生产者仅仅有一个。下面图为例:
左側是一种经典的解法,对数据整个操作加锁。为了一个写数据线程,于将全部读线程也进行加锁显然有点浪费了。于是提出读写锁(Reader/Writer Lock), 即使是使用了读写锁。其本质也是一样的。并且在POSIX下的pthread它的内部实现是基于mutex,所以它的开销更大。假设没有非常重的读操作来抵消它引入的开销,反而会引起性能的下降。已经多组測试数据来证明这一点。我自己也做了验证。得到数据例如以下 (单个写线程。20个读线程),使用读写锁反而比使用mutex要慢。
具体能够參考两个链接:
* Multi-threaded programming: efficiency of locking
这一类问题,在数据库领域有一类解决方式。被称为Multiversion Concurrency Control, 其目的是以添加数据复本保证用户每一次使用都能够用到完整的数据,但不一定是最新的数据。
再简化一点,其思想就是建立一个数据复本。专门用于写。当数据全然准备好后。切换出来供其他线程读。
原本的数据就转为下一次写使用。
即上图中右側所看到的的方式。
以这个方法,仅仅要对Writing/Reading的处理加锁就能够了。这样測试出来的性能开销由于加锁的处理时间极短,较一般Mutex和Reader/Writer Lock都要好 (最后一个算法):
具体的不展开了。另外有一些更为通用的方式,包含平衡读写的吞吐的问题,称为Spin Buffer。有兴趣能够进一步研究。
附源码例如以下供參考:
#include <pthread.h>
#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <ctime>
// #define USE_MUTEX
// #define USE_RW_LOCK
// X + Y = 0
typedef struct _Data{
int x;
int y;
} Data;
namespace {
Data globalData[2] = {{1,-1}, {1,-1}};
int WriteIndex = 0;
int ReadingIndex = 1;
float globalReadingTimeCost = 0.0f;
#ifdef USE_MUTEX
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_t rwlock;
#endif
const int thread_number = 20;
}
void* write_thread(void* param) {
clock_t begin_time = std::clock();
for(int i=1; i<=1000; i++) {
globalData[WriteIndex].x = i;
globalData[WriteIndex].y = -1 * i;
usleep(1);
#ifdef USE_MUTEX
pthread_mutex_lock(&mutex);
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_rdlock(&rwlock);
#endif
ReadingIndex = WriteIndex;
WriteIndex = (WriteIndex + 1) % 2;
#ifdef USE_MUTEX
pthread_mutex_unlock(&mutex);
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_unlock(&rwlock);
#endif
usleep(600);
}
std::cout<< "[Writing Thread]" << float( std::clock () - begin_time ) / CLOCKS_PER_SEC * 1000 << std::endl;
return NULL;
}
void* read_thread(void* param) {
clock_t begin_time = std::clock();
for(int i=1; i<=20000; i++) {
#ifdef USE_MUTEX
pthread_mutex_lock(&mutex);
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_wrlock(&rwlock);
#endif
int index = ReadingIndex;
#ifdef USE_MUTEX
pthread_mutex_unlock(&mutex);
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_unlock(&rwlock);
#endif
int x = globalData[index].x;
int y = globalData[index].y;
if (x + y != 0) {
std::cout << std::endl << "Wrong data:" << x << "," << y << std::endl;
}
usleep(3);
}
std::cout<< "[Reading Thread]" << float( std::clock () - begin_time ) / CLOCKS_PER_SEC * 1000 << std::endl;
return NULL;
}
int main(void) {
int ret = 0;
pthread_t id[thread_number];
#ifdef USE_RW_LOCK
pthread_rwlock_init(&rwlock, NULL);
#endif
clock_t begin_time = std::clock();
// One writing thread
ret = pthread_create(&id[0], NULL, write_thread, NULL);
if (ret) {
std::cout << "Failed to launch writing thread." << std::endl;
return -1;
}
// Four reading threads
for (int i=1; i<thread_number; i++) {
pthread_create(&id[i], NULL, read_thread, NULL);
}
for (int i=0; i<=thread_number; i++) {
pthread_join(id[i], NULL);
}
std::cout<< "Cost:" << float( std::clock () - begin_time ) / CLOCKS_PER_SEC * 1000 << std::endl;
return 0;
}
使用例如以下方式编译測试:
g++ -std=c++11 -DUSE_MUTEX thread.cc -lpthread -o thread
有空再写篇关于多线程算法选择的文档!
时间: 2024-10-12 11:25:52