前言

上一篇文章我们介绍了服务保护熔断器 sentinel-go 如何使用,今天介绍另外一款优秀的集限流、熔断、降级于一身的组件 hystrix-go。hystrix-go 是著名开源库 hystrix 的 Go 语言简化版本,实现了核心功能,源码实现也比较简单,很适合阅读学习。

如何使用

下载 hystrix-go

1
go get -v github.com/afex/hystrix-go/hystrix

同步执行

看个示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
	Timeout:               1000,
	MaxConcurrentRequests: 100,
	ErrorPercentThreshold: 25,
})

err := hystrix.Do("my_command", func() error {
	// 业务逻辑
	return nil
}, func(err error) error {
	// 降级逻辑
	return nil
})

首先使用 hystrix.ConfigureCommand 配置熔断策略,然后调用 hystrix.Do 方法即可。

hystrix.Do 方法有三个参数:

  • 第一个是我们配置的命令,即调用 hystrix.ConfigureCommand 时指定的命令,不能为空;
  • 第二个参数时我们的正常业务逻辑函数;不能为空;
  • 第三个参数为 fallback 参数,当发生熔断会调用第三个参数。如果不希望有降级逻辑,那么第三个参数置为 nil 即可;

异步执行

除了支持同步调用外,还支持异步调用,看个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
	Timeout:               1000,
	MaxConcurrentRequests: 100,
	ErrorPercentThreshold: 25,
})


output := make(chan bool, 1)
errors := hystrix.Go("my_command", func() error {
	// 正常业务逻辑
	output <- true
	return nil
}, nil)

select {
case out := <-output:
	// success
case err := <-errors:
	// failure
}

调用 hystrix.Go 就像启动 goroutine,只是它会返回 error chan,以便于我们能接收错误信息。

其他方法

1
2
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error 
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error

这两个方法除了支持 context.Context 参数外,和上面的 Do 和 Go 方法没啥区别。

其实方法 Do 、DoC、 Go 方法内部都是调用了 hystrix.GoC 方法,只是 Do 和 DoC 方法处理了异步的过程。

源码分析

流程图

配置熔断策略

使用 hystrix-go,我们需要先配置熔断策略,该库提供了两个配置方法:

1
2
func Configure(cmds map[string]CommandConfig)               // 添加批量配置
func ConfigureCommand(name string, config CommandConfig)    // 添加单个配置

我们看下 CommandConfig 结构体是如何定义的:

1
2
3
4
5
6
7
type CommandConfig struct {
    Timeout                int `json:"timeout"`                  // 等待 command 完成的时间,默认 1000(ms)
    MaxConcurrentRequests  int `json:"max_concurrent_requests"`  // 同一个 command 支持的并发量,默认 10
    RequestVolumeThreshold int `json:"request_volume_threshold"` // 触发开启熔断的最小请求量,相当于 sentinel 的静默请求数量,默认 20
    SleepWindow            int `json:"sleep_window"`             // 熔断器打开后,控制过多久后去尝试服务是否回复,默认值是 5000(ms)
    ErrorPercentThreshold  int `json:"error_percent_threshold"`  // 错误百分比,当错误比例达到此值后触发熔断,默认 50(%)
}

当然如果不配置上面的字段,会使用默认值。当我们调用 Do 方法传入不存在的 command 时也使用基于默认配置值的 CommandConfig。

hystrix.Do 方法

我们先看下 hystrix.Do 方法的源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Do 以同步阻塞方式调用 run 函数,直到返回成功或者失败,当然也包含触发熔断错误
func Do(name string, run runFunc, fallback fallbackFunc) error {
	// 包装正常函数的调用
	runC := func(ctx context.Context) error {
		return run()
	}

	// 当定义了 fallback 函数,进进行包装
	var fallbackC fallbackFuncC
	if fallback != nil {
		fallbackC = func(ctx context.Context, err error) error {
			return fallback(err)
		}
	}

	// 调用 DoC
	return DoC(context.Background(), name, runC, fallbackC)
}

代码比较简单,只是对 run 和 fallback 做了方法包装,以便于底层统一处理,Do 最后调的是 DoC 方法。

