前言
当我们的程序在运行过程中需要执行多个子任务时,我们可以利用 Go 协程并发地执行这些子任务,然后等待它们执行结束,从而缩短程序串行执行的耗费时间。Go 语言标准库自带了该组件:sync.WaitGroup。
使用实例
看一个 Go 官方提供的使用实例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
wg.Add(1) // 增加一个任务
go func(url string) {
defer wg.Done() //任务完成
http.Get(url)
}(url)
}
wg.Wait() //阻塞等待所有任务完成
|
可以看到,使用非常简单。
Add
方法用于增加一个任务,Done
方法在完成该任务时调用,Wait
方法用于阻塞等待所有任务完成。
信号量
sync.WaitGroup 源码是基于计数器状态和信号量来实现的,看下维基百科的信号量介绍。
信号量是一个同步对象,用于保持在 0 至指定最大值之间的一个计数值:
- 当线程完成一次对该 semaphore 对象的等待时,该计数值 -1;
- 当线程完成一次对该 semaphore 对象的释放时,该计数值 +1;
源码分析
接下来,我们看下 sync.WaitGroup 的源码是如何实现的。
结构体
1
2
3
4
5
6
7
8
9
|
type WaitGroup struct {
// noCopy 标记不可复制,使用 go vet 作为检测使用,并因此只能进行指针传递,从而保证全局唯一
noCopy noCopy
// 64位值:高32位为计数器,低32位为等待计数。
// 不使用64位值是因为32位的编译器不能确保64位原子操作的位对齐
// 一个uit32占4个byte,我们分配了12个byte对齐的8个byte作为计数,剩下4个用于信号量的存储
state1 [3]uint32
}
|
sync.WaiyGroup 的结构也很简单,只包含了 noCopy 和 state1 两个字段。
noCopy 使用 go vet 来做静态检查,当发生复制时会报错提示,但不影响程序编译运行,这里需要注意一下。noCopy 的实现原理分析请移步👉这篇博客。
state1 的使用就比较值得推敲了,sync.WaitGroup 是通过原子操作信号量和计数器来实现的,而信号量和计数器都保存在 [3]uint32
内,我们看下源码是如何取的:
1
2
3
4
5
6
7
8
9
10
11
|
// state 函数返回计数器状态和信号量的指针
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
// 判定地址是否8位对齐
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 前8bytes做uint64指针计数状态,后4bytes做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 后8bytes做uint64指针计数状态,前4bytes做信号量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
|
state1 包含了 worker 计数器、waiter 计数器和信号量,取值逻辑是:
- worker 计数器:v 是 statep *uint64 的左 32 位
- waiter 计数器:w 是 statep *uint64 的右 32 位
- 信号量:semap 是 state1 [3]uint32 的第一个字节/最后一个字节
通过 state 方法我们可以看出,计数状态值是 *uint64
类型,信号量是 *uint32
类型,然后在使用时通过原子操作来读取,这里就涉及到内存对齐的知识点了。
作为 Go 标准库自带的公共组件肯定是要考虑到不同平台的兼容问题,在 64 位平台原子操作 uint64 类型没有问题,但在 32 位平台操作 uint64 类型可能会发生报错,为什么呢?
在 atomic-bug 中提到:
1
2
3
4
|
On x86-32, the 64-bit functions use instructions unavailable before the Pentium MMX.
On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core.
On ARM, x86-32, and 32-bit MIPS, it is the caller’s responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.
|
也就是说,在 32 位系统上想要原子操作 64 位字(如uint64)的话,需要由调用方保证其数据地址是 64 位对齐的,否则原子访问会有异常。那该如何保证呢?🤔
在 unsafe 包规范中提到:
1
2
3
|
Computer architectures may require memory addresses to be aligned; that is, for addresses of a variable to be a multiple of a factor, the variable’s type’s alignment. The function Alignof takes an expression denoting a variable of any type and returns the alignment of the (type of the) variable in bytes. For a variable x:
uintptr(unsafe.Pointer(&x)) % unsafe.Alignof(x) == 0
|
也就是说,如果类型 t 的对齐保证是 n,那么类型 t 的每个值的地址在运行时必须是 n 的倍数。上面的 state 方法正是基于这个规则来实现即便在 32 平台依然可以保证 uint64 是内存对齐的。
那么 noCopy 和 state1 字段能否可以调换位置呢?留个思考在这里🤔。
关于这块内存对齐的介绍,先说到这里,后面会单独总结篇博客来详细说明下,敬请期待~
Add
首先看 Add 方法的源码实现,可以添加或者删除 worker 的数量,参数 delta 可能为负的,这里去掉了 race 相关的代码,清爽了很多:
- worker 减少到零值,所有阻塞的 waiter 都会被释放
- worker 减少到负数,会发生 panic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
func (wg *WaitGroup) Add(delta int) {
//获取计数状态和信号量的指针
statep, semap := wg.state()
//statep 高32位表示worker计数,低32位表示waiter计数状态
//使用原子加操作worker计数
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
//如果计数worker减少到负数,则panic
if v < 0 {
panic("sync: negative WaitGroup counter")
}
//添加与等待并发调用,报panic
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//worder计数添加成功
if v > 0 || w == 0 {
return
}
//这时 Goroutine 已经将计数器清零,且等待器大于零(并发调用导致)
//这时不允许出现并发使用导致的状态突变,否则就应该 panic
//- Add 不能与 Wait 并发调用
//- Wait 在计数器已经归零的情况下,不能再继续增加等待器了
//仍然检查来保证 WaitGroup 不会被滥用
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 重置计数器为0
*statep = 0
for ; w != 0; w-- {
//根据waiter计数依次释放信号,唤醒阻塞等待的waiter
runtime_Semrelease(semap, false, 0)
}
}
|
Done
Done 方法其实就是 Add(-1):
1
2
3
|
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
|
Wait
Wait 方法的逻辑是修改 waiter 计数器,并阻塞等待信号量释放,同样去掉了 race 相关的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
//Wait 方法保持阻塞等待直到 worker计数减少到 0
func (wg *WaitGroup) Wait() {
//获取计数器状态和信号量的指针
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
//worker计数状态
v := int32(state >> 32)
//waiter计数状态
w := uint32(state)
if v == 0 {
//当worker计数为 0 时,直接返回不需要阻塞等待
return
}
//原子增加 waiter 计数值,原子操作低 32 位,不需要移位
//通过 cas 和 for 循环这种无锁编程来保证 Wait 方法的并发调用
if atomic.CompareAndSwapUint64(statep, state, state+1) {
//阻塞等待信号量的释放
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
//信号量获取成功
return
}
}
}
|
小结
- Add 操作必须早于 Wait 调用,否则会发生 panic
- WaitGroup 不保证 worker 的执行顺序
- WaitGroup 无法指定固定的 worker 数量
- WaitGroup 禁止拷贝,只能通过指针引用
参考
Go 夜读 sync.WaitGroup 源码分析
Go 之聊聊 struct 的内存对齐
Go 语言原本 - 同步组