上一篇 我们介绍了基于漏桶算法的限流器 - uber-go/ratelimit,为了应对突发流量,它做了最大松弛量的改良。本篇文章继续介绍另外一种限流器:令牌桶(Token Bucket)

什么是令牌桶

漏桶的桶空间就那么大, 其只能保证桶里的请求是匀速流出, 并不关心流入的速度, 只要没取到令牌就服务拒绝, 这可能并不符合互联网行业突发流量的使用场景。

令牌桶则是指匀速向桶中添加令牌,服务请求时需要从桶中获取令牌,令牌的数目可以按照需要消耗的资源进行相应的调整。如果没有令牌,可以选择等待,或者放弃。

令牌通面对突然爆发的流量, 可能大部分流量都被限流器给挡住了, 但是也有部分流量刚好拿到了刚刚生成的 Token, 就这样在千军万马中通过了限流器. 相对于漏桶来说, 令牌桶更适合现在互联网行业的需要, 是被广泛使用的限流算法。

令牌桶和漏桶的区别:

  • 令牌桶控制的是放入令牌的速度,漏桶控制的是流出的速度。
  • 令牌桶允许突发流量,漏桶无法很好应对突发流量。

juju/ratelimit

juju/ratelimit 是基于令牌桶实现的 Go 限流开源方案,下面会结合其用法, 剖析其实现原理。

如何使用

先看一个简单的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
lim := NewBucketWithQuantum(time.Second, 5, 5)

for i := 0; i < 10; i++ {
    fmt.Println(i, lim.TakeAvailable(1))
}

// output:
// 0 1
// 1 1
// 2 1
// 3 1
// 4 1
// 5 0
// 6 0
// 7 0
// 8 0
// 9 0

首先通过 NewBucketWithQuantum 初始化限流对象,参数表示每秒最多允许 5 个请求,然后依次请求 lim.TakeAvailable(1)) 获取一个令牌。可以看出,前五次请求是可以拿到令牌的,后五次请求拿不到令牌就会被拒绝。

初始化方法

该组件提供的初始化方法主要以下三类:

1
2
3
4
5
6
// 默认的初始化函数, 每一个周期内生成 1 个令牌, 默认 quantum = 1
NewBucket(fillInterval time.Duration, capacity int64):
// 跟 NewBucket 类似, 每个周期内生成 quantum 个令牌
NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) 
// 每秒中产生 rate 速率的令牌,fillInterval 是计算出来的
NewBucketWithRate(rate float64, capacity int64)

以上三个初始化方法分别调用的是:

1
2
3
NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket
NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket 
NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket 

以上所有初始化方法最终都是调用的这个方法:

1
NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket

*Bucket 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 返回可用的令牌数,当有消费者在等待令牌时,将为负
func (tb *Bucket) Available() int64
// 返回令牌桶的容量
func (tb *Bucket) Capacity() int64
// 返回每秒放入令牌桶的速率
func (tb *Bucket) Rate() float64
// 返回取出 count 令牌需要等待的时间,需要调用端进行 sleep 等待,注意 Take 方法不支持放回令牌
func (tb *Bucket) Take(count int64) time.Duration
// 返回可用的令牌数(不一定等于 count),如果没可用的令牌,将返回零,不会进行阻塞
func (tb *Bucket) TakeAvailable(count int64) int64
// 与 Take 类似,不同之处在于 TakeMaxDuration 仅在令牌的等待时间不大于 maxWait 时才从存储桶中提取令牌。
// 如果令牌变得比 maxWait 更长的时间才可用,则不执行任何操作并报告 false,
// 否则返回调用者应等待直到令牌实际可用的时间,并报告 true。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
// 阻塞等待直能获取 count 令牌
func (tb *Bucket) Wait(count int64)
// 与 Wait 类似,不同之处在于,WaitMaxDuration 仅在需要等待不超过 maxWait 的情况下才会从存储桶中获取令牌。
// 返回是否已从存储桶中删除了所有令牌。如果未删除任何令牌,则立即返回。
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool

大家可以根据自己的使用场景选择合适的方法,接下来看下该限流器的源码实现。

源码分析

Bucket

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type Bucket struct {
	clock Clock

	startTime time.Time         // Bucket 被创建时间
	capacity int64              // Bucket 令牌最大容量
	quantum int64               // 每次放入 quantum 个令牌
	fillInterval time.Duration  // 放入令牌的间隔

    mu sync.Mutex           // 互斥锁,保护以下变量
	availableTokens int64   // 当前可用的令牌数,可能为负
	latestTick int64        // 保存最新的 tick,也就是从创建 Bucket 到现在的时钟周期
}