hystrix.DoC 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// DoC 以同步阻塞方式调用 run 函数,直到返回成功或者失败,当然也包含触发熔断错误
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
	// 初始化 done chan 用于接收完成信息,当失败时不会往 done chan 写入数据
	done := make(chan struct{}, 1)

	// 对正常的 run 函数进行二次包装,只有当内部执行成功时才调用 done <- struct{}{}
	r := func(ctx context.Context) error {
		err := run(ctx)
		if err != nil {
			return err
		}

		done <- struct{}{}
		return nil
	}

	// 对 fallback 函数进行二次包装,只有当内部 fallback 执行成功时才调用 done <- struct{}{}
	f := func(ctx context.Context, e error) error {
		err := fallback(ctx, e)
		if err != nil {
			return err
		}

		done <- struct{}{}
		return nil
	}

	// 最后都是统一调用 GoC 函数
	var errChan chan error
	if fallback == nil {
		errChan = GoC(ctx, name, r, nil)
	} else {
		errChan = GoC(ctx, name, r, f)
	}

	// select 阻塞等待接收 chan 信息,成功或失败哪个 chan 信息先到达就返回哪个
	select {
	case <-done:
		return nil
	case err := <-errChan:
		return err
	}
}

可以看到,DoC 跟我们上面的示例代码差不多,帮我们处理异步逻辑,里面调用了 GoC 方法,并最终使用 select 阻塞等待完成或者失败消息。

hystrix.Go 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func Go(name string, run runFunc, fallback fallbackFunc) chan error {
	// 包装正常的执行函数
	runC := func(ctx context.Context) error {
		return run()
	}

	// 当存在降级函数时,包装该函数
	var fallbackC fallbackFuncC
	if fallback != nil {
		fallbackC = func(ctx context.Context, err error) error {
			return fallback(err)
		}
	}

	// 本地统一调用 GoC
	return GoC(context.Background(), name, runC, fallbackC)
}

Go 方法最终也是调用 GoC 方法,然后返回 chan error 类型让调用方也处理异步接收逻辑。

所以接下来,我们重点看 hystrix.GoC 方法的代码即可,这才是核心逻辑。

hystrix.GoC 方法

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// GoC 通过跟踪先前的调用健康状态来运行 run 函数
// 如果你的函数开始变慢或反复的失败,我们将会阻塞对该函数新的调用,以便给依赖的服务时间进行修复
// 如果想在熔断期间执行降级操作,需要定义一个 fallback 函数
// 返回值为 chan error 类型,需要调用端自行接受
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
	// 对于每次调用新建 command 操作命令
	cmd := &command{
		run:      run,
		fallback: fallback,
		start:    time.Now(),
		errChan:  make(chan error, 1),
		finished: make(chan bool, 1),
    }

	// 根据 command name 参数获取熔断器,当获取失败时返回错误
	circuit, _, err := GetCircuit(name)
	if err != nil {
		cmd.errChan <- err
		return cmd.errChan
	}

	// 断路器赋值
	cmd.circuit = circuit

	ticketCond := sync.NewCond(cmd)

	// 标示是否执行了获取令牌操作,不管获取失败与否
	ticketChecked := false

	// 当调用者从返回的 errChan 中获取到错误,那么假定 ticket 已返回给 executorPool。
	// 因此,在 cmd.errorWithFallback() 之后不能运行 returnTicket()。
	//
	// returnTicket 为归还令牌函数
	returnTicket := func() {
		cmd.Lock()
		// 阻塞等待,避免在获取到令牌之前就执行归还操作
		for !ticketChecked {
			ticketCond.Wait()
		}
		// 归还令牌
		cmd.circuit.executorPool.Return(cmd.ticket)
		cmd.Unlock()
	}

	// returnOnce 在下面的两个 goroutine 之间共享,用来确保只有执行的快的 goroutine 才能运行 errWithFallback() 和 reportAllEvent() 函数。
	returnOnce := &sync.Once{}

	// 打包上报当前 command 所有事件
	reportAllEvent := func() {
		err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
		if err != nil {
			log.Printf(err.Error())
		}
	}

	// 创建协程用于检查断路器状态、获取令牌、上报事件、执行用户函数
	go func() {
		// 执行成功后,写入 finish channel
		defer func() { cmd.finished <- true }()

		// 当最近的执行出现很高的错误率时,断路器就会打开,拒绝新的执行来使后端得以恢复,并且断路器在感觉健康状态已恢复时将允许新的请求。

		// 当断路器不允许新的请求时(此时断路器已经打开)
		if !cmd.circuit.AllowRequest() {
			cmd.Lock()
			// 当另一个 goroutine 提前执行释放令牌时依然安全
			ticketChecked = true

			// 唤醒一个正在等待的协程
			ticketCond.Signal()
			cmd.Unlock()
			returnOnce.Do(func() {
				returnTicket()                             // 归还令牌
				cmd.errorWithFallback(ctx, ErrCircuitOpen) // 上报当前错误,并尝试执行 fallback
				reportAllEvent()                           // 上报事件
			})
			return
		}

		// 加锁获取令牌
		cmd.Lock()
		select {
		case cmd.ticket = <-circuit.executorPool.Tickets: // 获取令牌成功
			ticketChecked = true
			ticketCond.Signal()
			cmd.Unlock()
		default: // 获取令牌失败(当超过 command 允许的最大并发量时)
			ticketChecked = true
			ticketCond.Signal()
			cmd.Unlock()
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ErrMaxConcurrency)
				reportAllEvent()
			})
			return
		}

		runStart := time.Now()

		// 真正执行用户正常的 run 函数
		runErr := run(ctx)

		returnOnce.Do(func() {
			defer reportAllEvent()                 // 上报保存的所有事件
			cmd.runDuration = time.Since(runStart) // 计算耗时
			returnTicket()                         // 归还令牌
			if runErr != nil {                     // 检查是否错误
				cmd.errorWithFallback(ctx, runErr) // 保存错误事件,并尝试执行 fallback
				return
			}
			cmd.reportEvent("success") // 保存执行成功事件
		})
	}()

	// 创建协程用于检查当前操作是否超时
	go func() {
		// 根据全局超时事件创建定时器
		timer := time.NewTimer(getSettings(name).Timeout)
		defer timer.Stop()

		select {
		case <-cmd.finished: // 检查是否已经完成
			// returnOnce 已经在另外的协程运行,这里不再执行
		case <-ctx.Done(): // 检查 context 自定义超时
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ctx.Err())
				reportAllEvent()
			})
			return
		case <-timer.C: // 检查 command 初始化定义超时
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ErrTimeout)
				reportAllEvent()
			})
			return
		}
	}()

	return cmd.errChan
}

