上一篇我们分析 hystrix-go 主流程时知道,对于每个 command 都有一个对应的断路器,而且在 GoC 中首先要通过 GetCircuit(name) 获取断路器对象,该方法会在断路器不存在时新建一个,并赋值给 cmd.circuit。

回顾下 command 结构体:

1
2
3
4
5
type command struct {
    // ...
    circuit *CircuitBreaker // 断路器
    // ...
}

今天我们来看下断路器 CircuitBreaker 是如何实现的。

源码实现

CircuitBreaker 配置

首先看下断路器结构体:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// CircuitBreaker 断路器,根据当前的健康度来决定放行还是熔断
type CircuitBreaker struct {
	Name                   string           // 自定义的断路器名字,也就是 ConfigureCommand 传入的 name
	open                   bool             // 是否为打开状态
	forceOpen              bool             // 是否强制打开断路器
	mutex                  *sync.RWMutex    // 保护锁
	openedOrLastTestedTime int64            // 上一次尝试检查服务是否恢复的时间戳
	executorPool           *executorPool    // 令牌池+相关运行统计指标,后续文章分析
	metrics                *metricExchange  // 统计收集器
}

CircuitBreaker 结构体已经做详细注释,还是比较简单的,其中 executorPool 和 metrics 作用分别是流量控制和收集上报事件,会在后续文章详细介绍,这里只需要知道用途即可,不影响往下阅读。

GetCircuit 方法

前面提到我们通过 GetCircuit(name) 来获取断路器,看下内部如何实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// GetCircuit 根据 name 命令参数返回熔断器对象,或者在不存在时创建它
func GetCircuit(name string) (*CircuitBreaker, bool, error) {
	circuitBreakersMutex.RLock()
	_, ok := circuitBreakers[name]

	// 断路器不存在时创建
	if !ok {
		circuitBreakersMutex.RUnlock()
		circuitBreakersMutex.Lock()
		defer circuitBreakersMutex.Unlock()

		// 由于需要先释放读锁,然后加写锁,所以二次检查防止加写锁期间其他协程已经创建
		if cb, ok := circuitBreakers[name]; ok {
			return cb, false, nil
		}

		// 新建熔断器
		circuitBreakers[name] = newCircuitBreaker(name)
	} else {
		defer circuitBreakersMutex.RUnlock()
	}

	// 参数1: 断路器对象
	// 参数2: 是否为新建
	// 参数3: 错误信息
	return circuitBreakers[name], !ok, nil
}

整体逻辑就是,首先看下是否存在名字为 name 的断路器,有的话直接取出来返回;没有的话新建一个断路器对象,保存起来并返回。

但要注意的是为什么要两次加锁?

  • 使用读锁:检查名字为 name 的断路器是否存在,不存在时需要写入操作,所以需要释放读锁,添加写锁。
  • 使用写锁:为了防止加写锁期间其他协程已经创建了名字为 name 的断路器,所以这里二次检查下,有就取出并返回;

创建断路器:newCircuitBreaker

使用 newCircuitBreaker 创建断路器,代码很简单:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 创建断路器
func newCircuitBreaker(name string) *CircuitBreaker {
	c := &CircuitBreaker{}
	c.Name = name
	c.metrics = newMetricExchange(name)    // 创建统计收集器对象
	c.executorPool = newExecutorPool(name) // 创建流量控制对象
	c.mutex = &sync.RWMutex{}

	return c
}

AllowRequest 方法

在上一篇文章,我们知道通过调用 cmd.circuit.AllowRequest()来判断当前请求时熔断还是放行,这里看下内部实现原理。

1
2
3
4
5
func (circuit *CircuitBreaker) AllowRequest() bool {
	// 当断路器打开时,会判断是否允许尝试检查服务已经恢复
	// 当断路器关闭时,直接返回 true
	return !circuit.IsOpen() || circuit.allowSingleTest()
}

代码逻辑为:

  • 首先通过 !circuit.IsOpen() 检查断路器为关闭状态,是则直接返回 true;
  • !circuit.IsOpen() 为 false,即断路器为开启状态时,再检查是否允许尝试检查服务已经恢复

我们分别看下 IsOpen()allowSingleTest() 方法。

