上一篇我们分析 hystrix-go 主流程时知道,对于每个 command 都有一个对应的断路器,而且在 GoC 中首先要通过 GetCircuit(name)
获取断路器对象,该方法会在断路器不存在时新建一个,并赋值给 cmd.circuit。
回顾下 command 结构体:
1
2
3
4
5
|
type command struct {
// ...
circuit *CircuitBreaker // 断路器
// ...
}
|
今天我们来看下断路器 CircuitBreaker 是如何实现的。
源码实现
CircuitBreaker 配置
首先看下断路器结构体:
1
2
3
4
5
6
7
8
9
10
|
// CircuitBreaker 断路器,根据当前的健康度来决定放行还是熔断
type CircuitBreaker struct {
Name string // 自定义的断路器名字,也就是 ConfigureCommand 传入的 name
open bool // 是否为打开状态
forceOpen bool // 是否强制打开断路器
mutex *sync.RWMutex // 保护锁
openedOrLastTestedTime int64 // 上一次尝试检查服务是否恢复的时间戳
executorPool *executorPool // 令牌池+相关运行统计指标,后续文章分析
metrics *metricExchange // 统计收集器
}
|
CircuitBreaker 结构体已经做详细注释,还是比较简单的,其中 executorPool 和 metrics 作用分别是流量控制和收集上报事件,会在后续文章详细介绍,这里只需要知道用途即可,不影响往下阅读。
GetCircuit 方法
前面提到我们通过 GetCircuit(name) 来获取断路器,看下内部如何实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// GetCircuit 根据 name 命令参数返回熔断器对象,或者在不存在时创建它
func GetCircuit(name string) (*CircuitBreaker, bool, error) {
circuitBreakersMutex.RLock()
_, ok := circuitBreakers[name]
// 断路器不存在时创建
if !ok {
circuitBreakersMutex.RUnlock()
circuitBreakersMutex.Lock()
defer circuitBreakersMutex.Unlock()
// 由于需要先释放读锁,然后加写锁,所以二次检查防止加写锁期间其他协程已经创建
if cb, ok := circuitBreakers[name]; ok {
return cb, false, nil
}
// 新建熔断器
circuitBreakers[name] = newCircuitBreaker(name)
} else {
defer circuitBreakersMutex.RUnlock()
}
// 参数1: 断路器对象
// 参数2: 是否为新建
// 参数3: 错误信息
return circuitBreakers[name], !ok, nil
}
|
整体逻辑就是,首先看下是否存在名字为 name 的断路器,有的话直接取出来返回;没有的话新建一个断路器对象,保存起来并返回。
但要注意的是为什么要两次加锁?
- 使用读锁:检查名字为 name 的断路器是否存在,不存在时需要写入操作,所以需要释放读锁,添加写锁。
- 使用写锁:为了防止加写锁期间其他协程已经创建了名字为 name 的断路器,所以这里二次检查下,有就取出并返回;
创建断路器:newCircuitBreaker
使用 newCircuitBreaker 创建断路器,代码很简单:
1
2
3
4
5
6
7
8
9
10
|
// 创建断路器
func newCircuitBreaker(name string) *CircuitBreaker {
c := &CircuitBreaker{}
c.Name = name
c.metrics = newMetricExchange(name) // 创建统计收集器对象
c.executorPool = newExecutorPool(name) // 创建流量控制对象
c.mutex = &sync.RWMutex{}
return c
}
|
AllowRequest 方法
在上一篇文章,我们知道通过调用 cmd.circuit.AllowRequest()
来判断当前请求时熔断还是放行,这里看下内部实现原理。
1
2
3
4
5
|
func (circuit *CircuitBreaker) AllowRequest() bool {
// 当断路器打开时,会判断是否允许尝试检查服务已经恢复
// 当断路器关闭时,直接返回 true
return !circuit.IsOpen() || circuit.allowSingleTest()
}
|
代码逻辑为:
- 首先通过
!circuit.IsOpen()
检查断路器为关闭状态,是则直接返回 true;
- 当
!circuit.IsOpen()
为 false,即断路器为开启状态时,再检查是否允许尝试检查服务已经恢复
我们分别看下 IsOpen()
和 allowSingleTest()
方法。
IsOpen 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// IsOpen 需要在每次执行调用前来检查是否可以尝试执行,IsOpen 为 true 的话表示断路器为打开状态,即禁止尝试执行
func (circuit *CircuitBreaker) IsOpen() bool {
circuit.mutex.RLock()
o := circuit.forceOpen || circuit.open // 优先检查是否强制打开
circuit.mutex.RUnlock()
// 一旦为打开状态,直接返回 true
if o {
return true
}
// 判断是否触发开启断路器检查的最小请求量(相当于 sentinel 的静默请求数量)
// 如果还没到达 RequestVolumeThreshold 请求量,直接放行,不做断路器检查
if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
return false
}
// 根据当前的错误率判断健康度
// 当不是健康状态时,打开断路器开关,并返回 true
if !circuit.metrics.IsHealthy(time.Now()) {
// 太多的错误会触发断路器为打开状态
circuit.setOpen()
return true
}
// 当为健康状态时,返回 false
return false
}
|
首先,通过 forceOpen 和 open 参数看下断路器是否为打开状态,是则直接返回 true。
然后,判断当前是否触发开启断路器检查的最小请求量(相当于 sentinel 的静默请求数量),如果还没到达 RequestVolumeThreshold 请求量,直接放行,不做断路器检查。这里是为了防止服务刚启动时,请求量较少,一旦有错误就会有较高的错误率,容易误判。
最后,通过调用 circuit.metrics.IsHealthy(time.Now())
来判断当前的健康度,当健康度低于设置的阈值时,触发熔断并设置当前断路器为打开状态。
否则,返回 false。
allowSingleTest 方法
当断路器为打开状态时,我们总要有策略来尝试检查服务是否已经恢复,不能一直处于熔断状态,allowSingleTest 的作用就是判断是否允许服务已经恢复。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
// 判断是否允许检查服务已经恢复
func (circuit *CircuitBreaker) allowSingleTest() bool {
circuit.mutex.RLock()
defer circuit.mutex.RUnlock()
now := time.Now().UnixNano()
openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
// 当断路器为打开状态 && 已经经过了允许尝试时间窗口 && cas 成功,才允许进行尝试检查服务已经恢复
if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now)
if swapped {
log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
}
return swapped
}
return false
}
|
circuit.openedOrLastTestedTime
:保存最近一次触发熔断或者尝试服务是否已经恢复的时间。
getSettings(circuit.Name).SleepWindow
:为全局时间窗口,当熔断器被打开后,控制过多久后去尝试探测服务是否可用了,默认值是 5000ms。
allowSingleTest 逻辑为当断路器为打开状态 && 已经过了允许尝试时间窗口 && CAS 更新为当前时间成功,才允许进行尝试检查服务是否已经恢复。
上报事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
// ReportEvent 记录统计指标,以跟踪最近的错误率并将数据公开给仪表板
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
if len(eventTypes) == 0 {
return fmt.Errorf("no event types sent for metrics")
}
circuit.mutex.RLock()
o := circuit.open
circuit.mutex.RUnlock()
// 当为成功事件 && 当前断路器状态为打开时,将断路器状态置为关闭状态
if eventTypes[0] == "success" && o {
circuit.setClose()
}
var concurrencyInUse float64
if circuit.executorPool.Max > 0 {
// 计算当前的并发量和支持的最大并发比率
concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
}
select {
case circuit.metrics.Updates <- &commandExecution{ // 发送命令执行的指标数据
Types: eventTypes, // 事件列表
Start: start, // 开始事件
RunDuration: runDuration, // 执行耗时
ConcurrencyInUse: concurrencyInUse, // 当前的并发占比
}:
default:
return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
}
return nil
}
|
ReportEvent 方法首先检查当前为成功事件 && 断路器状态为打开时,将断路器状态置为关闭状态。
1
2
3
|
if eventTypes[0] == "success" && o {
circuit.setClose()
}
|
其中这块逻辑个人觉得不太适合,只有一次成功请求就要关闭断路器么?应该统计过去一段时间内的成功率才对吧?
然后,计算当前的并发请求占比,用于统计和看板展示,没有其他用处。
最后,上报给 circuit.metrics.Updates
统计控制器,用于计算服务健康度和看板展示。
参考
https://github.com/afex/hystrix-go