GoC 方法代码还是比较长的,这里做了相应的注释,方便阅读。

GoC 对于每次调用都会创建一个 command 对象,用于保存当前调用的相关信息,看下该结构体:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type command struct {
	sync.Mutex

	ticket      *struct{}       // 保存获取到的令牌
	start       time.Time       // 调用开始时间
	errChan     chan error      // 错误信息 channel
	finished    chan bool       // 完成信息 channel
	circuit     *CircuitBreaker // 断路器
	run         runFuncC        // 封装正常运行的方法
	fallback    fallbackFuncC   // 封装降级运行的方法
	runDuration time.Duration   // 调用耗时
	events      []string        // 保存当前调用的所有事件,方便最终统一上报
}

command 对象提供了一些方法,这里先看下定义,具体源码实现我们放到文章底部,这里不影响主流程阅读:

1
2
3
4
5
6
// reportEvent 对于产生的事件先临时放入 event 字段,后面会统一调用 reportAllEvent 上报所有事件
func (c *command) reportEvent(eventType string)
// errorWithFallback 保存相关事件和执行 fallback
func (c *command) errorWithFallback(ctx context.Context, err error) 
// tryFallback 尝试执行 fallback,并保存执行结果事件
func (c *command) tryFallback(ctx context.Context, err error) error

回到 GoC 执行流程,接下来就是通过 GetCircuit(name) 获取断路器对象,该方法会在断路器不存在时新建一个,并赋值给 cmd.circuit。当获取断路器失败时会直接将错误信息写入 cmd.errChan 并直接停止执行后续逻辑。

接下来定义 returnTicket 闭包函数用于归还令牌:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// returnTicket 为归还令牌函数
returnTicket := func() {
    cmd.Lock()
    // 阻塞等待,避免在获取到令牌之前就执行归还操作
    for !ticketChecked {
        ticketCond.Wait()
    }
    // 归还令牌
    cmd.circuit.executorPool.Return(cmd.ticket)
    cmd.Unlock()
}

真正执行归还令牌的地方是 cmd.circuit.executorPool.Return(cmd.ticket),该方法定义在 CircuitBreaker 结构体里面,我们以后文章再分析,这里只聚焦主流程。

接下来是定义 reportAllEvent 真正上报所有事件的闭包函数:

1
2
3
4
5
6
7
// 统一上报当前 command 所有事件
reportAllEvent := func() {
    err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
    if err != nil {
        log.Printf(err.Error())
    }
}

reportAllEvent 内部也是调用了 CircuitBreaker 的 ReportEvent 方法,这里依然不去看其实现细节,不影响主流程源码阅读。

接下来定义了两个异步协程,我们分别来看下。

协程 1:用于检查断路器状态、获取令牌、执行用户正常逻辑、上报事件等操作

首先,通过 cmd.circuit.AllowRequest() 方法通过计算当前的错误率来判断是否允许调用执行,如果不允许(即发生熔断)则会执行归还令牌、执行 fallback 降级操作、上报所有事件,然后直接返回。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
if !cmd.circuit.AllowRequest() {
    cmd.Lock()
    // 当另一个 goroutine 提前执行释放令牌时依然安全
    ticketChecked = true

    // 唤醒一个正在等待的协程
    ticketCond.Signal()
    cmd.Unlock()
    returnOnce.Do(func() {
        returnTicket()                             // 归还令牌
        cmd.errorWithFallback(ctx, ErrCircuitOpen) // 上报当前错误,并尝试执行 fallback
        reportAllEvent()                           // 上报事件
    })
    return
}

