一, 多线程下共享变量的问题
在多线程编程中经常需要在不同线程之间共享一些变量, 然而对于共享变量操作却经常造成一些莫名奇妙的错误, 除非老老实实加锁对访问保护, 否则经常出现一些 (看起来) 匪夷所思的情况. 比如下面便是两种比较 "喜闻乐见" 的情况.
(a) i++ 问题
在多线程编程中, 最常拿来举例的问题便是著名的 i++ 问题, 即: 多个线程对同一个共享变量 i 执行 i++ 操作. 这样做之所以会出现问题的原因在于 i++ 这个操作可以分为三个步骤:
step | operation |
---|---|
1 | i->reg(读取 i 的值到寄存器) |
2 | inc-reg(在寄存器中自增 i 的值) |
3 | reg->i (写回内存中的 i) |
上面三个步骤中间是可以间隔的, 并非原子操作, 也就是说多个线程同时执行的时候可能出步骤的交叉执行, 例如下面的情况:
step | thread A | thread B |
---|---|---|
1 | i->reg | |
2 | inc-reg | |
3 | i->reg | |
4 | inc-reg | |
5 | reg->i | |
6 | reg->i |
假设 i 一开始为 0, 则执行完第 4 步后, 在两个线程都认为寄存器中的值为 1, 然后在第 5,6 两步分别写回去. 最终两个线程执行完成后 i 的值为 1. 但是实际上我们在两个线程中执行了 i++, 原本希望 i 的值为 2.i++ 实际上可以代表多线程编程中由于操作不是原子的而引发的交叉执行这一类的问题, 但是在这里我们先只关注对单个变量的操作.
(b)指令重排问题
有时候, 我们会用一个变量作为标志位, 当这个变量等于某个特定值的时候就进行某些操作. 但是这样依然可能会有一些意想不到的坑, 例如两个线程以如下顺序执行:
step | thread A | thread B |
---|---|---|
1 | a = 1 | |
2 | flag= true | |
3 | if flag== true | |
4 | assert(a == 1) |
当 B 判断 flag 为 true 后, 断言 a 为 1, 看起来的确是这样. 那么一定是这样吗? 可能不是, 因为编译器和 CPU 都可能将指令进行重排(编译器不同等级的优化和 CPU 的乱序执行). 实际上的执行顺序可能变成这样:
step | thread A | thread B |
---|---|---|
1 | flag = true | |
2 | if flag== true | |
3 | assert(a == 1) | |
4 | a = 1 |
这种重排有可能会导致一个线程内相互之间不存在依赖关系的指令交换执行顺序, 以获得更高的执行效率. 比如上面: flag 与 a 在 A 线程看起来是没有任何依赖关系, 似乎执行顺序无关紧要. 但问题在于 B 使用了 flag 作为是否读取 a 的依据, A 的指令重排可能会导致 step3
的时候断言失败.
解决方案
一个比较稳妥的办法就是对于共享变量的访问进行加锁, 加锁可以保证对临界区的互斥访问, 例如第一种场景如果加锁后再执行 i++ 然后解锁, 则同一时刻只会有一个线程在执行 i++ 操作. 另外, 加锁的内存语义能保证一个线程在释放锁前的写入操作一定能被之后加锁的线程所见(即有 happens before 语义), 可以避免第二种场景中读取到错误的值.
那么如果觉得加锁操作过重太麻烦而不想加锁呢? C++11 提供了一些原子变量与原子操作来支持.
二, C++11 的原子量
C++11 标准在标准库 atomic 头文件提供了模版 atomic<>来定义原子量:
- template<class T>
- struct atomic;
它提供了一系列的成员函数用于实现对变量的原子操作, 例如读操作 load, 写操作 store, 以及 CAS 操作 compare_exchange_weak/compare_exchange_strong 等. 而对于大部分内建类型, C++11 提供了一些特化:
- std::atomic_bool std::atomic<bool>
- std::atomic_char std::atomic<char>
- std::atomic_schar std::atomic<signed char>
- std::atomic_uchar std::atomic<unsigned char>
- std::atomic_short std::atomic<short>
- std::atomic_ushort std::atomic<unsigned short>
- std::atomic_int std::atomic<int>
- std::atomic_uint std::atomic<unsigned int>
- std::atomic_long std::atomic<long>
- ......
- // 更多类型见: http://en.cppreference.com/w/cpp/atomic/atomic
实际上这些特化就是相当于取了一个别名, 本质上是同样的定义. 而对于整形的特化而言, 会有一些特殊的成员函数, 例如原子加 fetch_add
, 原子减 fetch_sub, 原子与 fetch_and, 原子或 fetch_or 等
. 常见操作符 ++,--,+=,&= 等也有对应的重载版本.
接下来以 int 类型为例, 解决我们的前面提到的 i++ 场景中的问题. 先定义一个 int 类型的原子量:
std::atomic<int> i;
由于 int 型的原子量重载了 ++ 操作符, 所以 i++ 是一个不可分割的原子操作, 我们用多个线程执行 i++ 操作来进行验证, 测试代码如下:
- #include <iostream>
- #include <atomic>
- #include <vector>
- #include <functional>
- #include <thread>
- std::atomic<int> i;
- const int count = 100000;
- const int n = 10;
- void add()
- {
- for (int j = 0; j <count; ++j)
- i++;
- }
- int main()
- {
- i.store(0);
- std::vector<std::thread> workers;
- std::cout <<"start" << n << "workers,"
- << "every woker inc" << count << "times" << std::endl;
- for (int j = 0; j < n; ++j)
- workers.push_back(std::move(std::thread(add)));
- for (auto & w : workers)
- w.join();
- std::cout << "workers end"
- << "finally i is" << i << std::endl;
- if (i == n * count)
- std::cout << "i++ test passed!" << std::endl;
- else
- std::cout << "i++ test failed!" << std::endl;
- return 0;
- }
在测试中, 我们定义了一个原子量 i, 在 main 函数开始的时候初始化为 0, 然后启动 10 个线程, 每个线程执行 i++ 操作十万次, 最终检查 i 的值是否正确. 执行的最后结果如下:
start 10 workers, every woker inc 100000 times
- workers end finally i is 1000000
- i++ test passed!
上面我们可以看到, 10 个线程同时进行大量的自增操作, i 的值依然正常. 假如我们把 i 修改为一个普通的 int 变量, 再次执行程序可以得到结果如下:
start 10 workers, every woker inc 100000 times
- workers end finally i is 445227
- i++ test failed!
显然, 由于自增操作各个步骤的交叉执行, 导致最后我们得到一个错误的结果.
原子量可以解决 i++ 问题, 那么可以解决指令重排的问题吗? 也是可以的, 和原子量选择的内存序有关, 我们把这个问题放到下一节专门研究.
上面已经看到 atomic 是一个模版, 那么也就意味着我们可以把自定义类型变成原子变量. 但是是否任意类型都可以定义为原子类型呢? 当然不是, cppreference 中的描述是必须为 TriviallyCopyable 类型. 这个连接为 TriviallyCopyable 的详细定义:
http://en.cppreference.com/w/cpp/concept/TriviallyCopyable
一个比较简单的判断标准就是这个类型可以用 std::memcpy 按位复制, 例如下面的类:
- class {
- int x;
- int y;
- }
这个类是一个 TriviallyCopyable 类型, 然而如果给它加上一个虚函数:
- class {
- int x;
- int y;
- virtual int add ()
- {
- return x + y;
- }
- }
这个类便不能按位拷贝了, 不满足条件, 不能进行原子化.
如果一个类型能够满足 atomic 模版的要求, 可以原子化, 它就不用进行加锁操作了, 因而速度更快吗? 依然不是, atomic 有一个成员函数 is_lock_free, 这个成员函数可以告诉我们到底这个类型的原子量是使用了原子 CPU 指令实现了无锁化, 还是依然使用的加锁的方式来实现原子操作. 不过不管是否用锁来实现, atomic 的使用方式和表现出的语义都是没有区别的. 具体用哪种方式实现 C++ 标准并没有做约束(除了 std::atomic_flag 特化要求必须为 lock free), 跟平台有关.
例如在我的 Cygwin64,GCC7.3 环境下执行如下代码:
- #include <iostream>
- #include <atomic>
- #define N 8
- struct A {
- char a[N];
- };
- int main()
- {
- std::atomic<A> a;
- std::cout << sizeof(A) << std::endl;
- std::cout << a.is_lock_free() << std::endl;
- return 0;
- }
结果为:
8 1
证明上面定义的类型 A 的原子量是无锁的. 我在这个平台上进行了实验, 修改 N 的大小, 结果如下:
N | sizeof(A) | is_lock_free() |
---|---|---|
1 | 1 | 1 |
2 | 2 | 1 |
3 | 3 | 0 |
4 | 4 | 1 |
5 | 5 | 0 |
6 | 6 | 0 |
7 | 7 | 0 |
8 | 8 | 1 |
> 8 | / | 0 |
将 A 修改为内建类型, 对于内建类型的实验结果如下:
type | sizeof() | is_lock_free() |
---|---|---|
char | 1 | 1 |
short | 2 | 1 |
int | 4 | 1 |
long long | 8 | 1 |
float | 4 | 1 |
double | 8 | 1 |
可以看出在我的平台下常用内建类型都是 lock free 的, 自定义类型则和大小有关.
从上面的统计还可以看出似乎当自定义类型的长度和某种自定义类型相等的时候 is_lock_free()就为 true. 我推测可能我这里的 atomic 实现的无锁是通过编译器内建的原子操作实现的, 只有当数据长度刚好能调用编译器内建原子操作时才能进行无锁化. 查看 GCC 参考手册( https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fatomic-Builtins.html#g_t_005f_005fatomic-Builtins ) 中内建原子操作的原型, 以 CAS 操作为例:
bool __atomic_compare_exchange_n (type *ptr, type *expected, type desired, bool weak, int success_memorder, int failure_memorder)
bool __atomic_compare_exchange (type *ptr, type *expected, type *desired, bool weak, int success_memorder, int failure_memorder)
其参数类型为 type * 的指针, 在同一页可以找到 GCC 关于 type 的描述:
The '__atomic' builtins can be used with any integral scalar or pointer type that is 1, 2, 4, or 8 bytes in length. 16-byte integral types are also allowed if '__int128' (see __int128) is supported by the architecture.
type 类型的长度应该为 1,2,4,8 字节中的一个, 少数支持__int128 的平台可以到 16 字节, 所以只有长度为 1,2,4,8 字节的数据才能实现无锁. 这个只是我的推测, 具体是否如此尚不明白.
三, C++11 的六种内存序
前面我们解决 i++ 问题的时候已经使用过原子量的写操作 load 将原子量赋值, 实际上成员函数还有另一个参数:
void store( T desired, std::memory_order order = std::memory_order_seq_cst )
这个参数代表了该操作使用的内存序, 用于控制变量在不同线程见的顺序可见性问题, 不只 load, 其他成员函数也带有该参数. c++11 提供了六种内存序供选择, 分别为:
- typedef enum memory_order {
- memory_order_relaxed,
- memory_order_consume,
- memory_order_acquire,
- memory_order_release,
- memory_order_acq_rel,
- memory_order_seq_cst
- } memory_order;
之前在场景 2 中, 因为指令的重排导致了意料之外的错误, 通过使用原子变量并选择合适内存序, 可以解决这个问题. 下面先来看看这几种内存序
memory_order_release/memory_order_acquire
内存序选项用来作为原子量成员函数的参数, memory_order_release 用于 store 操作, memory_order_acquire 用于 load 操作, 这里我们把使用了 memory_order_release 的调用称之为 release 操作. 从逻辑上可以这样理解: release 操作可以阻止这个调用之前的读写操作被重排到后面去, 而 acquire 操作则可以保证调用之后的读写操作不会重排到前面来. 听起来有种很绕的感觉, 还是以一个例子来解释: 假设 flag 为一个 atomic 特化的 bool 原子量, a 为一个 int 变量, 并且有如下时序的操作:
step | thread A | thread B |
---|---|---|
1 | a = 1 | |
2 | flag.store(true, memory_order_release) | |
3 | if( true == flag.load(memory_order_acquire)) | |
4 | assert(a == 1) |
实际上这就是把我们上文场景 2 中的 flag 变量换成了原子量, 并用其成员函数进行读写. 在这种情况下的逻辑顺序上, step1 不会跑到 step2 后面去, step4 不会跑到 step3 前面去. 这样一来, 实际上我们就已经保证了当读取到 flag 为 true 的时候 a 一定已经被写入为 1 了, 场景 2 得到了解决. 换一种比较严谨的描述方式可以总结为:
对于同一个原子量, release 操作前的写入, 一定对随后 acquire 操作后的读取可见.
这两种内存序是需要配对使用的, 这也是将他们放在一起介绍的原因. 还有一点需要注意的是: 只有对同一个原子量进行操作才会有上面的保证, 比如 step3 如果是读取了另一个原子量 flag2, 是不能保证读取到 a 的值为 1 的.
memory_order_release/memory_order_consume
memory_order_release 还可以和 memory_order_consume 搭配使用. memory_order_release 操作的作用没有变化, 而 memory_order_consume 用于 load 操作, 我们简称为 consume 操作, comsume 操作防止在其后对原子变量有依赖的操作被重排到前面去. 这种情况下:
对于同一个原子变量, release 操作所依赖的写入, 一定对随后 consume 操作后依赖于该原子变量的操作可见.
这个组合比上一种更宽松, comsume 只阻止对这个原子量有依赖的操作重拍到前面去, 而非像 aquire 一样全部阻止. 将上面的例子稍加改造来展示这种内存序, 假设 flag 为一个 atomic 特化的 bool 原子量, a 为一个 int 变量, b,c 各为一个 bool 变量, 并且有如下时序的操作:
step | thread A | thread B |
---|---|---|
1 | b = true | |
2 | a = 1 | |
3 | flag.store(b, memory_order_release) | |
4 | while (!(c = flag.load(memory_order_consume))) | |
5 | assert(a == 1) | |
6 | assert(c == true) | |
7 | assert(b == true) |
step4 使得 c 依赖于 flag, 当 step4 线程 B 读取到 flag 的值为 true 的时候, 由于 flag 依赖于 b,b 在之前的写入是可见的, 此时 b 一定为 true, 所以 step6,step7 的断言一定会成功. 而且这种依赖关系具有传递性, 假如 b 又依赖与另一个变量 d, 则 d 在之前的写入同样对 step4 之后的操作可见. 那么 a 呢? 很遗憾在这种内存序下 a 并不能得到保证, step5 的断言可能会失败.
memory_order_acq_rel
这个选项看名字就很像 release 和 acquire 的结合体, 实际上它的确兼具两者的特性. 这个操作用于 "读取 - 修改 - 写回" 这一类既有读取又有修改的操作, 例如 CAS 操作. 可以将这个操作在内存序中的作用想象为将 release 操作和 acquire 操作捆在一起, 因此任何读写操作的重拍都不能跨越这个调用. 依然以一个例子来说明, flag 为一个 atomic 特化的 bool 原子量, a,c 各为一个 int 变量, b 为一个 bool 变量, 并且刚好按如下顺序执行:
step | thread A | thread B |
---|---|---|
1 | a = 1 | |
2 | flag.store(true, memory_order_release) | |
3 | b = true | |
4 | c = 2 | |
5 | while (!flag.compare_exchange_weak(b, false, memory_order_acq_rel)) {b = true} | |
6 | assert(a == 1) | |
7 | if (true == flag.load(memory_order_acquire) | |
8 | assert(c == 2) |
由于 memory_order_acq_rel 同时具有 memory_order_release 与 memory_order_acquire 的作用, 因此 step2 可以和 step5 组合成上面提到的 release/acquire 组合, 因此 step6 的断言一定会成功, 而 step5 又可以和 step7 组成 release/acquire 组合, step8 的断言同样一定会成功.
memory_order_seq_cst
这个内存序是各个成员函数的内存序默认选项, 如果不选择内存序则默认使用 memory_order_seq_cst. 这是一个 "美好" 的选项, 如果对原子变量的操作都是使用的 memory_order_seq_cst 内存序, 则多线程行为相当于是这些操作都以一种特定顺序被一个线程执行, 在哪个线程观察到的对这些原子量的操作都一样. 同时, 任何使用该选项的写操作都相当于 release 操作, 任何读操作都相当于 acquire 操作, 任何 "读取 - 修改 - 写回" 这一类的操作都相当于使用 memory_order_acq_rel 的操作.
memory_order_relaxed
这个选项如同其名字, 比较松散, 它仅仅只保证其成员函数操作本身是原子不可分割的, 但是对于顺序性不做任何保证.
代价
总的来讲, 越严格的内存序其性能开销会越大. 对于我们常用的 x86 处理器而言, 在处理器层级本身就支持 release/acquire 语义, 因此 release 与 acquire/consume 都只影响编译器的优化, 而 memory_order_seq_cst 还会影响处理器的指令重排.
感想
查资料学习原子量与内存序的过程中, 深感多线程和并发的深奥, 尽管出于好奇心会尝试了解各种内存序, 但是在实践中写代码还是尽量选择比较稳妥的方式来实现吧, 能加锁加锁, 实在不行用默认的 memory_order_seq_cst 选项的原子量. 毕竟就普通程序员而言, 其实很难遇到要在这上面挤性能的场景, 如果真觉得需要, 多半是我们的设计不科学 = = ! 假如确确实实遇到这样的场景, 做之前一定要谨慎的多做些研究, 选择简单的使用方式, 做到心中有数.
来源: https://www.cnblogs.com/FateTHarlaown/p/8919235.html