我在之前一篇博文《初探c++11 Thread库之使写多线程程序》中,着重介绍了<thread>头文件中的std::thread类以及其上的一些基本操作,至此我们动手写多线程程序已经基本没有问题了。但是,单线程的那些"坑"我们仍还不知道怎么去避免。
多线程存在的问题
多线程最主要的问题就是共享数据带来的问题。如果共享数据都是只读的,那么没问题,因为只读操作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。但是,当一个或多个线程要修改共享数据时,就会产生很多潜在的麻烦。
#include <iostream> #include <thread> long sum = 0L; void fun() { for(int i=1;i<100000;++i) sum += i; } int main() { std::cout << "Before joining,sun = " << sum << std::endl; std::thread t1(fun); std::thread t2(fun); t1.join(); t2.join(); std::cout << "After joining,sun = " << sum << std::endl; }
程序结构很简单,启动两个线程分别对变量sum加上 1-99999。其内存结构大致上是这样的。
c++多线程程序中,每个线程都有一个线程栈,它们相互独立,因此在线程栈中的数据,是不会被其他线程影响到的。但是在内存的数据段中的数据,是可以在全局被访问到的。我们在上面这段代码中定义的sum变量正是位于数据段中。
在目前来看,我们期望最后程序退出的时候,打印出sum是 9999900000。但是结果却不尽人意,我们试着编译运行:
[thread]g++ condition.cpp -omain -std=c++11 -lpthread [thread]main Before joining,sun = 0 After joining,sun = 5192258282 [thread]main Before joining,sun = 0 After joining,sun = 8418413412 [thread]main Before joining,sun = 0 After joining,sun = 5294478585
显然结果还是比较意外的,运行了三次,都得到了不同的结果,而且没有一次得到我们的期望值,这下我们精准地踩中了多线程的"坑"。试着多运行几遍,看看会不会出现正确的结果。当然手动运行几遍甚至几十遍,还是可以应付得了的。但是要运行几千遍,手动运行下来估计手就得抽筋了。这样的机械般的操作还是交给shell脚本吧,由于我的机器配置不是很牛×,暂且先1000次看看,shell脚本如下,count.sh:
#!/bin/bash #result equal with 9999900000 cnt=0 #result more than 9999900000 cnt_more=0 #result less than 9999900000 cnt_less=0 for((i=0;i<1000;++i)) do var=$(main|tail -1) var=${var#After joining,sun = } if(($var == 9999900000)) then ((cnt++)) fi if(($var > 9999900000)) then ((cnt_more++)) fi if(($var < 9999900000)) then ((cnt_less++)) fi done echo "cnt="$cnt echo "cnt_more="$cnt_more echo "cnt_less="$cnt_less
其中变量cnt来统计1000次运行中总共得到过多少次的正确结果,用cnt_more统计偏大的结果,用cnt_less统计偏小的结果。这是该脚本的运行结果:
[thread]count.sh cnt=315 cnt_more=0 cnt_less=685
1000次运行中还是有315次得到了正确答案,有685次的结果是偏小的,却没有一次的结果是偏大的!那么问题出在哪里了?试着想象一下这样一个场景:你和朋友合租在一间房子里边,房子里面只有一间厨房,你们共用一个锅。有一天你准备做一道西红柿炒蛋,当你把西红柿放入锅中的时候,你的电话响了,你离开厨房去接电话。而这时候你的室友也要做饭,他要做一道红烧鱼,于是他把洗好的鱼放入了锅中煮,然后也离开了厨房(由于某种原因他不知道锅里还有你的食材,在程序中线程也不会知道其他线程对共享的数据做了什么)。当你回来的时候继续往里边放入鸡蛋,最后你得到的是一盘西红柿炒鸡蛋鱼。而你的室友回来厨房的时候他要的红烧鱼就会不见了。
在上面的例子里,你和室友就代表着thread1和thread2线程,sum变量就是那个锅。多线程中共享数据的问题,就是上面场景中你们共用一口锅造成的问题。
原子操作
要解决上面场景的问题,其中有一中可行的方案就是:你们做菜的步骤很短,短到什么程度呢,短到这个步骤不可被分割。例如你做的这道菜只有一个步骤,就是让食材(对应于下面提到的原子数据类型)碰一下锅(当然现实场景中基本没有这样的菜),这样你们的做菜过程就不会被其他室友打断、干扰,即使你们共同在使用一口锅。
而上面的代码中的 sum += i 在CPU指令的层面上是可以被分割的,我用g++的-S选项生成其汇编的指令看到了一段这样的代码:
movl $0, -4(%ebp) // sum = 0 movl $0, -8(%ebp) // i =0 ...... movl -8(%ebp), %eax //将i送入寄存器eax addl %eax, -4(%ebp) //将i的值加上sum的值,将结果保存到 sum中。 movl $0, %eax
汇编指令还是描述的比较清楚的,可以清楚的看到 sum += i;操作被分割成了两条cpu指令,先是将i的值保存在eax寄存器中,然后将eax的值加上sum的值并保存在sum中。
而在c++中原子操作就是这样的一种『小到不可分割的』操作。要使用原子操作我们需要引用c++11的一个新的头文件<atomic>。在这个头文件中定义了一个类模板struct atomic表示原子数据类型,在GNU的实现(/usr/include/c++/4.8.3/atomic)上如下:
template<typename _Tp> struct atomic { private: _Tp _M_i; public: atomic() noexcept = default; ~atomic() noexcept = default; atomic(const atomic&) = delete; //删除了拷贝构造 atomic& operator=(const atomic&) = delete; atomic& operator=(const atomic&) volatile = delete; //删除了 operator= constexpr atomic(_Tp __i) noexcept : _M_i(__i) { } operator _Tp() const noexcept { return load(); } operator _Tp() const volatile noexcept { return load(); } _Tp operator=(_Tp __i) noexcept { store(__i); return __i; } ... };
atomic模板中还实现了操作符的重载(由于篇幅,查看完整的类结构请参阅atomic头文件),因此你可以像使用内置的数据类型那样使用原子数据类型(c++保证这些操作是原子操作)。对应于内置的数据类型,原子数据类型都有一份对应的类型,归纳出来如下:
std::atomic_char | std::atomic<char> |
std::atomic_schar | std::atomic<signed char> |
std::atomic_uchar | std::atomic<unsigned char> |
std::atomic_short | std::atomic<short> |
std::atomic_ushort | std::atomic<unsigned short> |
std::atomic_int | std::atomic<int> |
std::atomic_uint | std::atomic<unsigned int> |
std::atomic_long | std::atomic<long> |
std::atomic_ulong | std::atomic<unsigned long> |
std::atomic_llong | std::atomic<long long> |
std::atomic_ullong | std::atomic<unsigned long long> |
更多的请见:http://en.cppreference.com/w/cpp/atomic/atomic |
我们之前的sum变量是long类型的,对应的原子数据类型是std::atomic_long,下面我们就简单的修改一下开篇的代码:
#include <iostream> #include <thread> #include <atomic> // modified std::atomic_long sum = {0L}; // modified void fun() { for(int i=0;i<100000;++i) sum += i; } int main() { std::cout << "Before joining,sun = " << sum << std::endl; std::thread t1(fun); std::thread t2(fun); t1.join(); t2.join(); std::cout << "After joining,sun = " << sum << std::endl; }
我们只增加了一个<atomic>头文件,并且将 long sum = 0L; 修改成了 std::atomic_long sum {0L}; 注意不要写成『std::atomic_long sum = 0L』的形式,因为long类型是不可以隐式转换为std::atomic_long类型的。
为了证明不是偶然性,我们仍用上面的count.sh这个脚本运行1000次上面的修改过的程序:
[thread]g++ atomic.cpp -o main -std=c++11 -lpthread [thread]count.sh cnt=1000 cnt_more=0 cnt_less=0
可以看到原子操作还是有明显的效果的,这1000次的运行我们都得到了正确的结果。事实证明原子操作的确可以作为解决共享数据引起的问题的一种有效的手段。
"自旋锁"——atomic_flag
和其他的原子数据类型(包括atomic_bool)不同的是,他是锁无关(lock-free)的一种类型,即线程对它的访问是不需要加锁的,因此他也没有其他的原子类型的读写操作(load(),store())、运算符操作等。取而代之的是另外两个原子操作的函数test_and_set()和clear()。atomic_flag类的结构在GNU上是这样的:
#if __GCC_ATOMIC_TEST_AND_SET_TRUEVAL == 1 typedef bool __atomic_flag_data_type; #else typedef unsigned char __atomic_flag_data_type; #endif struct __atomic_flag_base { __atomic_flag_data_type _M_i; }; struct atomic_flag : public __atomic_flag_base { ... bool test_and_set(memory_order __m = memory_order_seq_cst) noexcept; bool test_and_set(memory_order __m = memory_order_seq_cst) volatile noexcept; void clear(memory_order __m = memory_order_seq_cst) noexcept; void clear(memory_order __m = memory_order_seq_cst) volatile noexcept; ... private: static constexpr __atomic_flag_data_type _S_init(bool __i) { return __i ? __GCC_ATOMIC_TEST_AND_SET_TRUEVAL : 0; } };
atomic_flag::test_and_set()和其名字一样,大致上是这样工作的:首先检查这atomic_flag类中的bool成员_M_i是否被设置成true,如果没有就先设置成true,并返回之前的值(flase),如果atomic_flag中的bool成员已经是true,则直接返回true。
相比较而言atomic_flag::clear()更加简单粗暴,它直接将atomic_flag的bool值得标志成员_M_i设置成flase,没有返回值。
既然小标题是『自旋锁——atomic_flag』,那么我们看看这把自旋锁(spin lock)是怎么用的:
#include <iostream> #include <atomic> #include <unistd.h> #include <thread> std::atomic_flag lock = ATOMIC_FLAG_INIT; //初始化 void f(int n) { while(lock.test_and_set()) //获取锁的状态 std::cout << "Waiting ... " << std::endl; std::cout << "Thread " << n << " is starting working." << std::endl; } void g(int n) { sleep(3); std::cout << "Thread " << n << " is going to clear the flag." << std::endl; lock.clear(); // 解锁 } int main() { lock.test_and_set(); std::thread t1(f,1); std::thread t2(g,2); t1.join(); t2.join(); }
进入main函数后我们就先设置好atomic_flag,然后启动了两个线程t1和t2,其中t1中我们一直循环获取atomic_flag的状态,知道t2睡眠3秒后,clear()掉lock的锁定状态。其运行结果:
[thread]g++ atomic_flag.cpp -o main -std=c++11 -lpthread [thread]main Waiting ... Waiting ... Waiting ... Waiting ... Waiting ... // omit lager of "waiting..." thread 2 is going to clear the flag. Thread 1 is starting working.
这样的结果正合我们的期望,实际上我们就是通过自旋锁实现了让t1线程一直在等待t2线程。
更进一步地我们还可以通过简单的封装,来实现一把锁。MyLock.h(为了直观我就都写到一个文件中了):
#ifndef __MYLOCK_H_ #define __MYLOCK_H_ #include <iostream> #include <atomic> #include <thread> class MyLock { private: std::atomic_flag m_flag; public: MyLock(); void lock(); void unlock(); }; MyLock::MyLock() { m_flag.clear(); //if not do this,m_flag will be unspecified } void MyLock::lock() { while(m_flag.test_and_set()) ; } void MyLock::unlock() { m_flag.clear(); } #endif
现在我们就试着使用这把锁,来改写开篇的那个程序:
#include <iostream> #include <thread> #include "MyLock.h" //code above MyLock lk; long sum = 0; void add() { for(int i=0;i<100000;++i) { lk.lock(); sum += i; lk.unlock(); } } int main() { std::thread t1(add); std::thread t2(add); t1.join(); t2.join(); std::cout << "sum = " << sum << std::endl; }
运行后没有问题,正确打印出结果sum=9999900000。
如果你点过开过上边的atomic_flag::test_and_set()的链接,你会发现其实它是有参数的,其原型是这样的:
bool test_and_set(std::memory_order order = std::memory_order_seq_cst) volatile;
bool test_and_set(std::memory_order order = std::memory_order_seq_cst);
关于std::memory_order这个类型的细节将在下一篇再去细抠。
最后谢谢你的阅读,如果你能给我一点建议的话,那就更好了。