这里我们可以看出,Bucket 内部使用了互斥锁来保证并发的安全性,所以在高并发场景下会有很大的性能损耗。上一篇 我们知道 uber-go/ratelimit 并没有使用锁,而是通过 for + cas 这种模式来保证无锁化编程(lock free),这样对比着分析不同的限流器实现可以更好的掌握其设计。

Take 方法

我们先从 Take 方法看起,其他几个获取令牌的方法实现大同小异。

1
2
3
4
5
6
7
// 返回取出 count 令牌需要等待的时间,需要调用端进行 sleep 等待,注意 Take 方法不支持放回令牌
func (tb *Bucket) Take(count int64) time.Duration {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	d, _ := tb.take(tb.clock.Now(), count, infinityDuration)
	return d
}

可以看出 Take 方法内部实现,首先是加锁操作,保证并发安全。然后 Take 方法又调用了内部的 take 方法,第三个调用参数是 infinityDuration 表示无穷时间:

1
const infinityDuration time.Duration = 0x7fffffffffffffff

take 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// now 表示获取令牌的时间,也就是当前时间
// count 表示获取的令牌数量
// maxWait 表示等待的最长时间
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
	if count <= 0 {
		return 0, true
	}

    // 计算从 startTime 到 now 经历的 tick 时间间隔数
    tick := tb.currentTick(now)
    
    // 调整当前可用的令牌数
    // 超过令牌桶大小时,剩余的令牌数等于桶大小
    tb.adjustavailableTokens(tick)
    
    // 当 availableTokens >= count 时,直接返回 0, true
	avail := tb.availableTokens - count
	if avail >= 0 {
		tb.availableTokens = avail
		return 0, true
	}
    
    // 计算下一个放入令牌的整数倍周期,防止 int64(小于 1) 为零的情况
    endTick := tick + (-avail+tb.quantum-1)/tb.quantum
    
    // 计算下一个放入令牌的时间
    endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
    
    // 计算需要等待的时间
    waitTime := endTime.Sub(now)
    
    // 当最大等待时间 < waitTime 时,直接返回 0,false 表示无法获取到令牌数
	if waitTime > maxWait {
		return 0, false
	}
    tb.availableTokens = avail
    
    // 如果 waitTime != 0,调用方需要阻塞等待 waitTime 的时间
	return waitTime, true
}

currentTick

1
2
3
4
5
6
func (tb *Bucket) currentTick(now time.Time) int64 {
    // 由于 tb.fillInterval 表示放入令牌的时间间隔
    // 所以这里计算的是从开始到 now 时间经历了多少个放令牌的间隔数
    // 即 tick * tb.quantum 等于当前的令牌总数
	return int64(now.Sub(tb.startTime) / tb.fillInterval)
}

adjustavailableTokens

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (tb *Bucket) adjustavailableTokens(tick int64) {
	lastTick := tb.latestTick
    tb.latestTick = tick
    // 当可用的令牌数已达到桶的容量,直接返回
	if tb.availableTokens >= tb.capacity {
		return
    }
    
    // (tick - lastTick) * tb.quantum 表示从上次请求到本次请求所产生的的令牌数
    // tb.availableTokens += (tick - lastTick) * tb.quantum 表示桶内剩余的token数量 + 新产生的token数量
    tb.availableTokens += (tick - lastTick) * tb.quantum
    
    // 如果产生的令牌数量超过了桶的容量,只保留桶大小数量的令牌即可
	if tb.availableTokens > tb.capacity {
		tb.availableTokens = tb.capacity
	}
	return
}

通过源码我们可以看到:

  • jujue/ratelimit 计算出请求间隔中应该产生的token的数量, 并没有另启一个 goroutine 专门定时生产令牌
  • 桶内令牌在产生过程中是累加的, 同时减去每次调用消耗的数量
  • 初始化后桶内的令牌数量就是桶的大小

令牌桶缺陷

令牌桶算法能满足绝大部分服务器限流的需要, 是被广泛使用的限流算法, 不过其也有一些缺点:

  • 令牌桶是没有优先级的,无法让重要的请求先通过
  • 如果对服务限流进行缩容和扩容,需要人为手动去修改,运维成本比较大
  • 令牌桶只能对局部服务端的限流, 无法掌控全局资源

这时候就需要考虑系统自适应限流了,github.com/alibaba/sentinel-golang 实现了自适应限流功能,我们下次再对其实现原理进行分析。

小结

本篇文章首先介绍了什么是令牌桶限流器,接着对 juju/ratelimit 核心功能进行了源码分析,最后指出令牌桶限流器存在的缺陷,同时引出系统自适应限流的话题,我们留到下次再进行分析,感兴趣的朋友可以自行查阅相关资料阅读。

参考

限流器系列(2) – Token Bucket 令牌桶