在 Golang 中, WaitGroup 主要用来做 go Routine 的等待, 当启动多个 go 程序, 通过 waitgroup 可以等待所有 go 程序结束后再执行后面的代码逻辑, 比如:
- func Main() {
- wg := sync.WaitGroup{}
- for i := 0; i <10; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- time.Sleep(10 * time.Second)
- }()
- }
- wg.Wait() // 等待在此, 等所有 go func 里都执行了 Done() 才会退出
- }
WaitGroup 主要是三个方法, Add(int),Done() 和 Wait(), 其中 Done() 是调用了 Add(-1), 推荐使用方法是, 先统一 Add, 在 goroutine 里并发的 Done, 然后 Wait
WaitGroup 主要维护了 2 个计数器, 一个是请求计数器 v, 一个是等待计数器 w, 二者组成一个 64bit 的值, 请求计数器占高 32bit, 等待计数器占低 32bit.
简单来说, 当 Add(n) 执行时, 请求计数器 v 就会加 n, 当 Done() 执行时, v 就会减 1, 可以想到, v 为 0 时就是结束, 可以触发 Wait() 执行了, 所谓的触发 Wait() 是通过信号量实现的.
那么等待计数器拿来干嘛? 是因为 Wait() 方法支持并发, 每一次 Wait() 方法执行, 等待计数器 w 就会加 1, 而等待 v 为 0 触发 Wait() 时, 要根据 w 的数量发送 w 份的信号量, 正确的触发所有的 Wait().
同时, WaitGroup 里还有对使用逻辑进行了严格的检查, 比如 Wait() 一旦开始不能 Add().
下面是带注释的代码:
- func (wg *WaitGroup) Add(delta int) {
- statep := wg.state()
- // 更新 statep,statep 将在 wait 和 add 中通过原子操作一起使用
- state := atomic.AddUint64(statep, uint64(delta)<<32)
- v := int32(state>> 32)
- w := uint32(state)
- if v <0 {
- panic("sync: negative WaitGroup counter")
- }
- if w != 0 && delta> 0 && v == int32(delta) {
- // wait 不等于 0 说明已经执行了 Wait, 此时不容许 Add
- panic("sync: WaitGroup misuse: Add called concurrently with Wait")
- }
- // 正常情况, Add 会让 v 增加, Done 会让 v 减少, 如果没有全部 Done 掉, 此处 v 总是会大于 0 的, 直到 v 为 0 才往下走
- // 而 w 代表是有多少个 goruntine 在等待 done 的信号, wait 中通过 compareAndSwap 对这个 w 进行加 1
- if v> 0 || w == 0 {
- return
- }
- // This goroutine has set counter to 0 when waiters> 0.
- // Now there can't be concurrent mutations of state:
- // - Adds must not happen concurrently with Wait,
- // - Wait does not increment waiters if it sees counter == 0.
- // Still do a cheap sanity check to detect WaitGroup misuse.
- // 当 v 为 0(Done 掉了所有) 或者 w 不为 0(已经开始等待) 才会到这里, 但是在这个过程中又有一次 Add, 导致 statep 变化, panic
- if *statep != state {
- panic("sync: WaitGroup misuse: Add called concurrently with Wait")
- }
- // Reset waiters count to 0.
- // 将 statep 清 0, 在 Wait 中通过这个值来保护信号量发出后还对这个 Waitgroup 进行操作
- *statep = 0
- // 将信号量发出, 触发 wait 结束
- for ; w != 0; w-- {
- runtime_Semrelease(&wg.sema, false)
- }
- }
- // Done decrements the WaitGroup counter by one.
- func (wg *WaitGroup) Done() {
- wg.Add(-1)
- }
- // Wait blocks until the WaitGroup counter is zero.
- func (wg *WaitGroup) Wait() {
- statep := wg.state()
- for {
- state := atomic.LoadUint64(statep)
- v := int32(state>> 32)
- w := uint32(state)
- if v == 0 {
- // Counter is 0, no need to wait.
- if race.Enabled {
- race.Enable()
- race.Acquire(unsafe.Pointer(wg))
- }
- return
- }
- // Increment waiters count.
- // 如果 statep 和 state 相等, 则增加等待计数, 同时进入 if 等待信号量
- // 此处做 CAS, 主要是防止多个 goroutine 里进行 Wait() 操作, 每有一个 goroutine 进行了 wait, 等待计数就加 1
- // 如果这里不相等, 说明 statep, 在 从读出来 到 CAS 比较 的这个时间区间内, 被别的 goroutine 改写了, 那么不进入 if, 回去再读一次, 这样写避免用锁, 更高效些
- if atomic.CompareAndSwapUint64(statep, state, state+1) {
- if race.Enabled && w == 0 {
- // Wait must be synchronized with the first Add.
- // Need to model this is as a write to race with the read in Add.
- // As a consequence, can do the write only for the first waiter,
- // otherwise concurrent Waits will race with each other.
- race.Write(unsafe.Pointer(&wg.sema))
- }
- // 等待信号量
- runtime_Semacquire(&wg.sema)
- // 信号量来了, 代表所有 Add 都已经 Done
- if *statep != 0 {
- // 走到这里, 说明在所有 Add 都已经 Done 后, 触发信号量后, 又被执行了 Add
- panic("sync: WaitGroup is reused before previous Wait has returned")
- }
- return
- }
- }
- }
来源: https://www.cnblogs.com/jiangz222/p/10348763.html