在分析 hystrix-go 主流程中,我们知道只有当获取令牌成功后才能继续往下正常执行用户自定义的闭包函数,否则还是会进行降级处理。

1
cmd.ticket = <-circuit.executorPool.Tickets: // 获取令牌成功

今天我们分析下,hystrix-go 是如何进行流量控制的。

源码分析

hystrix-go 对流量控制的代码是很简单的。用了一个简单的令牌算法,能得到令牌的就可以执行后继的工作,执行完后要返还令牌。得不到令牌就拒绝,拒绝后调用用户设置的 fallback 方法,如果没有设置就不执行。

executorPool 结构体

1
2
3
4
5
6
type executorPool struct {
	Name    string         // command name
	Metrics *poolMetrics   // 当前运行统计
	Max     int            // 最大令牌数
	Tickets chan *struct{} // 存放令牌信息
}

结构体 executorPool 就是 hystrix-go 流量控制的具体实现。字段 Max 就是每秒最大的并发值。

这里我们只需要关心 Tickets 和 Metrics 即可。

  • Metrics 用来统计上报当前的运行状态信息,下面详细说明
  • Tickets 存放当前的令牌数,只有获取到令牌才能继续执行

初始化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func newExecutorPool(name string) *executorPool {
	p := &executorPool{}
	p.Name = name
	p.Metrics = newPoolMetrics(name)
	p.Max = getSettings(name).MaxConcurrentRequests

	// 初始化令牌池
	p.Tickets = make(chan *struct{}, p.Max)
	for i := 0; i < p.Max; i++ {
		p.Tickets <- &struct{}{}
	}

	return p
}

在创建 executorPool 的时候,会根据 Max 值来创建令牌。Max 值如果没有设置会使用默认值 10。

获取令牌

1
<-circuit.executorPool.Tickets: // 获取令牌成功

获取令牌很简单,直接读取 Tickets channel 即可。

归还令牌

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 归还令牌
func (p *executorPool) Return(ticket *struct{}) {
	if ticket == nil {
		return
	}

	// 通知运行池更新统计指标
	p.Metrics.Updates <- poolMetricsUpdate{
		activeCount: p.ActiveCount(),
	}

	// 放回令牌池
	p.Tickets <- ticket
}
// 计算当前的并发数
func (p *executorPool) ActiveCount() int {
	return p.Max - len(p.Tickets)
}

这里主要有两个操作:

  • 上报当前的并发数
  • 归还令牌

poolMetrics

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 断路器并发数统计
type poolMetrics struct {
	Mutex   *sync.RWMutex          // 读写锁
	Updates chan poolMetricsUpdate // 指标更新通知

	Name              string          // command name
	MaxActiveRequests *rolling.Number // 统计最大请求数
	Executed          *rolling.Number // 统计当前运行数
}
type poolMetricsUpdate struct {
	activeCount int
}

这里可以看出 MaxActiveRequests 和 Executed 字段都是 *rolling.Number 类型,之前我们也介绍过,用于统计收集过去 10s 内的指标数据。

newPoolMetrics 初始化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func newPoolMetrics(name string) *poolMetrics {
	m := &poolMetrics{}
	m.Name = name
	m.Updates = make(chan poolMetricsUpdate)
	m.Mutex = &sync.RWMutex{}

	m.Reset()

	go m.Monitor()

	return m
}

func (m *poolMetrics) Monitor() {
	for u := range m.Updates {
		m.Mutex.RLock()

		m.Executed.Increment(1)                               // 递增运行数
		m.MaxActiveRequests.UpdateMax(float64(u.activeCount)) // 更新最大并发数

		m.Mutex.RUnlock()
	}
}

newPoolMetrics 中除了初始化 poolMetrics 结构体外,还通过 go m.Monitor() 来开启协程来监听上报的信息,然后分别做汇总收集:

1
2
m.Executed.Increment(1)                               // 递增运行数
m.MaxActiveRequests.UpdateMax(float64(u.activeCount)) // 更新最大并发数

参考

https://github.com/afex/hystrix-go

雪崩利器 hystrix-go 源码分析