然后,加锁获取令牌,一旦获取失败说明达到了支持的最大并发量,会触发限流,则会执行归还令牌、执行 fallback 降级操作、上报所有事件,然后直接返回。

只有成功获取令牌后,才能继续往下执行真正的用户代码:

1
2
// 真正执行用户正常的 run 函数
runErr := run(ctx)

如果 run(ctx) 执行失败,依然会执行归还令牌、执行 fallback 降级操作、上报所有事件,然后直接返回流程。

只有 run(ctx) 运行成功,才会触发上报 success 事件和写入 cmd.finished <- true

1
2
3
cmd.reportEvent("success") // 保存执行成功事件
// 执行成功后,写入 finish channel
defer func() { cmd.finished <- true }()
协程 2:用于检查当前调用是否超时
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 创建协程用于检查当前操作是否超时
go func() {
    // 根据全局超时事件创建定时器
    timer := time.NewTimer(getSettings(name).Timeout)
    defer timer.Stop()

    select {
    case <-cmd.finished: // 检查是否已经完成
        // returnOnce 已经在另外的协程运行,这里不再执行
    case <-ctx.Done(): // 检查 context 自定义超时
        returnOnce.Do(func() {
            returnTicket()
            cmd.errorWithFallback(ctx, ctx.Err())
            reportAllEvent()
        })
        return
    case <-timer.C: // 检查 command 初始化定义超时
        returnOnce.Do(func() {
            returnTicket()
            cmd.errorWithFallback(ctx, ErrTimeout)
            reportAllEvent()
        })
        return
    }
}()

该协程源码比较简单,主要用于判断当前调用是否发生全局 hystrix 超时和用户自定义 context 超时。

command 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// reportEvent 对于产生的事件先临时放入 event 字段,后面会统一调用 reportAllEvent 上报所有事件
func (c *command) reportEvent(eventType string) {
	c.Lock()
	defer c.Unlock()

	c.events = append(c.events, eventType)
}

// errorWithFallback 保存相关事件和执行 fallback
func (c *command) errorWithFallback(ctx context.Context, err error) {
	eventType := "failure"
	if err == ErrCircuitOpen {
		eventType = "short-circuit"
	} else if err == ErrMaxConcurrency {
		eventType = "rejected"
	} else if err == ErrTimeout {
		eventType = "timeout"
	} else if err == context.Canceled {
		eventType = "context_canceled"
	} else if err == context.DeadlineExceeded {
		eventType = "context_deadline_exceeded"
	}

	// 保存事件
	c.reportEvent(eventType)

	// 尝试执行 fallback,并上报 fallback 执行结果事件
	fallbackErr := c.tryFallback(ctx, err)

	// fallback 执行失败时,将错误信息写入 error channel
	if fallbackErr != nil {
		c.errChan <- fallbackErr
	}
}

// tryFallback 尝试执行 fallback,并保存执行结果事件
func (c *command) tryFallback(ctx context.Context, err error) error {
	// 如果未定义 fallback 直接返回原始的错误
	if c.fallback == nil {
		return err
	}

	// 执行 fallback
	fallbackErr := c.fallback(ctx, err)
	// fallback 执行失败时保存错误事件
	if fallbackErr != nil {
		c.reportEvent("fallback-failure")
		return fmt.Errorf("fallback failed with '%v'. run error was '%v'", fallbackErr, err)
	}

	// 保存 fallback 执行成功事件
	c.reportEvent("fallback-success")

	return nil
}

存在问题

阅读 hystrix-go 源码过程中,也发现了一些设计不合理的地方:

  • 没有对 go 协程进行 panic recover 捕获,一旦发生 panic 服务就会挂掉
  • 没有复用内存对象,比如 command 对象对于每次调用都会执行新申请、GC 回收的过程
  • 没有缓存 time 对象和时间戳信息,由于内部大量用到 time 信息,添加本地缓存性能会更好
  • 内存占用过高,对于统计控制器内的一些统计信息可以做下针对性优化

相比起来,sentinel-go 库对上面提到的问题都有针对性的优化方案,可以参考学习下。

小结

本篇文章主要介绍了 hystrix-go 如何使用,以及阅读了内部实现的主流程,最后总结了当前实现存在的问题,对于主流程依赖的旁支组件我们在后续文章中单独分析。

参考

https://github.com/afex/hystrix-go