上一篇 我们分析断路器时有用到 metricExchange,但并未展开详细介绍,本篇我们主要介绍它以及实现原理。

metricExchange 主要是用来收集处理上报的所有事件,并对事件进行汇总处理,最后根据计算出来的一段时间内的错误率来表示服务的健康度。下面我们看下其源码实现。

源码分析

metricExchange

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
type metricExchange struct {
	Name    string
	Updates chan *commandExecution  // 上报的信息
	Mutex   *sync.RWMutex

	metricCollectors []metricCollector.MetricCollector  // 统计收集器列表
}

type commandExecution struct {
	Types            []string      `json:"types"`             // 事件列表
	Start            time.Time     `json:"start_time"`        // 开始时间
	RunDuration      time.Duration `json:"run_duration"`      // 运行耗时
	ConcurrencyInUse float64       `json:"concurrency_inuse"` // 当前并发量占比
}

这里我们主要看下 Updates 和 metricCollectors 字段:

  • Updates 是用来接收 *commandExecution 对象信息的,包括开始时间、运行耗时、并发占比
  • metricCollectors 可以包含多个统计收集器,对上报来的运行指标就行收集处理,默认只包含 newDefaultMetricCollector 收集器

Monior 方法

在执行 newMetricExchange 的时候会启动一个协程 go m.Monitor() 去监控 Updates 的数据,然后上报给 metricCollectors 保存执行的信息数据比如前面提到的调用次数,失败次数,被拒绝次数等等。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (m *metricExchange) Monitor() {
	for update := range m.Updates {
		// we only grab a read lock to make sure Reset() isn't changing the numbers.
		m.Mutex.RLock()

		totalDuration := time.Since(update.Start)
		wg := &sync.WaitGroup{}
		for _, collector := range m.metricCollectors {
			wg.Add(1)
			go m.IncrementMetrics(wg, collector, update, totalDuration)
		}
		wg.Wait()

		m.Mutex.RUnlock()
	}
}

通过上一篇的分析,我们知道在断路器中的 ReportEvent 方法中会进行指标上报:

1
2
3
4
5
6
circuit.metrics.Updates <- &commandExecution{ // 发送命令执行的指标数据
    Types:            eventTypes,       // 事件列表
    Start:            start,            // 开始事件
    RunDuration:      runDuration,      // 执行耗时
    ConcurrencyInUse: concurrencyInUse, // 当前的并发占比
}

而对 circuit.metrics.Updates 的接收便是在 Monitor 方法中,这下便能对应起来了。

Montor 的执行也很简单,首先是遍历 metricCollectors 收集器列表,然后对每个收集器分别执行 go m.IncrementMetrics(wg, collector, update, totalDuration) 进行收集处理。

IncrementMetrics 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
	// granular metrics
	r := metricCollector.MetricResult{
		Attempts:         1,
		TotalDuration:    totalDuration,
		RunDuration:      update.RunDuration,
		ConcurrencyInUse: update.ConcurrencyInUse,
	}

	switch update.Types[0] {
	case "success":
		r.Successes = 1
	case "failure":
		r.Failures = 1
		r.Errors = 1
	case "rejected":
    // ...

	collector.Update(r)

	wg.Done()
}

IncrementMetrics 对不同的事件进行计数,最后调用收集器的 collector.Update(r) 方法进行汇总,关于默认收集器的实现,我们在下面分析。

IsHealthy 方法

通过之前的分析我们知道,断路器是调用 IsHealthy 方法来判断健康度,然后进行熔断的,我们看下其实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// 计算最近 10s 内的请求错误率
func (m *metricExchange) ErrorPercent(now time.Time) int {
	m.Mutex.RLock()
	defer m.Mutex.RUnlock()

	var errPct float64
	reqs := m.requestsLocked().Sum(now)            // 计算过去 10s 内累计的请求数
	errs := m.DefaultCollector().Errors().Sum(now) // 计算过去 10s 内累计的请求失败数

	if reqs > 0 {
		errPct = (float64(errs) / float64(reqs)) * 100
	}

	// 向上取整,好奇为什么不直接用 math.Ceil(errPct)
	return int(errPct + 0.5)
}

// 根据错误率计算当前健康度,当错误率到达阈值返回 false,未达到阈值返回 true
func (m *metricExchange) IsHealthy(now time.Time) bool {
	return m.ErrorPercent(now) < getSettings(m.Name).ErrorPercentThreshold
}

首先,调用 m.ErrorPercent(now) 获取默认收集器过去 10s 内的请求总数和请求错误数,然后计算出错误率,并做向上取整。

