限流器是微服务中不可缺少的组件,起着保护下游服务负载过高、保证服务稳定性的作用。

什么是限流器

Web servers typically use a central in-memory key-value database, like Redis or Aerospike, for session management. A rate limiting algorithm is used to check if the user session (or IP address) has to be limited based on the information in the session cache.

In case a client made too many requests within a given time frame, HTTP servers can respond with status code 429: Too Many Requests

这段话是摘自维基百科. 简单来说限流器是基于 KV 内存数据库的一个限速判断, 在给定的时间内, 客户端请求次数过多, 服务器就会返回状态码 429: Too Many Request。

限流器的实现方式有很多,常见的有以下几种:

  • 计数器
  • 漏桶 (Leaky Bucket)
  • 令牌桶 (Token Bucket)
  • 基于 BBR 算法的自适应限流
  • 基于 Nginx 的限流
  • 分布式限流

本篇文章会介绍基于漏桶的限流器实现。

其实从字面就很好理解. 类似生活用到的漏桶, 当客户端请求进来时,相当于水倒入漏桶,然后从下端小口慢慢匀速的流出。不管上面流量多大,下面流出的速度始终保持不变.

当水流入速度过大时, 漏桶就会溢出, 同样会造成服务拒绝. 相对于计数器的在恢复期内全部拒绝请求, 因为漏桶会以一定的速率消费请求, 这样就能够让后续的请求有机会进入到漏桶里面.

uber 开源了一套 go 语言限流组件 ratelimit ,是基于漏桶实现的,下面我们总结下该组件的使用方式和源码分析。

如何使用

看下官方给出的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

func main() {
    rl := ratelimit.New(100) // per second

    prev := time.Now()
    for i := 0; i < 10; i++ {
        now := rl.Take()
        fmt.Println(i, now.Sub(prev))
        prev = now
    }

    // Output:
    // 0 0
    // 1 10ms
    // 2 10ms
    // 3 10ms
    // 4 10ms
    // 5 10ms
    // 6 10ms
    // 7 10ms
    // 8 10ms
    // 9 10ms
}

首先,通过 ratelimit.New 创建一个限流器对象,参数 100 表示每秒允许通过的请求数,也就是每个请求的间隔为 10ms。然后依次遍历调用 Take 方法,可以看出请求间隔确实为 10ms,没问题。

New 方法

1
func New(rate int, opts ...Option) Limiter

该方法返回一个限流器对象。

第一个参数 rate 为请求限制数,默认窗口为 1 秒,会通过 perRequest: config.per / time.Duration(rate)换算成单个请求间隔。

第二个参数为可选参数,提供了几个选项:

1
2
3
WithoutSlack	// 不使用松弛量,可以理解为不容忍任何突发请求
WithClock	// 使用自定义的 Clock
Per			// Per 自定义时间窗口,默认的时间窗口为 1 秒

Take 方法

1
Take() time.Time	

Take 方法会阻塞请求,直到满足限速要求(RPS)才返回成功。真实限流中,我们也是主要通过调用 Take 方法来达到限流目的的。

实现原理

Limiter 接口

1
2
3
4
type Limiter interface {
	// Take should block to make sure that the RPS is met.
	Take() time.Time
}

Limiter 接口即为限流器的实现接口,只包含了一个方法Take,凡是实现了该方法的对象都可以作为限流器来使用。

该库包含了两个实现:

1
2
type unlimited struct{}		// 该实现不作任何限流
type atomicLimiter struct {}	// 实现了限流

下面主要研究下 atomicLimiter 是如何实现限流的。

config

1
2
3
4
5
type config struct {
	clock    Clock         // Clock 接口
	maxSlack time.Duration // 最大松弛量,默认为 10
	per      time.Duration // 限速时间窗口,默认是 1 秒
}

atomicLimiter

1
2
3
4
5
6
7
8
type atomicLimiter struct {
	state   unsafe.Pointer // 记录当前的限速状态,原子操作
	padding [56]byte       // padding 用于填充 CPU 缓存行,防止伪共享缓存

	perRequest time.Duration // 每个请求的间隔
	maxSlack   time.Duration // 每个请求的最大松弛量
	clock      Clock         // Clock 接口
}

这里容易迷惑大家的是 padding 字段,主要是用于填充 CPU 缓存行,防止伪共享缓存的。不了解的朋友可以参考这篇文章 CPU缓存体系对Go程序的影响

newAtomicBased

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// newAtomicBased 返回一个基于原子操作的限速器
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
	config := buildConfig(opts)
	l := &atomicLimiter{
		perRequest: config.per / time.Duration(rate),
		maxSlack:   -1 * config.maxSlack * time.Second / time.Duration(rate),
		clock:      config.clock,
	}

	// 初始化状态
	initialState := state{
		last:     time.Time{},
		sleepFor: 0,
	}
  
  // 原子操作
	atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
	return l
}

这里我们看出,newAtomicBased 的主要工作是初始化配置、计算请求间隔(perRequest)、最大松弛量(maxSlack)、保存初始化状态,还是比较简单的。

Take 方法实现

