Sirius
Sirius

目录

Golang sync.WaitGroup

sync.WaitGroup 是 Go 标准库 sync 包中一个非常常用的并发原语。它的主要作用是等待一组 Goroutine 全部执行完成。它非常适合于那种“主 Goroutine 派发多个子 Goroutine,并需要等待所有子 Goroutine 完成后再继续执行”的场景。

WaitGroup 对外提供了三个核心方法:

  1. Add(delta int): 增加或减少等待组的计数器。

  2. Done(): 对计数器减一,是 Add(-1) 的简写。

  3. Wait(): 阻塞当前 Goroutine,直到等待组的计数器归零。

WaitGroup 的结构定义非常简洁,但其内部状态的管理却十分精巧:

// src/sync/waitgroup.go
type WaitGroup struct {
    noCopy noCopy // 一个特殊类型,用于静态分析工具检查 WaitGroup 是否被复制

    // 64位的值,分为两部分:
    // 高32位:计数器 (counter)
    // 低32位:等待者数量 (waiter count)
    // 另外还有一个信号量,用于唤醒等待者。
    // 为了64位原子操作在32位平台上的对齐问题,这些状态被放在一个数组中。
    state1 [3]uint32
}
  • noCopy: 这个字段的存在是为了通过 go vet 等工具,在编译期间检查出 WaitGroup 被复制的错误用法。WaitGroup 在使用后是不能被复制的。

  • state1 [3]uint32: 这是 WaitGroup 的核心。它是一个包含3个 uint32 元素的数组,但概念上,它存储了两个核心状态:

    1. 一个 64 位的状态整数 (statep): 这个整数由 state1 的前两个 uint32 元素组合而成。

      • 高 32 位: 存储当前的计数器 (counter),即还需要等待多少个 Done() 被调用。

      • 低 32 位: 存储当前有多少个 Goroutine 正在调用 Wait() 方法并处于等待状态 (waiter count)

    2. 一个 32 位的信号量 (sema): 由 state1 的第三个 uint32 元素表示。当 Goroutine 调用 Wait() 需要阻塞时,会通过这个信号量进入休眠;当计数器归零时,AddDone 方法会通过这个信号量唤醒所有等待的 Goroutine。

将计数器和等待者数量合并到一个 64 位整数中,使得 Go 运行时可以通过一次原子操作同时修改或读取这两个状态,极大地简化了并发控制逻辑,避免了额外的锁。

graph TD
    subgraph wg [sync.WaitGroup 实例]
        noCopy["noCopy (禁止复制)"]
        state1["state1 ([3]uint32)"]
    end
    
    subgraph state_usage [state1 数组的 conceptual 用途]
        direction LR
        statep_ptr["statep (*uint64)"]
        sema_ptr["sema (*uint32)"]
    end
    
    subgraph statep_detail [statep 指向的 64位整数 的位布局]
        direction LR
        counter["Counter (高32位)"]
        waiters["Waiter Count (低32位)"]
    end
    
    subgraph sema_usage [sema 的用途]
       sema_desc["用于阻塞/唤醒等待 'Wait()' 的 Goroutine"]
    end
    
    state1 -- "前两个元素组成" --> statep_ptr
    state1 -- "第三个元素" --> sema_ptr
    
    statep_ptr --> statep_detail
    sema_ptr --> sema_usage
    
    counter --> desc_counter["记录还需等待的 goroutine 数量"]
    waiters --> desc_waiters["记录有多少 goroutine 在调用 'Wait()'"]

    style wg fill:#f9f,stroke:#333,stroke-width:2px

Done() 内部直接调用 Add(-1)Add 方法是 WaitGroup 状态变化的核心。

  1. 原子更新计数器: 使用 atomic.AddUint64 操作,将 delta 加到 64 位 state 整数的高 32 位(即计数器部分)。

  2. 检查计数器合法性:

    • delta 为正数时,表示要增加等待的 Goroutine 数量。这通常发生在 Wait() 调用之前。

    • delta 为负数时(Done()),表示一个 Goroutine 已完成。

    • Add 操作后,会检查新的计数器值。如果变为负数,意味着 Done() 的调用次数超过了 Add 的次数,这是一种错误用法,程序会panic

  3. 判断是否唤醒等待者:

    • 如果 Add 操作后,新的计数器值变为 0,这意味着所有等待的 Goroutine 都已完成。

    • 此时,Add 方法会检查 state 整数的低 32 位(即等待者数量 waiter count)。

    • 如果 waiter count 大于 0,说明有 Goroutine 正在 Wait() 方法上阻塞。

    • Add 方法会通过 runtime_Semrelease 操作,根据 waiter count 的数量,释放信号量 sema 相应次数,从而唤醒所有正在等待的 Goroutine。

    • 唤醒后,会将 state 整数重置为 0。

Wait() 方法用于阻塞,直到计数器归零。

  1. 快速路径 (Fast Path):

    • Wait() 首先会通过原子操作读取 64 位的 state 整数。

    • 它检查高 32 位的计数器部分。如果计数器已经为 0,说明无需等待,Wait() 方法立即返回。

  2. 慢速路径 (Slow Path):

    • 如果计数器不为 0,Wait() 会进入一个循环,准备阻塞。

    • 增加等待者数量: 在循环中,它会尝试通过 CAS (Compare-And-Swap) 原子操作,将 state 整数的低 32 位(waiter count)加一。

    • 再次检查计数器: CAS 成功后,它会再次检查计数器是否已经变为 0。这一步是为了处理一个竞态条件:可能在当前 Goroutine 准备进入休眠的瞬间,其他 Goroutine 已经完成了所有任务,并将计数器置零了。如果此时发现计数器已为0,就没必要休眠了,它会把刚加上的 waiter count 再减回去,然后退出循环并返回。

    • 进入休眠: 如果再次检查后,计数器仍然不为 0,则调用 runtime_Semacquire,在 sema 信号量上阻塞当前 Goroutine,进入休眠状态。

    • 被唤醒: 当计数器归零,Add 方法释放了信号量后,这个 Goroutine 会被唤醒,Wait() 方法执行完毕并返回。

  1. Add 必须在 Wait 之前调用:主 Goroutine 应该在启动子 Goroutine 之前,就调用 Add 设置好需要等待的数量。如果在子 Goroutine 内部调用 Add,可能会因为子 Goroutine 还没来得及执行 Add,主 Goroutine 的 Wait 就已经执行并发现计数器为0,从而提前退出。

  2. WaitGroup 不可重用:一旦 WaitGroup 的计数器归零,并且 Wait() 返回后,它就不能被重用了(例如,再次调用 Add 增加计数器)。如果需要新的等待组,应该创建一个新的 WaitGroup 实例。

  3. 不要拷贝 WaitGroupWaitGroup 在首次使用后就不应该被复制。复制会导致其内部状态不一致,go vet 工具可以检查出这种错误。

sync.WaitGroup 的实现非常高效,其精髓在于:

  • 状态压缩:通过一个 64 位的原子整数 state,巧妙地同时管理了“需要等待的任务数量”和“正在等待的 Goroutine 数量”。

  • 无锁设计:核心状态的变更都通过原子操作完成,避免了使用互斥锁带来的开销和复杂性。

  • 高效的阻塞/唤醒机制:利用内部的信号量 (sema),让等待的 Goroutine 高效地休眠和被唤醒,而不会空耗 CPU。

这使得 WaitGroup 成为 Go 中处理一组并发任务同步的轻量级、高性能且易于使用的标准工具。