前言

当我们的程序在运行过程中需要执行多个子任务时,我们可以利用 Go 协程并发地执行这些子任务,然后等待它们执行结束,从而缩短程序串行执行的耗费时间。Go 语言标准库自带了该组件:sync.WaitGroup。

使用实例

看一个 Go 官方提供的使用实例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
var wg sync.WaitGroup
var urls = []string{
	"http://www.golang.org/",
	"http://www.google.com/",
	"http://www.somestupidname.com/",
}
for _, url := range urls {
	wg.Add(1)   // 增加一个任务
	go func(url string) {
		defer wg.Done() //任务完成
		http.Get(url)
	}(url)
}
wg.Wait()   //阻塞等待所有任务完成

可以看到,使用非常简单。

Add方法用于增加一个任务,Done方法在完成该任务时调用,Wait方法用于阻塞等待所有任务完成。

信号量

sync.WaitGroup 源码是基于计数器状态和信号量来实现的,看下维基百科的信号量介绍

信号量是一个同步对象,用于保持在 0 至指定最大值之间的一个计数值:

  • 当线程完成一次对该 semaphore 对象的等待时,该计数值 -1;
  • 当线程完成一次对该 semaphore 对象的释放时,该计数值 +1;

源码分析

接下来,我们看下 sync.WaitGroup 的源码是如何实现的。

1
本文基于:go1.14

结构体

1
2
3
4
5
6
7
8
9
type WaitGroup struct {
    // noCopy 标记不可复制,使用 go vet 作为检测使用,并因此只能进行指针传递,从而保证全局唯一
    noCopy noCopy
    
    // 64位值:高32位为计数器,低32位为等待计数。
    // 不使用64位值是因为32位的编译器不能确保64位原子操作的位对齐 
    // 一个uit32占4个byte,我们分配了12个byte对齐的8个byte作为计数,剩下4个用于信号量的存储
    state1 [3]uint32
}

sync.WaiyGroup 的结构也很简单,只包含了 noCopy 和 state1 两个字段。

noCopy 使用 go vet 来做静态检查,当发生复制时会报错提示,但不影响程序编译运行,这里需要注意一下。noCopy 的实现原理分析请移步👉这篇博客

state1 的使用就比较值得推敲了,sync.WaitGroup 是通过原子操作信号量和计数器来实现的,而信号量和计数器都保存在 [3]uint32 内,我们看下源码是如何取的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// state 函数返回计数器状态和信号量的指针
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    // 判定地址是否8位对齐
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        // 前8bytes做uint64指针计数状态,后4bytes做信号量
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
        // 后8bytes做uint64指针计数状态,前4bytes做信号量
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}

state1 包含了 worker 计数器、waiter 计数器和信号量,取值逻辑是:

  • worker 计数器:v 是 statep *uint64 的左 32 位
  • waiter 计数器:w 是 statep *uint64 的右 32 位
  • 信号量:semap 是 state1 [3]uint32 的第一个字节/最后一个字节

通过 state 方法我们可以看出,计数状态值是 *uint64 类型,信号量是 *uint32 类型,然后在使用时通过原子操作来读取,这里就涉及到内存对齐的知识点了。

作为 Go 标准库自带的公共组件肯定是要考虑到不同平台的兼容问题,在 64 位平台原子操作 uint64 类型没有问题,但在 32 位平台操作 uint64 类型可能会发生报错,为什么呢?

atomic-bug 中提到:

1
2
3
4
On x86-32, the 64-bit functions use instructions unavailable before the Pentium MMX.
On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core.

On ARM, x86-32, and 32-bit MIPS, it is the caller’s responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.

也就是说,在 32 位系统上想要原子操作 64 位字(如uint64)的话,需要由调用方保证其数据地址是 64 位对齐的,否则原子访问会有异常。那该如何保证呢?🤔

unsafe 包规范中提到:

1
2
3
Computer architectures may require memory addresses to be aligned; that is, for addresses of a variable to be a multiple of a factor, the variable’s type’s alignment. The function Alignof takes an expression denoting a variable of any type and returns the alignment of the (type of the) variable in bytes. For a variable x:

uintptr(unsafe.Pointer(&x)) % unsafe.Alignof(x) == 0

也就是说,如果类型 t 的对齐保证是 n,那么类型 t 的每个值的地址在运行时必须是 n 的倍数。上面的 state 方法正是基于这个规则来实现即便在 32 平台依然可以保证 uint64 是内存对齐的。

那么 noCopy 和 state1 字段能否可以调换位置呢?留个思考在这里🤔。

关于这块内存对齐的介绍,先说到这里,后面会单独总结篇博客来详细说明下,敬请期待~

Add

首先看 Add 方法的源码实现,可以添加或者删除 worker 的数量,参数 delta 可能为负的,这里去掉了 race 相关的代码,清爽了很多:

  • worker 减少到零值,所有阻塞的 waiter 都会被释放
  • worker 减少到负数,会发生 panic
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (wg *WaitGroup) Add(delta int) {
    //获取计数状态和信号量的指针
    statep, semap := wg.state()
    
    //statep 高32位表示worker计数,低32位表示waiter计数状态
    //使用原子加操作worker计数
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32)
    w := uint32(state)
    
    //如果计数worker减少到负数,则panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
    }
    
    //添加与等待并发调用,报panic
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    
    //worder计数添加成功
	if v > 0 || w == 0 {
		return
    }

    //这时 Goroutine 已经将计数器清零,且等待器大于零(并发调用导致)
    //这时不允许出现并发使用导致的状态突变,否则就应该 panic
    //- Add 不能与 Wait 并发调用
    //- Wait 在计数器已经归零的情况下,不能再继续增加等待器了
    //仍然检查来保证 WaitGroup 不会被滥用
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
    
    // 重置计数器为0
	*statep = 0
	for ; w != 0; w-- {
        //根据waiter计数依次释放信号,唤醒阻塞等待的waiter
		runtime_Semrelease(semap, false, 0)
	}
}

Done

Done 方法其实就是 Add(-1):

1
2
3
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait

Wait 方法的逻辑是修改 waiter 计数器,并阻塞等待信号量释放,同样去掉了 race 相关的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//Wait 方法保持阻塞等待直到 worker计数减少到 0
func (wg *WaitGroup) Wait() {
    //获取计数器状态和信号量的指针
	statep, semap := wg.state()
	for {
        state := atomic.LoadUint64(statep)
        //worker计数状态
        v := int32(state >> 32)
        //waiter计数状态
		w := uint32(state)
		if v == 0 {
			//当worker计数为 0 时,直接返回不需要阻塞等待
			return
        }
        
        //原子增加 waiter 计数值,原子操作低 32 位,不需要移位
        //通过 cas 和 for 循环这种无锁编程来保证 Wait 方法的并发调用
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
            //阻塞等待信号量的释放
			runtime_Semacquire(semap)
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            //信号量获取成功
			return
		}
	}
}

小结

  • Add 操作必须早于 Wait 调用,否则会发生 panic
  • WaitGroup 不保证 worker 的执行顺序
  • WaitGroup 无法指定固定的 worker 数量
  • WaitGroup 禁止拷贝,只能通过指针引用

参考

Go 夜读 sync.WaitGroup 源码分析

Go 之聊聊 struct 的内存对齐

Go 语言原本 - 同步组