然后,获取 hystrix-go 配置的错误率阈值 ErrorPercentThreshold

一旦 m.ErrorPercent(now) < getSettings(m.Name).ErrorPercentThreshold 表明当前服务的错误率超过了阈值,即健康度偏低,这样简单粗暴的方式来判断是否需要熔断的。

默认收集器

该仓库除了提供默认的收集器外,还提供了其他几个收集器,其实只要实现了 MetricCollector 接口都可以作为收集器。

1
2
3
4
5
6
type MetricCollector interface {
	// Update accepts a set of metrics from a command execution for remote instrumentation
	Update(MetricResult)
	// Reset resets the internal counters and timers.
	Reset()
}

我们主要看下默认的收集器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type DefaultMetricCollector struct {
	mutex *sync.RWMutex

	numRequests *rolling.Number // 统计请求总数
	errors      *rolling.Number // 统计错误总数

	successes               *rolling.Number // 统计请求成功数
	failures                *rolling.Number // 统计请求失败数
	rejects                 *rolling.Number // 统计请求被拒绝数(触发限流)
	shortCircuits           *rolling.Number // 统计请求被熔断数
	timeouts                *rolling.Number // 统计请求超时数
	contextCanceled         *rolling.Number // 统计请求 ctx 取消数
	contextDeadlineExceeded *rolling.Number // 统计请求 ctx 超时数

	fallbackSuccesses *rolling.Number // 统计 fallback 成功数
	fallbackFailures  *rolling.Number // 统计 fallback 失败数
	totalDuration     *rolling.Timing // 统计所有请求总耗时
	runDuration       *rolling.Timing // 统计当个请求耗时
}

这里我们主要看下 *rolling.Number,它才是状态最终保存的地方。

1
2
3
4
5
6
7
8
type Number struct {
	Buckets map[int64]*numberBucket
	Mutex   *sync.RWMutex
}

type numberBucket struct {
	Value float64
}

Number 保存了 10 秒内的 Buckets 数据信息,每一个 Bucket 的统计时长为 1 秒。字典字段 Buckets map[int64]*numberBucket 中的 Key 保存的是当前时间(秒时间戳)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 获取当前时间戳对应的 numberBucket,不存在时创建
func (r *Number) getCurrentBucket() *numberBucket {
	now := time.Now().Unix()
	var bucket *numberBucket
	var ok bool

	if bucket, ok = r.Buckets[now]; !ok {
		bucket = &numberBucket{}
		r.Buckets[now] = bucket
	}

	return bucket
}

// 移除 10s 以前的 numberBucket
func (r *Number) removeOldBuckets() {
	now := time.Now().Unix() - 10

	for timestamp := range r.Buckets {
		// TODO: configurable rolling window
		if timestamp <= now {
			delete(r.Buckets, timestamp)
		}
	}
}

// 增加当前时间对应的 numberBucket 值
func (r *Number) Increment(i float64) {
	if i == 0 {
		return
	}

	r.Mutex.Lock()
	defer r.Mutex.Unlock()

	b := r.getCurrentBucket() // 获取当前 numberBucket
	b.Value += i              // numberBucket.Value += i
	r.removeOldBuckets()      // 移除 10s 以前的 numberBucket
}

*Number 除了上面的三个方法,还实现了 UpdateMax、Sum、Max、Avg 方法,代码比较简单,这里不再说明。

默认收集器 DefaultMetricCollector 的 Update 方法才是最终更新统计信息的地方。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (d *DefaultMetricCollector) Update(r MetricResult) {
	d.mutex.RLock()
	defer d.mutex.RUnlock()

	d.numRequests.Increment(r.Attempts)
	d.errors.Increment(r.Errors)
	d.successes.Increment(r.Successes)
	d.failures.Increment(r.Failures)
	d.rejects.Increment(r.Rejects)
	d.shortCircuits.Increment(r.ShortCircuits)
	d.timeouts.Increment(r.Timeouts)
	d.fallbackSuccesses.Increment(r.FallbackSuccesses)
	d.fallbackFailures.Increment(r.FallbackFailures)
	d.contextCanceled.Increment(r.ContextCanceled)
	d.contextDeadlineExceeded.Increment(r.ContextDeadlineExceeded)

	d.totalDuration.Add(r.TotalDuration)
	d.runDuration.Add(r.RunDuration)
}

可以看到,Update 方法内部其实是调用的 *rolling.Number 的 Increment 方法来更新数据的。

参考

https://github.com/afex/hystrix-go

雪崩利器 hystrix-go 源码分析