本文基于 Go1.14.2

Go 自带的 context 包设计的很巧妙,最近阅读了下源码实现,可谓是短小精悍,很值得投入时间去学习。

什么是 context

Go1.7 开始引入的 context 标准库包,主要用来在协程之间传递上下文信息,包括:取消信号、超时控制、截止时间、k-v等。

随着 context 包的引入,标准库中很多接口因此加上了 context 参数,例如 database/sql 包,context 几乎成为了并发控制和超时控制的标准做法。

由于在底层实现中使用了锁,所以在多个 goroutine 之间同时使用 context 是并发安全的。

源码阅读

Context 接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Context interface {
    // 返回取消时间 deadline,以及是否会被自动取消
    Deadline() (deadline time.Time, ok bool)
    // 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
	Done() <-chan struct{}
    // 在 channel Done 关闭后,返回 context 取消原因
	Err() error
    // 获取 key 对应的 value,不存在时返回 nil
	Value(key interface{}) interface{}
}

这里要着重说明下 Done,该方法返回一个只读的 channel,可以表示 context 被取消的信号:

  • 当 channel 未被关闭时,子协程进行读取时会发生阻塞,直到被关闭;
  • 当 channel 发生关闭时,子协程可以读到 channel 的零值,从而收到 context 取消的消息;

正是利用关闭 channel 的这种”广播机制“,所有监听 Done 方法的子协程可以在收到信号时进行一些收尾工作,尽快退出。

这里同时也要注意下,由于 channel 的底层也用到了锁,所以当大量协程在等待 ctx.Done() 时,实际上在争同一把大锁,可能会有性能问题。可以考虑使用 WithCancel 来复制通知 channel,减少锁竞争。

canceler 接口

1
2
3
4
type canceler interface {
    cancel(removeFromParent bool, err error)
    Done() <-chan struct{}
}

只要实现了 canceler 接口就可以执行取消操作,*cancelCtx*timerCtx 实现了该接口,注意是加了 * 号的,是这两个结构体的指针实现了 canceler 接口。

Context 接口设计成这样的原因:

  • “取消”操作应该是建议性,而非强制性;
  • “取消”操作应该可传递,“取消”某个函数时,和它相关联的其他函数也应该“取消”;

emptyCtx

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

对于 Context 的实现源码里只有一个最基本的实现,就是私有的 emptyCtx,这实际上是一个空的 context,永远不会被 cancel,没有存储值,也没有 deadline。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

func Background() Context {
	return background
}

func TODO() Context {
	return todo
}
  • background 通常用在 main 函数中,作为所有 context 的根节点。

  • todo 通常用在并不知道传递什么 context 时使用。例如,调用一个需要传递 context 参数的函数,你手头并没有其他 context 可以传递,这时就可以传递 todo。这常常发生在重构进行中,给一些函数添加了一个 Context 参数,但不知道要传什么,就用 todo “占个位子”,最终要换成其他 context。

cancelCtx

1
2
3
4
5
6
7
8
type cancelCtx struct {
	Context

	mu       sync.Mutex            // 保护以下字段
	done     chan struct{}         // 惰性创建,当被取消时进行关闭
	children map[canceler]struct{} // 保存了可被取消的子 canceler,最终会形成树状结构
	err      error                 // 保存发生取消时会的错误信息
}

这是一个可被取消的 cancelCtx,实现了 canceler 接口。由于内嵌了 Context 接口,所以同时也是一个 Context,并重写了里面的 Err、Done、Value 方法。

Value 方法实现

1
2
3
4
5
6
func (c *cancelCtx) Value(key interface{}) interface{} {
	if key == &cancelCtxKey {   // 这里是为了在调用 parentCancelCtx 时返回可挂载的 cancelCtx
		return c
	}
	return c.Context.Value(key) // 递归获取 key 对应的 value
}

Done 方法实现

1
2
3
4
5
6
7
8
9
func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

可以看出 c.done 是惰性创建的,只有调用了 Done() 方法的时候才会被创建。

WithCancel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

WithCancel 通过参数 parent context(这通常是一个 background,作为根节点)来创建 cancelCtx。这里重点看下 propagateCancel 和 cancel 的实现。