先看下注释版的方法源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Take 使用阻塞来保证多次 Take 调用的平均时间达到给定的 RPS
func (t *atomicLimiter) Take() time.Time {
  var (
		newState state
		taken    bool
		interval time.Duration
	)
  
	for !taken {
		now := t.clock.Now()

    // 原子加载
		previousStatePointer := atomic.LoadPointer(&t.state)
		oldState := (*state)(previousStatePointer)

		newState = state{
			last:     now,			    // 保存当前时间
			sleepFor: oldState.sleepFor,// 保存上一次剩余的时间 
		}

		// 如果是首次调用,直接放行
		if oldState.last.IsZero() {
			taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
			continue
		}

		// sleepFor 通过 perRequest 和上次请求花费的时间来计算应该 sleep 多长时间
		// 由于请求的间隔可能会很长,导致 sleepFor 可能为负数,所以需要在不同的请求之间累加,用于后续请求抵消
		newState.sleepFor += t.perRequest - now.Sub(oldState.last)

		// 我们不应该让 sleepFor 负数值变得太小
		// 因为这意味着在短时间内放慢很多速度的服务将在此之后获得更高的RPS。
		if newState.sleepFor < t.maxSlack {
			newState.sleepFor = t.maxSlack
		}

		// 如果 sleepFor > 0 说明无法抵消之前请求的时间,需要休眠一段时间
		if newState.sleepFor > 0 {
			newState.last = newState.last.Add(newState.sleepFor)
            interval, newState.sleepFor = newState.sleepFor, 0
		}

		// 通过 for + cas 实现无锁化编程(lock free)
		taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
	}

	// sleep
	t.clock.Sleep(interval)
	return newState.last
}

Take 方法通过使用 for 循环 + cas 来实现无锁化编程(lock free),进而提升性能,这也是 go 标准库源码中常用的手段,可以学习下。

最大松弛量

我们讲到,传统的漏桶算法,每个请求的间隔是固定的。然而,在实际上的互联网应用中,流量经常是突发性的。对于这种情况,uber-go 对 Leaky Bucket 做了一些改良,引入了最大松弛量 (maxSlack) 的概念。

请求 1 完成后,15ms 后,请求 2 才到来,可以对请求 2 立即处理。请求 2 完成后,5ms 后,请求 3 到来,这个时候距离上次请求还不足 10ms,因此还需要等待 5ms。

但是,对于这种情况,实际上三个请求一共消耗了 25ms 才完成,并不是预期的 20ms。在 uber-go 实现的 ratelimit 中,可以把之前间隔比较长的请求的时间,匀给后面的使用,保证每秒请求数 (RPS) 即可。

对于以上 case,因为请求 2 相当于多等了 5ms,我们可以把这 5ms 移给请求 3 使用。加上请求 3 本身就是 5ms 之后过来的,一共刚好 10ms,所以请求 3 无需等待,直接可以处理。此时三个请求也恰好一共是 20ms。

如下图:

在 ratelimit 的对应实现中很简单,是把每个请求多余出来的等待时间累加起来,以给后面的抵消使用。

1
2
3
4
5
6
7
8
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
if newState.sleepFor < t.maxSlack {
  newState.sleepFor = t.maxSlack
}
if newState.sleepFor > 0 {
  newState.last = newState.last.Add(newState.sleepFor)
  interval, newState.sleepFor = newState.sleepFor, 0
}

注意:这里跟上述代码不同的是,这里是 +=。而同时 t.perRequest - now.Sub(t.last) 是可能为负值的,负值代表请求间隔时间比预期的长。

  • t.sleepFor > 0,代表此前的请求多余出来的时间,无法完全抵消此次的所需量,因此需要 sleep 相应时间, 同时将 t.sleepFor 置为 0。

  • t.sleepFor < 0,说明此次请求间隔大于预期间隔,将多出来的时间累加到 t.sleepFor 即可。

但是,对于某种情况,请求 1 完成后,请求 2 过了很久到达 (好几个小时都有可能),那么此时对于请求 2 的请求间隔 now.Sub(t.last),会非常大。以至于即使后面大量请求瞬时到达,也无法抵消完这个时间。那这样就失去了限流的意义。

为了防止这种情况,ratelimit 就引入了最大松弛量 (maxSlack) 的概念, 该值为负值,表示允许抵消的最长时间,防止以上情况的出现。

1
2
3
if t.sleepFor < t.maxSlack {
  t.sleepFor = t.maxSlack
}

ratelimit 中 maxSlack 的值为 -10 * time.Second / time.Duration(rate), 是十个请求的间隔大小。我们也可以理解为 ratelimit 允许的最大瞬时请求为 10。

小结

本篇主要介绍了基于漏桶算法实现的限流组件 uber-go/ratelimit 的使用和源码实现。

整体看下来,还是比较简单的,主要的知识点有:

  • 结构体中填充 CPU 缓存行,防止伪共享缓存(false sharing)
  • for + cas 实现无锁化编程(lock free)
  • 引入最大松弛量,用于后续请求的抵消操作

参考

限流器系列(1) – Leaky Bucket 漏斗桶

分布式服务限流实战,已经为你排好坑了

uber-go 漏桶限流器使用与原理分析

thanks