最近一个项目中用到了 peterson 算法来做临界区的保护, 简简单单的十几行代码, 就能实现两个线程对临界区的无锁访问, 确实很精炼. 但是在这不是来分析 peterson 算法的, 在实际应用中发现 peterson 算法并不能对临界区进行互斥访问, 也就是说两个线程还是有可能同时进入临界区. 但是按照代码的分析, 明明可以实现互斥访问的呀, 这是怎么回事呢?
首先用一个测试程序来检验一下. 临界区是对一个全局变量的自加一运算, 两个线程各加一百万次, 最后结果应该是两百万. 由于自加一运算不是原子的, 如果两个线程同时进入临界区, 最后的结果就会少于两百万.
- #include <iostream>
- #include <pthread.h>
- using namespace std;
- static volatile bool flag[2] = {false, false};
- static volatile int turn = 0;
- static volatile int gCount = 0;
- void procedure0() {
- flag[0] = true;
- turn = 1;
- while (flag[1] && (turn == 1));
- gCount++;
- flag[0] = false;
- }
- void procedure1() {
- flag[1] = true;
- turn = 0;
- while (flag[0] && (turn == 0));
- gCount++;
- flag[1] = false;
- }
- void* ThreadFunc0(void* args)
- {
- int i;
- for (i = 0; i<1000000; i++)
- procedure0();
- return NULL;
- }
- void* ThreadFunc1(void* args)
- {
- int i;
- for (i = 0; i<1000000; i++)
- procedure1();
- return NULL;
- }
- int main()
- {
- pthread_t pid0, pid1;
- if (pthread_create(&pid0, 0, &ThreadFunc0, NULL))
- {
- cout << "Create thread0 failed." << endl;
- return 1;
- }
- if (pthread_create(&pid1, 0, &ThreadFunc1, NULL))
- {
- cout << "Create thread1 failed." << endl;
- return 1;
- }
- pthread_join(pid0, NULL);
- pthread_join(pid1, NULL);
- cout << gCount << endl;
- if (gCount == 2000000)
- cout << "Success" << endl;
- else
- cout << "Fail" << endl;
- return 0;
- }
peterson 测试代码
x86 平台 peterson 锁失效 arm 平台 peterson 锁失效
这个测试程序在 Linux 上用 gcc 编译, 无论用 O0,O1,O2 编译选项, 我试过 x86 平台, Arm 平台, 结果都有可能小于两百万, 也就是这样实现的 peterson 锁不能阻止两个线程同时进入临界区. 原因在于现代的编译器和多核 CPU 因为优化代码的原因, 最擅长的事情就是指令乱序执行. 编译器做的是静态乱序优化, CPU 做的是动态乱序优化. 简单来说, 就是指令最终在 CPU 的执行顺序和我们在程序中写的顺序可能是大相径庭的. 当然这种乱序执行是要在保证最终执行结果正确的前提下的, 大多数情况下都不会引起问题, 我们对指令的乱序执行也毫无感知. 但是在一些特殊的情况下, 比如 peterson 算法里, 乱序优化可能会引起问题.
通常情况下, 乱序优化都可以把对不同地址的 load 操作提到 store 之前去, 我想这是因为 load 操作如果 cache 命中的话, 要比 store 快很多. 以线程 0 为例, 看这 3 行.
- flag[0] = true;
- turn = 1;
- while (flag[1] && (turn == 1));
前两行是 store, 第三行是 load. 但是对同一变量 turn 的 store 再 load, 乱序优化是不可能对他们交换顺序的. 但是 flag[0] 和 flag[1] 是不同的变量, 先 store 后 load 就可能被乱序优化成先 load flag[1], 再 store flag[0]. 假设两个线程都已退出临界区, 准备再次进入, 此时 flag[0] 和 flag[1] 都是 false. 按乱序执行先 load, 两个线程都会有 while 条件为假, 则同时都可以进入了临界区, 互斥失效! 这就是在有些情况下要保持代码的顺序一致性的重要.
这个问题怎么解决呢? 也很简单, 就是使用内存栅栏 (memory barrier). 顾名思义, 他就像个栅栏一样摆在两段代码之间, 阻止编译器或者 CPU 在这两段代码之间进行乱序优化. 在 x86 平台上, 阻止编译器的静态乱序优化的汇编代码是
asm volatile("":::"memory");
但是它不能阻止 CPU 运行时的乱序优化. 在这里我们需要的不仅仅是阻止静态乱序, 还要阻止动态乱序. x86 的动态内存栅栏汇编命令有三条, 分别是 lfence,sfence 和 mfence, 分别表示 load 栅栏, store 栅栏和读写栅栏. 也就是 lfence 只能保证 lfence 之前的读命令不和它之后的读命令发生乱序. sfence 保证 sfence 之前的写命令不和它之后的写命令发生乱序. mfence 保证了它前后的读写命令不发生乱序. 这里我们需要用 mfence, 不过实际上我是了 sfence 也是可以的, 但是 lfence 不行.
- flag[0] = true;
- turn = 1;
- asm("mfence");
- while (flag[1] && (turn == 1));
在中间插入一行内存栅栏指令, 这样 peterson 测试程序执行才是完全正确的.
在 arm 平台, 相应的内存栅栏指令有三条, dmb(data memroy barrier),dsb(date synchronization barrier) 和 isb(instruction synchronization barrier).dmb 保证在 dmb 之前的内存访问指令在它之后的内存访问指令之前完成, 也就是阻止了乱序. dsb 更严格一些, 保证在 dsb 完成之前, 所有它之前的指令都执行完成. isb 最严格, 它会清空处理器的流水线, 当然就能保证之前的所有指令执行完, 它之后的指令必须从 cache 或内存获取. 在这里我们用 dmb 就足够了, dmb 指令带有参数. 用来表达该 barrier 生效的 Shareability Domain(NSH 表示 Non-shareable,ISH 表示 Inner Shareable,OSH 表示 Outer Shareable,SY 表示 Full system, 缺省是 SY) 和内存操作类型 (LD 表示读操作, ST 表示写操作, 缺省表示读写操作), 比如 DMB ISHST 表示对 Inner Shareability Domain 的读写操作生效. 在 peterson 算法这里是能保证正常工作的, 或者直接用 dmb sy.
- flag[0] = true;
- turn = 1;
- asm("dmb ishst");
- //asm("dmb sy");
- while (flag[1] && (turn == 1));
由于汇编指令和平台相关, 移植不便. 4.4.0 和之后版本的 gcc 方便地提供了__sync_synchronize 函数完成内存栅栏指令.
- flag[0] = true;
- turn = 1;
- __sync_synchronize();
- while (flag[1] && (turn == 1));
来源: https://www.cnblogs.com/mightycode/p/9226887.html