上一篇 我们分析断路器时有用到 metricExchange,但并未展开详细介绍,本篇我们主要介绍它以及实现原理。
metricExchange 主要是用来收集处理上报的所有事件,并对事件进行汇总处理,最后根据计算出来的一段时间内的错误率来表示服务的健康度。下面我们看下其源码实现。
源码分析
metricExchange
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
type metricExchange struct {
Name string
Updates chan *commandExecution // 上报的信息
Mutex *sync.RWMutex
metricCollectors []metricCollector.MetricCollector // 统计收集器列表
}
type commandExecution struct {
Types []string `json:"types"` // 事件列表
Start time.Time `json:"start_time"` // 开始时间
RunDuration time.Duration `json:"run_duration"` // 运行耗时
ConcurrencyInUse float64 `json:"concurrency_inuse"` // 当前并发量占比
}
|
这里我们主要看下 Updates 和 metricCollectors 字段:
- Updates 是用来接收 *commandExecution 对象信息的,包括开始时间、运行耗时、并发占比
- metricCollectors 可以包含多个统计收集器,对上报来的运行指标就行收集处理,默认只包含 newDefaultMetricCollector 收集器
Monior 方法
在执行 newMetricExchange 的时候会启动一个协程 go m.Monitor()
去监控 Updates 的数据,然后上报给 metricCollectors 保存执行的信息数据比如前面提到的调用次数,失败次数,被拒绝次数等等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
func (m *metricExchange) Monitor() {
for update := range m.Updates {
// we only grab a read lock to make sure Reset() isn't changing the numbers.
m.Mutex.RLock()
totalDuration := time.Since(update.Start)
wg := &sync.WaitGroup{}
for _, collector := range m.metricCollectors {
wg.Add(1)
go m.IncrementMetrics(wg, collector, update, totalDuration)
}
wg.Wait()
m.Mutex.RUnlock()
}
}
|
通过上一篇的分析,我们知道在断路器中的 ReportEvent 方法中会进行指标上报:
1
2
3
4
5
6
|
circuit.metrics.Updates <- &commandExecution{ // 发送命令执行的指标数据
Types: eventTypes, // 事件列表
Start: start, // 开始事件
RunDuration: runDuration, // 执行耗时
ConcurrencyInUse: concurrencyInUse, // 当前的并发占比
}
|
而对 circuit.metrics.Updates
的接收便是在 Monitor 方法中,这下便能对应起来了。
Montor 的执行也很简单,首先是遍历 metricCollectors 收集器列表,然后对每个收集器分别执行 go m.IncrementMetrics(wg, collector, update, totalDuration)
进行收集处理。
IncrementMetrics 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
// granular metrics
r := metricCollector.MetricResult{
Attempts: 1,
TotalDuration: totalDuration,
RunDuration: update.RunDuration,
ConcurrencyInUse: update.ConcurrencyInUse,
}
switch update.Types[0] {
case "success":
r.Successes = 1
case "failure":
r.Failures = 1
r.Errors = 1
case "rejected":
// ...
collector.Update(r)
wg.Done()
}
|
IncrementMetrics 对不同的事件进行计数,最后调用收集器的 collector.Update(r)
方法进行汇总,关于默认收集器的实现,我们在下面分析。
IsHealthy 方法
通过之前的分析我们知道,断路器是调用 IsHealthy 方法来判断健康度,然后进行熔断的,我们看下其实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// 计算最近 10s 内的请求错误率
func (m *metricExchange) ErrorPercent(now time.Time) int {
m.Mutex.RLock()
defer m.Mutex.RUnlock()
var errPct float64
reqs := m.requestsLocked().Sum(now) // 计算过去 10s 内累计的请求数
errs := m.DefaultCollector().Errors().Sum(now) // 计算过去 10s 内累计的请求失败数
if reqs > 0 {
errPct = (float64(errs) / float64(reqs)) * 100
}
// 向上取整,好奇为什么不直接用 math.Ceil(errPct)
return int(errPct + 0.5)
}
// 根据错误率计算当前健康度,当错误率到达阈值返回 false,未达到阈值返回 true
func (m *metricExchange) IsHealthy(now time.Time) bool {
return m.ErrorPercent(now) < getSettings(m.Name).ErrorPercentThreshold
}
|
首先,调用 m.ErrorPercent(now)
获取默认收集器过去 10s 内的请求总数和请求错误数,然后计算出错误率,并做向上取整。
然后,获取 hystrix-go 配置的错误率阈值 ErrorPercentThreshold
。
一旦 m.ErrorPercent(now) < getSettings(m.Name).ErrorPercentThreshold
表明当前服务的错误率超过了阈值,即健康度偏低,这样简单粗暴的方式来判断是否需要熔断的。
默认收集器
该仓库除了提供默认的收集器外,还提供了其他几个收集器,其实只要实现了 MetricCollector
接口都可以作为收集器。
1
2
3
4
5
6
|
type MetricCollector interface {
// Update accepts a set of metrics from a command execution for remote instrumentation
Update(MetricResult)
// Reset resets the internal counters and timers.
Reset()
}
|
我们主要看下默认的收集器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
type DefaultMetricCollector struct {
mutex *sync.RWMutex
numRequests *rolling.Number // 统计请求总数
errors *rolling.Number // 统计错误总数
successes *rolling.Number // 统计请求成功数
failures *rolling.Number // 统计请求失败数
rejects *rolling.Number // 统计请求被拒绝数(触发限流)
shortCircuits *rolling.Number // 统计请求被熔断数
timeouts *rolling.Number // 统计请求超时数
contextCanceled *rolling.Number // 统计请求 ctx 取消数
contextDeadlineExceeded *rolling.Number // 统计请求 ctx 超时数
fallbackSuccesses *rolling.Number // 统计 fallback 成功数
fallbackFailures *rolling.Number // 统计 fallback 失败数
totalDuration *rolling.Timing // 统计所有请求总耗时
runDuration *rolling.Timing // 统计当个请求耗时
}
|
这里我们主要看下 *rolling.Number
,它才是状态最终保存的地方。
1
2
3
4
5
6
7
8
|
type Number struct {
Buckets map[int64]*numberBucket
Mutex *sync.RWMutex
}
type numberBucket struct {
Value float64
}
|
Number 保存了 10 秒内的 Buckets 数据信息,每一个 Bucket 的统计时长为 1 秒。字典字段 Buckets map[int64]*numberBucket
中的 Key 保存的是当前时间(秒时间戳)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
// 获取当前时间戳对应的 numberBucket,不存在时创建
func (r *Number) getCurrentBucket() *numberBucket {
now := time.Now().Unix()
var bucket *numberBucket
var ok bool
if bucket, ok = r.Buckets[now]; !ok {
bucket = &numberBucket{}
r.Buckets[now] = bucket
}
return bucket
}
// 移除 10s 以前的 numberBucket
func (r *Number) removeOldBuckets() {
now := time.Now().Unix() - 10
for timestamp := range r.Buckets {
// TODO: configurable rolling window
if timestamp <= now {
delete(r.Buckets, timestamp)
}
}
}
// 增加当前时间对应的 numberBucket 值
func (r *Number) Increment(i float64) {
if i == 0 {
return
}
r.Mutex.Lock()
defer r.Mutex.Unlock()
b := r.getCurrentBucket() // 获取当前 numberBucket
b.Value += i // numberBucket.Value += i
r.removeOldBuckets() // 移除 10s 以前的 numberBucket
}
|
*Number 除了上面的三个方法,还实现了 UpdateMax、Sum、Max、Avg 方法,代码比较简单,这里不再说明。
默认收集器 DefaultMetricCollector 的 Update 方法才是最终更新统计信息的地方。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func (d *DefaultMetricCollector) Update(r MetricResult) {
d.mutex.RLock()
defer d.mutex.RUnlock()
d.numRequests.Increment(r.Attempts)
d.errors.Increment(r.Errors)
d.successes.Increment(r.Successes)
d.failures.Increment(r.Failures)
d.rejects.Increment(r.Rejects)
d.shortCircuits.Increment(r.ShortCircuits)
d.timeouts.Increment(r.Timeouts)
d.fallbackSuccesses.Increment(r.FallbackSuccesses)
d.fallbackFailures.Increment(r.FallbackFailures)
d.contextCanceled.Increment(r.ContextCanceled)
d.contextDeadlineExceeded.Increment(r.ContextDeadlineExceeded)
d.totalDuration.Add(r.TotalDuration)
d.runDuration.Add(r.RunDuration)
}
|
可以看到,Update 方法内部其实是调用的 *rolling.Number 的 Increment 方法来更新数据的。
参考
https://github.com/afex/hystrix-go
雪崩利器 hystrix-go 源码分析