在一些需要大量访问的服务中,访问量过大,处理方式无非是,① 提升后台性能(包括提升硬件性能,优化代码或者分布式架构这些的);② 限速,(在一顿时间内,限制访问者的成功率,例如某接口访问限制为1000次/s)。

在实现限速功能的时候,其中一种常用的方法是使用 token bucket 算法来实现。

我之前写过一篇比较简单的,服务器限流问题

前言

简单的描述限流就是对并发请求进行限制或对一个时间窗内的请求进行限速来保护系统,当请求到达限制时将会被拒绝或降级。

其实不管我们过去是否关注过限流,几乎都和限流打过交道,例如我们在开发中常用的各种资源池技术(线程池、数据库连接池、对象池)本质上都是限制总并发数,实际上就是通过计数器算法的限流。

维基百科的描述:

1
2
3
4
5
- A token is added to the bucket every 1/r seconds. 
- The bucket can hold at the most b tokens. If a token arrives when the bucket is full, it is discarded.
- When a packet (network layer PDU) of n bytes arrives,
- if at least n tokens are in the bucket, n tokens are removed from the bucket, and the packet is sent to the network.
- if fewer than n tokens are available, no tokens are removed from the bucket, and the packet is considered to be non-conformant.
  • 每隔 1/r 秒向 bucket 中增加一个 token。
  • 这个 bucket 最多只能存放 b 个 token。如果放置 token 时 bucket 已经满了,丢弃这个 token。
  • 当一个包含 n 个字节的数据包进来的时候,
    • 如果 bucket 中有 >= n 个 token,将从 bucket 中移除 n 个 token,然后把这个数据包发送出去。
    • 如果可用的 token < n,此时不会从 bucket 中移除任何的 token,但是这个数据包会被认为是被限制的数据包。

常见的实现方法

一般有两种实现方法。

一种是按照 token bucket 的说明,真的做放 token 的操作:

  • 后台有个线程每 1/n 秒将 bucket 中的 token 数量加一,直至达到 bucket 容量。
  • 主线程检查限速时,比较 bucket 中 token 的数量,如果少于需要的数量,表示当前被限制。 (比如,一个请求进来,检查 bucket 中的 token 数量是否 > 1,如果 > 1请求放行同时把 token 数量减 1, 如果 < 1 说明当前请求已超出速率限制,请求被拒绝。)

这种方法有一个很大的缺点,那就是因为每个 token bucket 都会有一个繁忙的后台线程在更新 token 数量,会 导致严重占用系统 CPU 出现严重的性能问题。假设我们的限速是限制为 1000/s,此时后台每隔 1ms 就会更新一次 token 数量,可以想像每个后台线程都会频繁占用 CPU,用这种方法实现的 rate limiting 处理不了几个请求就会出现 CPU 接近 100% 的情况。所以实践中一般用另一种方法来实现 token bucket。

另一种是在取 token 时计算上次取跟这次取之间按照速率会产生多少个 token 加上上次剩余的 token (不能超过 bucket 容量限制),然后比较剩余 token 数是否满足需要。

第一种算法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
type Bucket struct {
max int
ch chan struct{}
timer *time.Ticker
}

func NewBucket(n int, sec time.Duration) *Bucket {
b := &Bucket{
max: n,
ch: make(chan struct{}, n),
timer: time.NewTicker(sec),
}

go b.ticker()
return b
}

func (b *Bucket) Get() bool {
select {
case <-b.ch:
return true
default:
return false
}
}

func (b *Bucket) ticker() {
for i := 0; i < b.max; i++ {
b.ch <- struct{}{}
}
for {
select {
case <-b.timer.C:
fmt.Println("len: ", len(b.ch))
for i := len(b.ch); i < b.max; i++ {
b.ch <- struct{}{}
}
}
}
}

func main() {
bucket := NewBucket(10, 3*time.Second)

for i := 0; i < 6; i++ {
go func(b *Bucket, id int) {
for {
if b.Get() {
fmt.Println("ok: ", id)
} else {
fmt.Println("no: ", id)
}
time.Sleep(2 * time.Second)
}
}(bucket, i)
}

select {}
}

第二种算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 令牌桶限
type TokenBucket struct {
lastFillTime time.Time // 上一次填充时间
tokenPerNano float64 // 每纳秒填充的令牌个数
tokenCount float64 // 剩余令牌数量
capacity float64 // 桶容量
mutex sync.Mutex // 线程安全
}

// 创建限制速率的令牌桶
func CreateBucketForRate(rate float64) *TokenBucket {
r := TokenBucket{}
r.tokenPerNano = rate / 1e9
r.lastFillTime = time.Now()
r.tokenCount = rate
r.capacity = rate
return &r
}

// 阻塞版本
func (bucket *TokenBucket) getToken(count int) {
needWait, waitTime := bucket.getTokenWithoutBlock(count)
if needWait {
time.Sleep(waitTime)
}
}

// 非阻塞版本
func (bucket *TokenBucket) getTokenWithoutBlock(count int) (needWait bool, waitTime time.Duration) {
bucket.mutex.Lock()
defer bucket.mutex.Unlock()

// 填充令牌
bucket.tryFillBucket()

// 令牌足够
if bucket.tokenCount >= float64(count) {
bucket.tokenCount -= float64(count)
return false, 0
}

// 需要等待的时间(浮点数不精确,取绝对值)
waitTime = time.Duration(math.Abs((float64(count) - bucket.tokenCount) / bucket.tokenPerNano))
// 更新令牌数量(先到先得)
bucket.tokenCount -= float64(count)
return true, waitTime
}

// 补充令牌
// 取 token 时计算上次取跟这次取之间按照速率会产生多少个 token
// 加上上次剩余的 token (不能超过 bucket 容量限制),
// 然后比较剩余 token 数是否满足取的需要
func (bucket *TokenBucket) tryFillBucket() {
now := time.Now()
passedTime := now.Sub(bucket.lastFillTime)

bucket.tokenCount += float64(passedTime.Nanoseconds()) * bucket.tokenPerNano
if bucket.tokenCount > bucket.capacity {
bucket.tokenCount = bucket.capacity
}

bucket.lastFillTime = now
}

此代码来自于:bigpipe

若是要修改成web服务器使用呢,则可以将其包装成一个中间服务(这里使用gin):

1
2
3
4
5
6
7
8
9
10
11
12
13
func (bucket *TokenBucket) Middleware() gin.HandlerFunc {
return func(ctx *gin.Context) {
needWait, _ := bucket.getTokenWithoutBlock(1)
if needWait {
err := errors.New("Too many requests")
ctx.AbortWithError(429, err)
} else {
ctx.Writer.Header().Set("X-RateLimit-Remaining", fmt.Sprintf("%f", bucket.tokenCount))
ctx.Writer.Header().Set("X-RateLimit-Limit", fmt.Sprintf("%f", bucket.capacity))
ctx.Next()
}
}
}

特点

通过 token bucket 的介绍以及对具体实现的了解,可以发现 token bucket 有以下特点:

  • 当 bucket 满的时候,将不再放入 token,即 token 数不能超过 bucket 容量限制。
  • 因为可以一次性从 bucket 拿出大量的 token 所以 token bucket 允许突发的峰值, 即,限速不是绝对的,而是允许存在尖峰/波峰。

参考附录

令牌桶(Token Bucket)

rate limiting 之 token bucket