propagateCancel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func propagateCancel(parent Context, child canceler) {  
	done := parent.Done()
	if done == nil {    // done == nil 时,表示父节点是个空节点,永远不会被取消
		return 
	}

	select {
	case <-done:
		// 使用 select 检查父类是否已经被取消,是的话直接递归取消子 context,否则继续执行
		child.cancel(false, parent.Err())
		return
	default:
	}

    // 找到可以挂靠的父 context,实际上是找上一个父 cancelCtx
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// 父节点已被取消,子节点也要被取消
			child.cancel(false, p.err)
		} else {
            // 父节点未被取消
			if p.children == nil {
				p.children = make(map[canceler]struct{})
            }
            // 挂载到父节点上
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
        atomic.AddInt32(&goroutines, +1)
        // 如果没有找到可取消的父 context,新启动一个协程监控父节点或子节点取消信号
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

propagateCancel 方法的作用就是向上寻找可以挂靠的并且可取消的父 context 节点,并把子节点挂靠上去。这样,调用父节点 cancel 方法的时候,就可以层层传递,将那些挂靠的子 context 节点同时取消。

最后 else 这段代码可能比较难以理解,其实就是为了解决当找不到可以挂靠的父节点时如何取消该子节点:

  • case <- parent.Done(): 表示在单独的 goroutine 监听父类的取消消息,当父类取消时执行子类取消操作;
  • case <- child.Done(): 表示监听子类单独的取消消息,这个 case 主要是为了防止父类一直不取消时候协程泄露问题;

cancel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil { // 取消错误消息不能为空
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil { // err 不为空说明已经取消
		c.mu.Unlock()
		return
	}
	c.err = err
	if c.done == nil {  // c.done 未初始化时,直接服用全局已关闭的 closedchan
		c.done = closedchan
	} else {
		close(c.done)   // 关闭 channel,广播关闭消息
	}
	for child := range c.children {
		// 遍历取消子 context 节点
		child.cancel(false, err)
	}
	c.children = nil    // 将子节点置空,等待 gc 回收
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)   // 当 removeFromParent == true 时,从父节点移除子 context
	}
}

cancel 的作用其实就以下几点:

  • 关闭自己 c.done,广播消息;
  • 递归取消所有子节点;
  • 从父节点删除自己;

还有一个问题是,cancel 的第一个参数为什么有时传 true,有时传 false?

  • 当主动取消从 WithCancel 返回的 cancel 方法时使用 true,此时需要仅仅从父节点删除自己;
  • 当父类节点递归取消子节点时使用 false,这里不使用 true 的原因是父节点最后会执行 c.children = nil 操作,会销毁所有子节点;

timerCtx

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type timerCtx struct {
	cancelCtx
	timer *time.Timer 

	deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
	c.cancelCtx.cancel(false, err)  // 调用内嵌的 cancelCtx 的取消操作
	if removeFromParent {           // 当 removeFromParent = true 时,从父节点删除自己。这里不太理解为啥不直接这样调用 `c.cancelCtx.cancel(true, err)` 
		removeChild(c.cancelCtx.Context, c)
	}
	c.mu.Lock()
	if c.timer != nil { // 停止 timer,防止在 deadline 时,会再次取消
		c.timer.Stop()  
		c.timer = nil
	}
	c.mu.Unlock()
}

timerCtx 基于 cancelCtx,只是多了 timer 和 deadline 参数,并重写了 Deadline 和 cancel 方法。

WithDeadline

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// 如果父节点 context 的 deadline 早于指定时间,直接构建一个可取消的 context。
		// 原因是一旦父节点超时,自动调用 cancel 函数,子节点也会随之取消。
		// 所以不用单独处理子节点的计时器时间到了之后,自动调用 cancel 函数
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
    }
    // 挂靠到父节点上
    propagateCancel(parent, c)
    
    // 计算当前距离 deadline 的时间
	dur := time.Until(d)
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
        // d 时间后,timer 会自动调用 cancel 函数,自动取消
        // WithDeadline 方法的核心逻辑
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

WithTimeout

1
2
3
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

WithTimeout 本质还是调用 WithDeadline 方法。

valueCtx

结构体

1
2
3
4
type valueCtx struct {
	Context
	key, val interface{}
}

valueCtx 结构体添加 key、value 字段,用来存储 k-v 信息。

valueCtx 直接将 Context 作为匿名字段,因此尽管只实现了 Value 方法,其他方法继承自父 context,但它仍然是一个 Context,这是 Go 语言的一个特点。

Value 方法

1
2
3
4
5
6
7
8
func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key { // 比较当前 c.key 是否是 key,是的话直接返回,否则继续递归寻找
		return c.val
    }
    
    // 递归寻找 key 对应的 value,不存在时返回 nil
    return c.Context.Value(key)
}

WithValue 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func WithValue(parent Context, key, val interface{}) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
    }
    // key 必须可比较,否则 panic
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
    }
    // 存储 k-v,并返回一个新的 valueCtx
	return &valueCtx{parent, key, val}
}

