目录
锁
锁的开销
锁的优化
CAS
ABA 问题
ABA 解决方法
CAS 原理应用
参考
锁
互斥锁是用来保护一个临界区, 即保护一个访问共用资源的程序片段, 而这些共用资源又无法同时被多个线程访问的特性. 当有线程进入临界区段时, 其他线程或是进程必须等待.
在谈及锁的性能开销, 一般都会说锁的开销很大, 那锁的开销有多大, 主要耗在哪, 怎么提高锁的性能.
锁的开销
现在锁的机制一般使用 futex(fast Userspace mutexes), 内核态和用户态的混合机制. 还没有 futex 的时候, 内核是如何维护同步与互斥的呢? 系统内核维护一个对象, 这个对象对所有进程可见, 这个对象是用来管理互斥锁并且通知阻塞的进程. 如果进程 A 要进入临界区, 先去内核查看这个对象, 有没有别的进程在占用这个临界区, 出临界区的时候, 也去内核查看这个对象, 有没有别的进程在等待进入临界区, 然后根据一定的策略唤醒等待的进程. 这些不必要的系统调用 (或者说内核陷入) 造成了大量的性能开销. 为了解决这个问题, Futex 就应运而生.
Futex 是一种用户态和内核态混合的同步机制. 首先, 同步的进程间通过 mmap 共享一段内存, futex 变量就位于这段共享的内存中且操作是原子的, 当进程尝试进入互斥区或者退出互斥区的时候, 先去查看共享内存中的 futex 变量, 如果没有竞争发生, 则只修改 futex, 而不用再执行系统调用了. 当通过访问 futex 变量告诉进程有竞争发生, 则还是得执行系统调用去完成相应的处理 (wait 或者 wake up). 简单的说, futex 就是通过在用户态的检查,(motivation) 如果了解到没有竞争就不用陷入内核了, 大大提高了 low-contention 时候的效率.
mutex 是在 futex 的基础上用的内存共享变量来实现的, 如果共享变量建立在进程内, 它就是一个线程锁, 如果它建立在进程间共享内存上, 那么它是一个进程锁. pthread_mutex_t 中的 _lock 字段用于标记占用情况, 先使用 CAS 判断_lock 是否占用, 若未占用, 直接返回. 否则, 通过__lll_lock_wait_private 调用 SYS_futex 系统调用迫使线程进入沉睡. CAS 是用户态的 CPU 指令, 若无竞争, 简单修改锁状态即返回, 非常高效, 只有发现竞争, 才通过系统调用陷入内核态. 所以, FUTEX 是一种用户态和内核态混合的同步机制, 它保证了低竞争情况下的锁获取效率.
所以如果锁不存在冲突, 每次获得锁和释放锁的处理器开销仅仅是 CAS 指令的开销.
确定一件事情最好的方法是实际测试和观测它, 让我们写一段代码来测试无冲突时锁的开销:
- #include <pthread.h>
- #include <stdlib.h>
- #include <stdio.h>
- #include <time.h>
- static inline long long unsigned time_ns(struct timespec* const ts) {
- if (clock_gettime(CLOCK_REALTIME, ts)) {
- exit(1);
- }
- return ((long long unsigned) ts->tv_sec) * 1000000000LLU
- + (long long unsigned) ts->tv_nsec;
- }
- int main()
- {
- int res = -1;
- pthread_mutex_t mutex;
- // 初始化互斥量, 使用默认的互斥量属性
- res = pthread_mutex_init(&mutex, NULL);
- if(res != 0)
- {
- perror("pthread_mutex_init failed\n");
- exit(EXIT_FAILURE);
- }
- long MAX = 1000000000;
- long c = 0;
- struct timespec ts;
- const long long unsigned start_ns = time_ns(&ts);
- while(c <MAX)
- {
- pthread_mutex_lock(&mutex);
- c = c + 1;
- pthread_mutex_unlock(&mutex);
- }
- const long long unsigned delta = time_ns(&ts) - start_ns;
- printf("%f\n", delta/(double)MAX);
- return 0;
- }
说明: 以下性能测试在腾讯云 Intel(R) Xeon(R) CPU E5-26xx v4 1 核 2399.996MHz 下进行.
运行了 10 亿次, 平摊到每次加锁 / 解锁操作大概是 2.2ns 每次加锁 / 解锁(扣除了循环耗时 2.7ns)
在锁冲突的情况下, 开销就没有这么小了.
首先 pthread_mutex_lock 会真正的调用 sys_futex 来进入内核来试图加锁, 被锁住以后线程会进入睡眠, 这带来了上下文切换和线程调度的开销.
可以写两个互相解锁的线程来测试这个过程的开销:
- // Copyright (C) 2010 Benoit Sigoure
- //
- // This program is free software: you can redistribute it and/or modify
- // it under the terms of the GNU General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // This program is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU General Public License for more details.
- //
- // You should have received a copy of the GNU General Public License
- // along with this program. If not, see <http://www.gnu.org/licenses/>.
- #include <pthread.h>
- #include <sched.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <sys/ipc.h>
- #include <sys/shm.h>
- #include <sys/syscall.h>
- #include <sys/wait.h>
- #include <time.h>
- #include <unistd.h>
- #include <Linux/futex.h>
- static inline long long unsigned time_ns(struct timespec* const ts) {
- if (clock_gettime(CLOCK_REALTIME, ts)) {
- exit(1);
- }
- return ((long long unsigned) ts->tv_sec) * 1000000000LLU
- + (long long unsigned) ts->tv_nsec;
- }
- static const int iterations = 500000;
- static void* thread(void* restrict ftx) {
- int* futex = (int*) ftx;
- for (int i = 0; i <iterations; i++) {
- sched_yield();
- while (syscall(SYS_futex, futex, FUTEX_WAIT, 0xA, NULL, NULL, 42)) {
- // retry
- sched_yield();
- }
- *futex = 0xB;
- while (!syscall(SYS_futex, futex, FUTEX_WAKE, 1, NULL, NULL, 42)) {
- // retry
- sched_yield();
- }
- }
- return NULL;
- }
- int main(void) {
- struct timespec ts;
- const int shm_id = shmget(IPC_PRIVATE, sizeof (int), IPC_CREAT | 0666);
- int* futex = shmat(shm_id, NULL, 0);
- pthread_t thd;
- if (pthread_create(&thd, NULL, thread, futex)) {
- return 1;
- }
- *futex = 0xA;
- const long long unsigned start_ns = time_ns(&ts);
- for (int i = 0; i < iterations; i++) {
- *futex = 0xA;
- while (!syscall(SYS_futex, futex, FUTEX_WAKE, 1, NULL, NULL, 42)) {
- // retry
- sched_yield();
- }
- sched_yield();
- while (syscall(SYS_futex, futex, FUTEX_WAIT, 0xB, NULL, NULL, 42)) {
- // retry
- sched_yield();
- }
- }
- const long long unsigned delta = time_ns(&ts) - start_ns;
- const int nswitches = iterations << 2;
- printf("%i thread context switches in %lluns (%.1fns/ctxsw)\n",
- nswitches, delta, (delta / (float) nswitches));
- wait(futex);
- return 0;
- }
编译使用 gcc -std=gnu99 -pthread context_switch.c.
运行的结果是 2003.4ns/ctxsw, 所以锁冲突的开销大概是不冲突开销的 910 倍了, 相差出乎意料的大.
另外一个 c 程序可以用来测试 "纯上下文切换" 的开销, 线程只是使用 sched_yield 来放弃处理器, 并不进入睡眠.
- // Copyright (C) 2010 Benoit Sigoure
- //
- // This program is free software: you can redistribute it and/or modify
- // it under the terms of the GNU General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // This program is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU General Public License for more details.
- //
- // You should have received a copy of the GNU General Public License
- // along with this program. If not, see <http://www.gnu.org/licenses/>.
- #include <sched.h>
- #include <pthread.h>
- #include <unistd.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <time.h>
- #include <string.h>
- #include <errno.h>
- static inline long long unsigned time_ns(struct timespec* const ts) {
- if (clock_gettime(CLOCK_REALTIME, ts)) {
- exit(1);
- }
- return ((long long unsigned) ts->tv_sec) * 1000000000LLU
- + (long long unsigned) ts->tv_nsec;
- }
- static const int iterations = 500000;
- static void* thread(void*ctx) {
- (void)ctx;
- for (int i = 0; i <iterations; i++)
- sched_yield();
- return NULL;
- }
- int main(void) {
- struct sched_param param;
- param.sched_priority = 1;
- if (sched_setscheduler(getpid(), SCHED_FIFO, ¶m))
- fprintf(stderr, "sched_setscheduler(): %s\n", strerror(errno));
- struct timespec ts;
- pthread_t thd;
- if (pthread_create(&thd, NULL, thread, NULL)) {
- return 1;
- }
- long long unsigned start_ns = time_ns(&ts);
- for (int i = 0; i < iterations; i++)
- sched_yield();
- long long unsigned delta = time_ns(&ts) - start_ns;
- const int nswitches = iterations << 2;
- printf("%i thread context switches in %lluns (%.1fns/ctxsw)\n",
- nswitches, delta, (delta / (float) nswitches));
- return 0;
- }
"纯上下文切换" 消耗了大概 381.2ns/ctxsw.
这样我们大致可以把锁冲突的开销分成三部分,"纯上下文切换" 开销, 大概是 381.2ns, 调度器开销 (把线程从睡眠变成就绪或者反过来) 大概是 1622.2ns, 在多核系统上, 还存在跨处理器调度的开销, 那部分开销很大. 在真实的应用场景里, 还要考虑上下文切换带来的 cache 不命中和 TLB 不命中的开销, 开销只会进一步加大.
锁的优化
从上面可以知道, 真正消耗时间的不是上锁的次数, 而是锁冲突的次数. 减少锁冲突的次数才是提升性能的关键. 使用更细粒度的锁, 可以减少锁冲突. 这里说的粒度包括时间和空间, 比如哈希表包含一系列哈希桶, 为每个桶设置一把锁, 空间粒度就会小很多 -- 哈希值相互不冲突的访问不会导致锁冲突, 这比为整个哈希表维护一把锁的冲突机率低很多. 减少时间粒度也很容易理解, 加锁的范围只包含必要的代码段, 尽量缩短获得锁到释放锁之间的时间, 最重要的是, 绝对不要在锁中进行任何可能会阻塞的操作. 使用读写锁也是一个很好的减少冲突的方式, 读操作之间不互斥, 大大减少了冲突.
假设单向链表中的插入 / 删除操作很少, 主要操作是搜索, 那么基于单一锁的方法性能会很差. 在这种情况下, 应该考虑使用读写锁, 即 pthread_rwlock_t, 这么做就允许多个线程同时搜索链表. 插入和删除操作仍然会锁住整个链表. 假设执行的插入和搜索操作数量差不多相同, 但是删除操作很少, 那么在插入期间锁住整个链表是不合适的, 在这种情况下, 最好允许在链表中的分离点 (disjoint point) 上执行并发插入, 同样使用基于读写锁的方式. 在两个级别上执行锁定, 链表有一个读写锁, 各个节点包含一个互斥锁, 在插入期间, 写线程在链表上建立读锁, 然后继续处理. 在插入数据之前, 锁住要在其后添加新数据的节点, 插入之后释放此节点, 然后释放读写锁. 删除操作在链表上建立写锁. 不需要获得与节点相关的锁; 互斥锁只建立在某一个操作节点之上, 大大减少锁冲突的次数.
锁本身的行为也存在进一步优化的可能性, sys_futex 系统调用的作用在于让被锁住的当前线程睡眠, 让出处理器供其它线程使用, 既然这个过程的消耗很高, 也就是说如果被锁定的时间不超过这个数值的话, 根本没有必要进内核加锁 -- 释放的处理器时间还不够消耗的. sys_futex 的时间消耗够跑很多次 CAS 的, 也就是说, 对于一个锁冲突比较频繁而且平均锁定时间比较短的系统, 一个值得考虑的优化方式是先循环调用 CAS 来尝试获得锁(这个操作也被称作自旋锁), 在若干次失败后再进入内核真正加锁. 当然这个优化只能在多处理器的系统里起作用(得有另一个处理器来解锁, 否则自旋锁无意义). 在 glibc 的 pthread 实现里, 通过对 pthread_mutex 设置 PTHREAD_MUTEX_ADAPTIVE_NP 属性就可以使用这个机制.
CAS
锁产生的一些问题:
等待互斥锁会消耗宝贵的时间, 锁的开销很大.
低优先级的线程可以获得互斥锁, 因此阻碍需要同一互斥锁的高优先级线程. 这个问题称为优先级倒置(priority inversion )
可能因为分配的时间片结束, 持有互斥锁的线程被取消调度. 这对于等待同一互斥锁的其他线程有不利影响, 因为等待时间现在会更长. 这个问题称为锁护送(lock convoying)
无锁编程的好处之一是一个线程被挂起, 不会影响到另一个线程的执行, 避免锁护送; 在锁冲突频繁且平均锁定时间较短的系统, 避免上下文切换和调度开销.
CAS (comapre and swap 或者 check and set), 比较并替换, 引用 wiki, 它是一种用于线程数据同步的原子指令.
CAS 核心算法涉及到三个参数, 即内存值, 更新值和期望值; CAS 指令会先检查一个内存位置是否包含预期的值; 如果是这样, 就把新的值复制到这个位置, 返回 true; 如果不是则返回 false.
CAS 对应一条汇编指令 CMPXCHG, 因此是原子性的.
- bool compare_and_swap (int *accum, int *dest, int newval)
- {
- if ( *accum == *dest ) {
- *dest = newval;
- return true;
- }
- return false;
- }
一般, 程序会在循环里使用 CAS 不断去完成一个事务性的操作, 一般包含拷贝一个共享的变量到一个局部变量, 然后再使用这个局部变量执行任务计算得到新的值, 最后再使用 CAS 比较保存再局部变量的旧值和内存值来尝试提交你的修改, 如果尝试失败, 会重新读取一遍内存值, 再重新计算, 最后再使用 CAS 尝试提交修改, 如此循环. 比如:
- void LockFreeQueue::push(Node* newHead)
- {
- for (;;)
- {
- // 拷贝共享变量(m_Head) 到一个局部变量
- Node* oldHead = m_Head;
- // 执行任务, 可以不用关注其他线程
- newHead->next = oldHead;
- // 下一步尝试提交更改到共享变量
- // 如果共享变量没有被其他线程修改过, 仍为 oldHead, 则 CAS 将 newHead 赋值给共享变量 m_Head 并返回
- // 否则继续循环重试
- if (_InterlockedCompareExchange(&m_Head, newHead, oldHead))
- return;
- }
- }
上面的数据结构设置了一个共享的头节点 m_Head, 当 push 一个新的节点时, 会把新节点加在头节点后面; 不要相信程序的执行是连续的, CPU 的执行是多线程并发. 在 _InterlockedCompareExchange 即 CAS 之前, 线程可能因为时间片用完被调度出去, 新调度进来的线程执行完了 push 操作, 多个线程共享了 m_Head 变量, 此时 m_Head 已被修改了, 如果原来线程继续执行, 把 oldHead 覆盖到 m_Head, 就会丢失其他线程 push 进来的节点. 所以需要比较 m_Head 是不是还等于 oldHead, 如果是, 说明头节点不变, 可以使用 newHead 覆盖 m_Head; 如果不是, 说明有其他线程 push 了新的节点, 那么需要使用最新的 m_Head 更新 oldHead 的值重新走一下循环,_InterlockedCompareExchange 会自动把 m_Head 赋值给 oldHead.
ABA 问题
因为 CAS 需要在提交修改时检查期望值和内存值有没有发生变化, 如果没有则进行更新, 但是如果原来一个值从 A 变成 B 又变成 A, 那么使用 CAS 检查的时候发现值没有发生变化, 但实际上已经发生了一系列变化.
内存的回收利用会导致 CAS 出现严重的问题:
- T* ptr1 = new T(8, 18);
- T* old = ptr1;
- delete ptr1;
- T* ptr2 = new T(0, 1);
- // 我们不能保证操作系统不会重新使用 ptr1 内存地址, 一般的内存管理器都会这样子做
- if (old1 == ptr2) {
- // 这里表示, 刚刚回收的 ptr1 指向的内存被用于后面申请的 ptr2 了
- }
ABA 问题是无锁结构实现中常见的一种问题, 可基本表述为:
进程 P1 读取了一个数值 A
P1 被挂起(时间片耗尽, 中断等), 进程 P2 开始执行
P2 修改数值 A 为数值 B, 然后又修改回 A
P1 被唤醒, 比较后发现数值 A 没有变化, 程序继续执行.
对于 P1 来说, 数值 A 未发生过改变, 但实际上 A 已经被变化过了, 继续使用可能会出现问题. 在 CAS 操作中, 由于比较的多是指针, 这个问题将会变得更加严重. 试想如下情况:
有一个堆 (先入后出) 中有 top 和节点 A, 节点 A 目前位于堆顶 top 指针指向 A. 现在有一个进程 P1 想要 pop 一个节点, 因此按照如下无锁操作进行
- pop()
- {
- do{
- ptr = top; // ptr = top = NodeA
- next_prt = top->next; // next_ptr = NodeX
- } while(CAS(top, ptr, next_ptr) != true);
- return ptr;
- }
而进程 P2 在执行 CAS 操作之前打断了 P1, 并对堆进行了一系列的 pop 和 push 操作, 使堆变为如下结构:
进程 P2 首先 pop 出 NodeA, 之后又 Push 了两个 NodeB 和 C, 由于内存管理机制中广泛使用的内存重用机制, 导致 NodeC 的地址与之前的 NodeA 一致.
这时 P1 又开始继续运行, 在执行 CAS 操作时, 由于 top 依旧指向的是 NodeA 的地址(实际上已经变为 NodeC), 因此将 top 的值修改为了 NodeX, 这时堆结构如下:
经过 CAS 操作后, top 指针错误的指向了 NodeX 而不是 NodeB.
ABA 解决方法
Tagged state reference, 增加额外的 tag bits 位, 它像一个版本号; 比如, 其中一种算法是在内存地址的低位记录指针的修改次数, 在指针修改时, 下一次 CAS 会返回失败, 即使因为内存重用机制导致地址一样. 有时我们称这种机制位 ABA', 因为我们使第二个 A 稍微有点不同于第一个. tag 的位数长度会影响记录修改的次数, 在现有的 CPU 下, 使用 60 bit tag, 在不重启程序 10 年才会产生溢出问题; 在 X64 CPU, 趋向于支持 128 bit 的 CAS 指令, 这样更能保证避免出现 ABA 问题.
下面参考 liblfds 库代码说明下 Tagged state reference 的实现过程.
我们想要避免 ABA 问题的方法之一是使用更长的指针, 这样便需要一个支持 dword 长度的 CAS 指令. liblfds 是怎么跨平台实现 128 bit 指令的呢?
在 liblfds 下, CAS 指令为 LFDS710_PAL_ATOMIC_DWCAS 宏, 它的完整形式是:
LFDS710_PAL_ATOMIC_DWCAS( pointer_to_destination, pointer_to_compare, pointer_to_new_destination, cas_strength, result)
pointer_to_destination: [in, out], 指向目标的指针, 是一个由两个 64 bit 整数组成的数组;
pointer_to_compare: [in, out], 用于和目标指针比较的指针, 同样是一个由两个 64 bit 整数组成的数组;
pointer_to_new_destination: [in], 和目标指针交换的新指针;
result: [out], 如果 128 bit 的 pointer_to_compare 与 pointer_to_destination 相等, 则使用 pointer_to_new_destination 覆盖 pointer_to_destination,result 返回 1; 如果不相等, 则 pointer_to_destination 不变, 且 pointer_to_compare 的值变为 pointer_to_destination.
从上面可以看出, liblfds 库使用一个由两个元素组成的一维数组来表示 128 bit 指针.
Linux 提供了 cmpxchg16b 用于实现 128 bit 的 CAS 指令, 而在 Windows, 使用 _InterlockedCompareExchange128. 只有 128 位指针完全相等的情况下, 才视为相等.
参考 liblfds/liblfds7.1.0/liblfds710/inc/liblfds710/lfds710_porting_abstraction_layer_compiler.h 下关于 CAS 的 Windows 实现:
- #define LFDS710_PAL_ATOMIC_DWCAS( pointer_to_destination, pointer_to_compare, pointer_to_new_destination, cas_strength, result ) \
- { \
- LFDS710_PAL_BARRIER_COMPILER_FULL; \
- (result) = (char unsigned) _InterlockedCompareExchange128( (__int64 volatile *) (pointer_to_destination), (__int64) (pointer_to_new_destination[1]), (__int64) (pointer_to_new_destination[0]), (__int64 *) (pointer_to_compare) ); \
- LFDS710_PAL_BARRIER_COMPILER_FULL; \
- }
再重点研究 new_top 的定义和提交修改过程.
new_top 是一个具有两个元素的一维数组, 元素是 struct lfds710_stack_element 指针, 两个元素分别使用 POINTER 0 和 COUNTER 1 标记. COUNTER 相当于前面说的 tag 标记, POINTER 保存的时真正的节点指针. 在 X64 下, 指针长度是 64 bit, 所以这里使用的是 64 bit tag 记录 pointer 修改记录.
liblfds 用原 top 的 COUNTER + 1 来初始化 new top COUNTER, 即使用 COUNTER 标记 ss->top 的更换次数, 这样每一次更换 top,top 里的 COUNTER 都会变.
只有在 ss->top 和 original_top 的 POINTER 和 COUNTER 完全相等的情况下, new_top 才会覆盖到 ss->top, 否则会使用 ss->top 覆盖 original_top, 下次循环用最新的 original_top 再次操作和比较.
参考 liblfds/liblfds7.1.0/liblfds710/src/lfds710_stack/lfds710_stack_push.c, 无锁堆栈的实现:
- void lfds710_stack_push( struct lfds710_stack_state *ss,
- struct lfds710_stack_element *se )
- {
- char unsigned
- result;
- lfds710_pal_uint_t
- backoff_iteration = LFDS710_BACKOFF_INITIAL_VALUE;
- struct lfds710_stack_element LFDS710_PAL_ALIGN(LFDS710_PAL_ALIGN_DOUBLE_POINTER)
- *new_top[PAC_SIZE],
- *volatile original_top[PAC_SIZE];
- LFDS710_PAL_ASSERT( ss != NULL );
- LFDS710_PAL_ASSERT( se != NULL );
- new_top[POINTER] = se;
- original_top[COUNTER] = ss->top[COUNTER];
- original_top[POINTER] = ss->top[POINTER];
- do
- {
- se->next = original_top[POINTER];
- LFDS710_MISC_BARRIER_STORE;
- new_top[COUNTER] = original_top[COUNTER] + 1;
- LFDS710_PAL_ATOMIC_DWCAS( ss->top, original_top, new_top, LFDS710_MISC_CAS_STRENGTH_WEAK, result );
- if( result == 0 )
- LFDS710_BACKOFF_EXPONENTIAL_BACKOFF( ss->push_backoff, backoff_iteration );
- }
- while( result == 0 );
- LFDS710_BACKOFF_AUTOTUNE( ss->push_backoff, backoff_iteration );
- return;
- }
CAS 原理应用
无锁数据结构, 参考 https://github.com/liblfds/liblfds
高性能内存队列 disruptor 中的 CAS, 参考 http://ifeve.com/disruptor/
数据库乐观锁
参考
- [wiki Compare-and-swap] https://en.wikipedia.org/wiki/Compare-and-swap
- [wiki ABA problem] https://en.wikipedia.org/wiki/ABA_problem
[左耳朵耗子无锁队列的实现] https://coolshell.cn/articles/8239.html
[IBM 设计不使用互斥锁的并发数据结构]
- [ABA problem]
- [_InterlockedCompareExchange128]
[Linux 互斥锁的实现原理(pthread_mutex_t)] https://www.bbsmax.com/A/x9J2WXvW56/
[futex 机制介绍]
[an-introduction-to-lock-free-programming]
[多进程, 多线程与多处理器计算平台的性能问题]
[Implement Lock-Free Queue]
[上下文切换和线程调度性能测试]
[纯上下文切换性能测试]
[锁的开销]
[pthread 包的 mutex 实现分析]
[IBM 通用线程: POSIX 线程详解]
来源: https://www.cnblogs.com/cposture/p/10761396.html