IsOpen 方法
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// IsOpen 需要在每次执行调用前来检查是否可以尝试执行,IsOpen 为 true 的话表示断路器为打开状态,即禁止尝试执行
func (circuit *CircuitBreaker) IsOpen() bool {
	circuit.mutex.RLock()
	o := circuit.forceOpen || circuit.open // 优先检查是否强制打开
	circuit.mutex.RUnlock()

	// 一旦为打开状态,直接返回 true
	if o {
		return true
	}

	// 判断是否触发开启断路器检查的最小请求量(相当于 sentinel 的静默请求数量)
	// 如果还没到达 RequestVolumeThreshold 请求量,直接放行,不做断路器检查
	if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
		return false
	}

	// 根据当前的错误率判断健康度
	// 当不是健康状态时,打开断路器开关,并返回 true
	if !circuit.metrics.IsHealthy(time.Now()) {
		// 太多的错误会触发断路器为打开状态
		circuit.setOpen()
		return true
	}

	// 当为健康状态时,返回 false
	return false
}

首先,通过 forceOpen 和 open 参数看下断路器是否为打开状态,是则直接返回 true。

然后,判断当前是否触发开启断路器检查的最小请求量(相当于 sentinel 的静默请求数量),如果还没到达 RequestVolumeThreshold 请求量,直接放行,不做断路器检查。这里是为了防止服务刚启动时,请求量较少,一旦有错误就会有较高的错误率,容易误判。

最后,通过调用 circuit.metrics.IsHealthy(time.Now()) 来判断当前的健康度,当健康度低于设置的阈值时,触发熔断并设置当前断路器为打开状态。

否则,返回 false。

allowSingleTest 方法

当断路器为打开状态时,我们总要有策略来尝试检查服务是否已经恢复,不能一直处于熔断状态,allowSingleTest 的作用就是判断是否允许服务已经恢复。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 判断是否允许检查服务已经恢复
func (circuit *CircuitBreaker) allowSingleTest() bool {
	circuit.mutex.RLock()
	defer circuit.mutex.RUnlock()

	now := time.Now().UnixNano()
	openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)

	// 当断路器为打开状态 && 已经经过了允许尝试时间窗口 && cas 成功,才允许进行尝试检查服务已经恢复
	if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
		swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now)
		if swapped {
			log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
		}
		return swapped
	}

	return false
}

circuit.openedOrLastTestedTime:保存最近一次触发熔断或者尝试服务是否已经恢复的时间。

getSettings(circuit.Name).SleepWindow:为全局时间窗口,当熔断器被打开后,控制过多久后去尝试探测服务是否可用了,默认值是 5000ms。

allowSingleTest 逻辑为当断路器为打开状态 && 已经过了允许尝试时间窗口 && CAS 更新为当前时间成功,才允许进行尝试检查服务是否已经恢复。

上报事件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// ReportEvent 记录统计指标,以跟踪最近的错误率并将数据公开给仪表板
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
	if len(eventTypes) == 0 {
		return fmt.Errorf("no event types sent for metrics")
	}

	circuit.mutex.RLock()
	o := circuit.open
	circuit.mutex.RUnlock()

	// 当为成功事件 && 当前断路器状态为打开时,将断路器状态置为关闭状态
	if eventTypes[0] == "success" && o {
		circuit.setClose()
	}

	var concurrencyInUse float64
	if circuit.executorPool.Max > 0 {
		// 计算当前的并发量和支持的最大并发比率
		concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
	}

	select {
	case circuit.metrics.Updates <- &commandExecution{ // 发送命令执行的指标数据
		Types:            eventTypes,       // 事件列表
		Start:            start,            // 开始事件
		RunDuration:      runDuration,      // 执行耗时
		ConcurrencyInUse: concurrencyInUse, // 当前的并发占比
	}:
	default:
		return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
	}

	return nil
}

ReportEvent 方法首先检查当前为成功事件 && 断路器状态为打开时,将断路器状态置为关闭状态。

1
2
3
if eventTypes[0] == "success" && o {
    circuit.setClose()
}

其中这块逻辑个人觉得不太适合,只有一次成功请求就要关闭断路器么?应该统计过去一段时间内的成功率才对吧?

然后,计算当前的并发请求占比,用于统计和看板展示,没有其他用处。

最后,上报给 circuit.metrics.Updates 统计控制器,用于计算服务健康度和看板展示。

参考

https://github.com/afex/hystrix-go