WithValue 对 key 的要求是可比较,因为之后需要通过 key 取出 context 中的值,可比较是必须的。

valueCtx 寻值过程有几点要注意:

  • 父节点没法获取子节点存储的值,子节点却可以获取父节点的值;
  • 递归寻找,依次比较每个父节点的 key;
  • 递归至根节点(emptyCtx)时,返回 nil,所以用 Value 方法的时候要判断结果是否为 nil;
  • 整体上而言,用 WithValue 构造的其实是一个低效率的链表;

如何使用

context 对外暴露了一下方法:

1
2
3
4
5
6
func Background() Context
func TODO() Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

在官方博客里提出了几点使用建议:

  • 不要将 Context 塞到结构体里。直接将 Context 类型作为函数的第一参数,而且一般都命名为 ctx。
  • 不要向函数传入一个 nil 的 context,如果你实在不知道传什么,标准库给你准备好了一个 context:todo。
  • 不要把本应该作为函数参数的类型塞到 context 中,context 存储的应该是一些共同的数据。例如:登陆的 session、cookie 等。
  • 同一个 context 可能会被传递到多个 goroutine,别担心,context 是并发安全的。

传递共享的数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type ctxKey string

func main() {
	ctx := context.TODO()
	ctx = context.WithValue(ctx, ctxKey("name"), "maratrix")
	ctx = context.WithValue(ctx, ctxKey("sex"), "male")

	fmt.Println("name:", ctx.Value(ctxKey("name")).(string))
	fmt.Println("sex:", ctx.Value(ctxKey("sex")).(string))
}

当我们使用 WithValue 保存数据时,官方建议:

  • key 必须要有可比性;
  • 不应该是字符串或者内置的其他类型,以避免上下文包之间的使用冲突;
  • 用户应该定义自己的 key 类型;

使用 cancelCtx 取消

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
	worker := func(ctx context.Context) {
		ch := make(chan struct{})
	jobLoop:
		for {
			select {
			case <-ch:
				fmt.Println("job doing")
			case <-ctx.Done():
				fmt.Println("context cancel")
				break jobLoop
			}
		}
	}

	ctx, cancel := context.WithCancel(context.TODO())
	go worker(ctx)

	cancel()
	time.Sleep(time.Second)
}

有了 cancelCtx 就可以很方便取消 goroutine 了,核心逻辑是这里:

1
2
3
4
5
6
7
select {
    case <-ch:
        fmt.Println("job doing")
    case <-ctx.Done():
        fmt.Println("context cancel")
        break jobLoop
}

这里我们一边消费自己的 job channel,一边还需要监听 ctx.Done(),如果不监听 ctx.Done(),那显然也就不知道什么时候需要退出了。

使用 timerCtx 超时取消

使用 WithDeadline 和 WithTimeout 都可以生成一个 timerCtx:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func main() {
	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
	defer cancel()

	select {
	case <-ctx.Done():
		fmt.Println(ctx.Err())
	case <-time.After(200 * time.Millisecond):
		fmt.Println("after timeout")
	}
}

timerCtx 有几个点需要注意:

  • 每次执行都会创建新的 timer
  • 子节点的 deadline 一定不会超过父节点
  • 创建过程中发现已经过期了,立刻返回

性能问题

如果不通过 WithCancel 来复制通知 channel,大家都使用同一个 ctx.Done,那么实际上是在争一把大锁,在一些场景可能会有性能问题。

来看一个实例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func main() {
	ctx, _ := context.WithCancel(context.TODO())
	for i := 0; i < 100; i++ {
		go func() {
			select {
			case <-ctx.Done():
			}
		}()
	}
	time.Sleep(time.Hour)
}

小结

到这里,整个 context 包的内容就全部讲完了。源码非常短,很适合学习。

context 包是 Go 1.7 引入的标准库,主要用于在 goroutine 之间传递取消信号、超时时间、截止时间以及一些共享的值等。它并不是太完美,但几乎成了并发控制和超时控制的标准做法。

使用上,先创建一个根节点的 context,之后根据库提供的四个函数创建相应功能的子节点 context。由于它是并发安全的,所以可以放心地传递。

当使用 context 作为函数参数时,直接把它放在第一个参数的位置,并且命名为 ctx。另外,不要把 context 嵌套在自定义的类型里。

最后,大家下次在看到代码里有用到 context 的,观察下是怎么使用的,肯定逃不出上面讲的几种类型。熟悉之后会发现:context 可能并不完美,但它确实简洁高效地解决了问题

参考资料

Go context 官方博文

深度解密 Go 语言之 context

曹大的 context 源码分析

Go:从 context 源码领悟接口的设计