前面我们详细分析了 client-go 中的延迟队列的实现,接下来就是限速队列的实现,限速队列在我们日常应用中非常广泛,其原理也比较简单,利用延迟队列的特性,延迟某个元素的插入时间来达到限速的目的。
所以限速队列是扩展的延迟队列,在其基础上增加了 AddRateLimited
、Forget
、NumRequeues
3个方法。限速队列接口的定义如下所示:
// k8s.io/client-go/util/workqueue/rate_limiting_queue.go
// RateLimitingInterface 是对加入队列的元素进行速率限制的接口
type RateLimitingInterface interface {
// 延时队列
DelayingInterface
// 在限速器说ok后,将元素item添加到工作队列中
AddRateLimited(item interface{})
// 丢弃指定的元素
Forget(item interface{})
// 查询元素放入队列的次数
NumRequeues(item interface{}) int
}
很明显我们可以看出来限速队列是在延时队列基础上进行的扩展,接下来我们查看下限速队列的实现结构:
代码语言:javascript复制// k8s.io/client-go/util/workqueue/rate_limiting_queue.go
// 限速队列的实现
type rateLimitingType struct {
// 同样集成了延迟队列
DelayingInterface
// 因为是限速队列,所以在里面定义一个限速器
rateLimiter RateLimiter
}
// 通过限速器获取延迟时间,然后加入到延时队列
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
// 直接通过限速器获取元素放入队列的次数
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
// 直接通过限速器丢弃指定的元素
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
我们可以看到限速队列的实现非常简单,就是通过包含的限速器去实现各种限速的功能,所以我们要真正去了解的是限速器的实现原理。
限速器
限速器当然也是一种接口抽象,我们可以实现各种各样的限速器,甚至不限速也可以,该接口定义如下所示:
代码语言:javascript复制// k8s.io/client-go/util/workqueue/default_rate_limiter.go
// 限速器接口
type RateLimiter interface {
// 获取指定的元素需要等待的时间
When(item interface{}) time.Duration
// 释放指定元素,表示该元素已经被处理了
Forget(item interface{})
// 返回某个对象被重新入队多少次,监控用
NumRequeues(item interface{}) int
}
可以看到和上面的限速队列的扩展接口方法非常类似,索引在实现的时候我们都是直接调用限速器对应的实现方法。接下来我们来看看几种限速器的具体实现。
BucketRateLimiter
第一个要了解的限速器使用非常频繁 - BucketRateLimiter(令牌桶限速器),这是一个固定速率(qps)的限速器,该限速器是利用 golang.org/x/time/rate
库来实现的,令牌桶算法内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过,而没有得到 token 的元素处于等待状态。令牌桶算法通过控制发放 token 来达到限速目的。
比如抽奖、抢优惠、投票、报名……等场景,在面对突然到来的上百倍流量峰值,除了消息队列,预留容量以外,我们可以考虑做峰值限流。因为对于大部分营销类活动,消息限流(对被限流的消息直接丢弃,并直接回复:“系统繁忙,请稍后再试。”)并不会对营销的结果有太大影响。
令牌桶是有一个固定大小的桶,系统会以恒定的速率向桶中放 Token,桶满了就暂时不放了,而用户则从桶中取 Token,如果有剩余的 Token 就可以一直取,如果没有剩余的 Token,则需要等到系统中放置了 Token 才行。
golang 中就自带了一个令牌桶限速器的实现,我们可以使用以下方法构造一个限速器对象:
代码语言:javascript复制limiter := NewLimiter(10, 1);
该构造函数包含两个参数:
- 第一个参数是
r Limit
。代表每秒可以向 Token 桶中产生多少 token,Limit 实际上是 float64 的别名。 - 第二个参数是
b int
。b 代表 Token 桶的容量大小。
上面我们构造出的限速器含义就是,其令牌桶大小为 1,以每秒 10 个 Token 的速率向桶中放置 Token。
除了直接指定每秒产生的 Token 个数外,还可以用 Every
方法来指定向 Token 桶中放置 Token 的间隔,例如:
limit := Every(100 * time.Millisecond)
limiter := NewLimiter(limit, 1)
以上就表示每 100ms 往桶中放一个 Token,本质上也就是一秒钟产生 10 个。
Limiter 提供了三类方法供用户消费 Token,用户可以每次消费一个 Token,也可以一次性消费多个 Token。而每种方法代表了当 Token 不足时,各自不同的对应手段。
Wait/WaitN
代码语言:javascript复制func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
Wait 实际上就是 WaitN(ctx,1)
。当使用 Wait 方法消费 Token 时,如果此时桶内 Token 不足时 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 Token 满足条件,当然如果充足则直接返回。我们可以看到,Wait 方法有一个 context 参数,我们可以设置 context 的 Deadline 或者 Timeout,来决定此次 Wait 的最长时间。
Allow/AllowN
代码语言:javascript复制func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
Allow 实际上就是 AllowN(time.Now(),1)
。AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之返回不消费 Token,false。通常对应这样的线上场景,如果请求速率过快,就直接丢到某些请求。
Reserve/ReserveN
代码语言:javascript复制func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
Reserve 相当于 ReserveN(time.Now(), 1)
。ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 Token 是否充足,都会返回一个 Reservation
对象指针。
你可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间,如果等待时间为 0,则说明不用等待。必须等到等待时间之后,才能进行接下来的工作。或者,如果不想等待,可以调用 Cancel() 方法,该方法会将 Token 归还。
动态调整速率
此外 Limiter 还支持调整速率和桶大小:
SetLimit(Limit)
改变放入 Token 的速率SetBurst(int)
改变 Token 桶大小
有了这两个方法,可以根据现有环境和条件,根据我们的需求,动态的改变 Token 桶大小和速率。
令牌桶限速器实现
了解了令牌桶如何使用后,接下来就可以直接查看下令牌桶限速器是如何实现的:
代码语言:javascript复制// k8s.io/client-go/util/workqueue/default_rate_limiter.go
// 令牌桶限速器,固定速率(qps)
type BucketRateLimiter struct {
// golang 自带的 Limiter
*rate.Limiter
}
var _ RateLimiter = &BucketRateLimiter{}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
// 获取需要等待的时间(延迟),而且这个延迟是一个相对固定的周期
return r.Limiter.Reserve().Delay()
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
// 固定频率,不需要重试
return 0
}
func (r *BucketRateLimiter) Forget(item interface{}) {
// 不需要重试,因此也不需要忘记
}
令牌桶限速器里面直接包装一个令牌桶 Limiter 对象,直接通过 Limiter.Reserve().Delay()
方法就可以获取元素需要延迟的时间,再使用这个限速器的时候,默认初始化参数为:
// k8s.io/client-go/util/workqueue/default_rate_limiter.go
BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}
通过 rate.NewLimiter 实例化,传入 r 和 b 两个参数,r 表示每秒往“”桶中填充 token 的数量,b 表示令牌桶的大小,这里可以看到默认的参数为速率为10,即每秒放入10个 bucket,桶容量大小为100。
ItemExponentialFailureRateLimiter
ItemExponentialFailureRateLimiter
(指数增长限速器) 是比较常用的限速器,从字面意思解释是元素错误次数指数递增限速器,他会根据元素错误次数逐渐累加等待时间,具体实现如下:
// k8s.io/client-go/util/workqueue/default_rate_limiters.go
// 当对象处理失败的时候,其再次入队的等待时间 × 2,到 MaxDelay 为止,直到超过最大失败次数
type ItemExponentialFailureRateLimiter struct {
// 修改失败次数用到的锁
failuresLock sync.Mutex
// 记录每个元素错误次数
failures map[interface{}]int
// 元素延迟基数
baseDelay time.Duration
// 元素最大的延迟时间
maxDelay time.Duration
}
// 实现限速器的When接口
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 累加错误计数
exp := r.failures[item]
r.failures[item] = r.failures[item] 1
// 通过错误次数计算延迟时间,公式是2^i * baseDelay,按指数递增
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
// 最大延迟时间
return r.maxDelay
}
// 计算后的延迟值和最大延迟值之间取最小值
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
// 元素错误次数,直接从 failures 中取
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
// 直接从 failures 删除指定元素
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
从上面 When() 函数中的算法实现来看,该限速器是出现错误后不断尝试的过程,而且随着尝试次数的增加按照指数增加延迟时间。
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter
(快慢限速器)和 ItemExponentialFailureRateLimiter
很像,都是用于错误尝试的,但是 ItemFastSlowRateLimiter
的限速策略是尝试次数超过阈值用长延迟,否则用短延迟,不过该限速器很少使用。
// k8s.io/client-go/util/workqueue/default_rate_limiters.go
// 快慢限速器,先以 fastDelay 为周期进行尝试,超过 maxFastAttempts 次数后,按照 slowDelay 为周期进行尝试
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
// 错误次数计数
failures map[interface{}]int
// 错误尝试阈值
maxFastAttempts int
// 短延迟时间
fastDelay time.Duration
// 长延迟时间
slowDelay time.Duration
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 一样累加错误计数
r.failures[item] = r.failures[item] 1
// 错误次数还未超过阈值
if r.failures[item] <= r.maxFastAttempts {
// 用短延迟
return r.fastDelay
}
// 超过了用长延迟
return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
MaxOfRateLimiter
MaxOfRateLimiter 也可以叫混合限速器,他内部有多个限速器,选择所有限速器中速度最慢(延迟最大)的一种方案。比如内部有三个限速器,When() 接口返回的就是三个限速器里面延迟最大的。
代码语言:javascript复制// k8s.io/client-go/util/workqueue/default_rate_limiters.go
// 混合限速器,选择所有限速器中速度最慢的一种方案
type MaxOfRateLimiter struct {
// 限速器数组
limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
// 获取所有限速器里面时间最大的延迟时间
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
// Requeues 次数也是取最大值
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) Forget(item interface{}) {
// 循环遍历 Forget
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}
混合限速器的实现非常简单,而在 Kubernetes 中默认的控制器限速器初始化就是使用的混合限速器:
代码语言:javascript复制// k8s.io/client-go/util/workqueue/default_rate_limiters.go
// 实例化默认的限速器,由 ItemExponentialFailureRateLimiter 和
// BucketRateLimiter 组成的混合限速器
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket 容量
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
return &ItemExponentialFailureRateLimiter{
failures: map[interface{}]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}
到这里我们就将限速队列分析完了,接下来我们需要了解下 WorkQueue 在控制器中是如何使用的。