针对 Golang 1.9 的 sync.Mutex 进行分析, 与 Golang 1.10 基本一样除了将 panic 改为了 throw 之外其他的都一样.
源代码位置: sync\mutex.go.
可以看到注释如下:
- Mutex can be in 2 modes of operations: normal and starvation.
- In normal mode waiters are queued in FIFO order, but a woken up waiter does not own the mutex and competes with new arriving goroutines over the ownership. New arriving goroutines have an advantage -- they are already running on CPU and there can be lots of them, so a woken up waiter has good chances of losing. In such case it is queued at front of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, it switches mutex to the starvation mode.
- In starvation mode ownership of the mutex is directly handed off from the unlocking goroutine to the waiter at the front of the queue. New arriving goroutines don't try to acquire the mutex even if it appears to be unlocked, and don't try to spin. Instead they queue themselves at the tail of the wait queue.
- If a waiter receives ownership of the mutex and sees that either (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, it switches mutex back to normal operation mode.
- Normal mode has considerably better performance as a goroutine can acquire a mutex several times in a row even if there are blocked waiters.
- Starvation mode is important to prevent pathological cases of tail latency.
博主英文很烂, 就粗略翻译一下, 仅供参考:
互斥量可分为两种操作模式: 正常和饥饿.
在正常模式下, 等待的 goroutines 按照 FIFO(先进先出) 顺序排队, 但是 goroutine 被唤醒之后并不能立即得到 mutex 锁, 它需要与新到达的 goroutine 争夺 mutex 锁.
因为新到达的 goroutine 已经在 CPU 上运行了, 所以被唤醒的 goroutine 很大概率是争夺 mutex 锁是失败的. 出现这样的情况时候, 被唤醒的 goroutine 需要排队在队列的前面.
如果被唤醒的 goroutine 有超过 1ms 没有获取到 mutex 锁, 那么它就会变为饥饿模式.
在饥饿模式中, mutex 锁直接从解锁的 goroutine 交给队列前面的 goroutine. 新达到的 goroutine 也不会去争夺 mutex 锁 (即使没有锁, 也不能去自旋), 而是到等待队列尾部排队.
在饥饿模式下, 有一个 goroutine 获取到 mutex 锁了, 如果它满足下条件中的任意一个, mutex 将会切换回去正常模式:
1. 是等待队列中的最后一个 goroutine
2. 它的等待时间不超过 1ms.
正常模式有更好的性能, 因为 goroutine 可以连续多次获得 mutex 锁;
饥饿模式对于预防队列尾部 goroutine 一致无法获取 mutex 锁的问题.
看了这段解释, 那么基本的业务逻辑也就了解了, 可以整理一下衣装, 准备看代码.
打开 mutex.go 看到如下代码:
- type Mutex struct {
- state int32 // 将一个 32 位整数拆分为 当前阻塞的 goroutine 数 (29 位)| 饥饿状态 (1 位)| 唤醒状态 (1 位)| 锁状态 (1 位) 的形式, 来简化字段设计
- sema uint32 // 信号量
- }
- const (
- mutexLocked = 1 <<iota // 1 0001 含义: 用最后一位表示当前对象锁的状态, 0 - 未锁住 1 - 已锁住
- mutexWoken // 2 0010 含义: 用倒数第二位表示当前对象是否被唤醒 0 - 唤醒 1 - 未唤醒
- mutexStarving // 4 0100 含义: 用倒数第三位表示当前对象是否为饥饿模式, 0 为正常模式, 1 为饥饿模式.
- mutexWaiterShift = iota // 3, 从倒数第四位往前的 bit 位表示在排队等待的 goroutine 数
- starvationThresholdNs = 1e6 // 1ms
- )
可以看到 Mutex 中含有:
一个非负数信号量 sema;
state 表示 Mutex 的状态.
常量:
mutexLocked 表示锁是否可用 (0 可用, 1 被别的 goroutine 占用)
mutexWoken=2 表示 mutex 是否被唤醒
mutexWaiterShift=4 表示统计阻塞在该 mutex 上的 goroutine 数目需要移位的数值.
将 3 个常量映射到 state 上就是
- state: |32|31|...| |3|2|1|
- \__________/ | | |
- | | | |
| | | mutex 的占用状态 (1 被占用, 0 可用)
| | |
| | mutex 的当前 goroutine 是否被唤醒
| |
| 饥饿位, 0 正常, 1 饥饿
|
等待唤醒以尝试锁定的 goroutine 的计数, 0 表示没有等待者
如果同学们熟悉 Java 的锁, 就会发现与 AQS 的设计是类似, 只是没有 AQS 设计的那么精致, 不得不感叹, JAVA 的牛逼.
有同学是否会有疑问为什么使用的是 int32 而不是 int64 呢, 因为 32 位原子性操作更好, 当然也满足的需求.
Mutex 在 1.9 版本中就两个函数 Lock() 和 Unlock().
下面我们先来分析最难的 Lock() 函数:
- func (m *Mutex) Lock() {
- // 如果 m.state=0, 说明当前的对象还没有被锁住, 进行原子性赋值操作设置为 mutexLocked 状态, CompareAnSwapInt32 返回 true
- // 否则说明对象已被其他 goroutine 锁住, 不会进行原子赋值操作设置, CopareAndSwapInt32 返回 false
- if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)
- if race.Enabled {
- race.Acquire(unsafe.Pointer(m))
- }
- return
- }
- // 开始等待时间戳
- var waitStartTime int64
- // 饥饿模式标识
- starving := false
- // 唤醒标识
- awoke := false
- // 自旋次数
- iter := 0
- // 保存当前对象锁状态
- old := m.state
- // 看到这个 for {} 说明使用了 cas 算法
- for {
- // 相当于 xxxx...x0xx & 0101 = 01, 当前对象锁被使用
- if old&(mutexLocked|mutexStarving) == mutexLocked &&
- // 判断当前 goroutine 是否可以进入自旋锁
- runtime_canSpin(iter) {
- // 主动旋转是有意义的. 试着设置 mutexwake 标志, 告知解锁, 不要唤醒其他阻塞的 goroutines.
- if !awoke &&
- // 再次确定是否被唤醒: xxxx...xx0x & 0010 = 0
- old&mutexWoken == 0 &&
- // 查看是否有 goroution 在排队
- old>>mutexWaiterShift != 0 &&
- // 将对象锁改为唤醒状态: xxxx...xx0x | 0010 = xxxx...xx1x
- atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
- awoke = true
- }//END_IF_Lock
- // 进入自旋锁后当前 goroutine 并不挂起, 仍然在占用 cpu 资源, 所以重试一定次数后, 不会再进入自旋锁逻辑
- runtime_doSpin()
- // 自加, 表示自旋次数
- iter++
- // 保存 mutex 对象即将被设置成的状态
- old = m.state
- continue
- }// END_IF_spin
- // 以下代码是不使用 ** 自旋 ** 的情况
- new := old
- // 不要试图获得饥饿的互斥, 新来的 goroutines 必须排队.
- // 对象锁饥饿位被改变, 说明处于饥饿模式
- // xxxx...x0xx & 0100 = 0xxxx...x0xx
- if old&mutexStarving == 0 {
- // xxxx...x0xx | 0001 = xxxx...x0x1, 标识对象锁被锁住
- new |= mutexLocked
- }
- // xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0; 当前 mutex 处于饥饿模式并且锁已被占用, 新加入进来的 goroutine 放到队列后面
- if old&(mutexLocked|mutexStarving) != 0 {
- // 更新阻塞 goroutine 的数量, 表示 mutex 的等待 goroutine 数目加 1
- new += 1 <<mutexWaiterShift
- }
- // 当前的 goroutine 将互斥锁转换为饥饿模式. 但是, 如果互斥锁当前没有解锁, 就不要打开开关, 设置 mutex 状态为饥饿模式. Unlock 预期有饥饿的 goroutine
- if starving &&
- // xxxx...xxx1 & 0001 != 0; 锁已经被占用
- old&mutexLocked != 0 {
- // xxxx...xxx | 0101 => xxxx...x1x1, 标识对象锁被锁住
- new |= mutexStarving
- }
- // goroutine 已经被唤醒, 因此需要在两种情况下重设标志
- if awoke {
- // xxxx...xx1x & 0010 = 0, 如果唤醒标志为与 awoke 不相协调就 panic
- if new&mutexWoken == 0 {
- panic("sync: inconsistent mutex state")
- }
- // new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x : 设置唤醒状态位 0, 被唤醒
- new &^= mutexWoken
- }
- // 获取锁成功
- if atomic.CompareAndSwapInt32(&m.state, old, new) {
- // xxxx...x0x0 & 0101 = 0, 已经获取对象锁
- if old&(mutexLocked|mutexStarving) == 0 {
- // 结束 cas
- break
- }
- // 以下的操作都是为了判断是否从饥饿模式中恢复为正常模式
- // 判断处于 FIFO 还是 LIFO 模式
- queueLifo := waitStartTime != 0
- if waitStartTime == 0 {
- waitStartTime = runtime_nanotime()
- }
- runtime_SemacquireMutex(&m.sema, queueLifo)
- starving = starving || runtime_nanotime()-waitStartTime> starvationThresholdNs
- old = m.state
- // xxxx...x1xx & 0100 != 0
- if old&mutexStarving != 0 {
- // xxxx...xx11 & 0011 != 0
- if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
- panic("sync: inconsistent mutex state")
- }
- delta := int32(mutexLocked - 1<<mutexWaiterShift)
- if !starving || old>>mutexWaiterShift == 1 {
- delta -= mutexStarving
- }
- atomic.AddInt32(&m.state, delta)
- break
- }
- awoke = true
- iter = 0
- } else {
- // 保存 mutex 对象状态
- old = m.state
- }
- }// cas 结束
- if race.Enabled {
- race.Acquire(unsafe.Pointer(m))
- }
- }
看了 Lock() 函数之后是不是觉得一片懵逼状态, 告诉大家一个方法, 看 Lock() 函数时候需要想着如何 Unlock. 下面就开始看看 Unlock() 函数.
- func (m *Mutex) Unlock() {
- if race.Enabled {
- _ = m.state
- race.Release(unsafe.Pointer(m))
- }
- // state-1 标识解锁
- new := atomic.AddInt32(&m.state, -mutexLocked)
- // 验证锁状态是否符合
- if (new+mutexLocked)&mutexLocked == 0 {
- panic("sync: unlock of unlocked mutex")
- }
- // xxxx...x0xx & 0100 = 0 ; 判断是否处于正常模式
- if new&mutexStarving == 0 {
- old := new
- for {
- // 如果没有等待的 goroutine 或 goroutine 已经解锁完成
- if old>>mutexWaiterShift == 0 ||
- // xxxx...x0xx & (0001 | 0010 | 0100) => xxxx...x0xx & 0111 != 0
- old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
- return
- }
- // Grab the right to wake someone.
- new = (old - 1<<mutexWaiterShift) | mutexWoken
- if atomic.CompareAndSwapInt32(&m.state, old, new) {
- runtime_Semrelease(&m.sema, false)
- return
- }
- old = m.state
- }
- } else {
- // 饥饿模式: 将 mutex 所有权移交给下一个等待的 goroutine
- // 注意: mutexlock 没有设置, goroutine 会在唤醒后设置.
- // 但是互斥锁仍然被认为是锁定的, 如果互斥对象被设置, 所以新来的 goroutines 不会得到它
- runtime_Semrelease(&m.sema, true)
- }
- }
在网上还会有一些基于 go1.6 的分析, 但是与 go 1.9 的差距有点大.
上面的分析, 因个人水平有限, 难免存在错误, 请各位老师同学多多指点, 不喜勿喷.
附录
- https://github.com/golang/go/blob/dev.boringcrypto.go1.9/src/sync/mutex.go
- https://segmentfault.com/a/1190000000506960
来源: http://blog.51cto.com/qiangmzsx/2134786