滑动时间窗口设计方法
导语:系统做出一系列调度要基于系统运行的统计指标,例如熔断(基于请求数、并发数、请求延迟、异常比例等),本文解析基于滑动时间窗口的统计结构设计办法。
什么是滑动时间窗口
固定窗口:一个固定长度的格子,这个格子里的所有事件元素就是统计目标
滑动窗口:滑动窗口将固定窗口等分为多个小的窗口,统计时可以圈定若干个连续窗口,统计落入其内的事件元素。显然滑动窗口可以做更细粒度上的统计。
滑动时间窗口:应用指标统计很重要一点是要与时间对齐,比如流控可能希望的是拿到前一秒的失败请求比例,所以在我们统计的指标都是需要与时间对齐。滑动时间窗口就是把一段时间片分为多个窗口,然后计算对应的时间落在那个窗口上,来对数据统计。
滑动时间窗口怎么运行
通过上面对滑动事件窗口的描述,我们可以知道滑动时间窗口有如下特点:
- 每个小窗口的大小均等
- 滑动窗口的个数及大小可以根据实际应用进行控制
那么对应的滑动时间窗口有两个重要设置:
- 滑动窗口的统计周期:表示滑动窗口的统计周期,一个滑动窗口包含有一个或多个小窗口
- 滑动窗口中每个小窗口长度:每个小窗口的统计周期
如上图,滑动窗口的统计周期是1000ms,每个小窗口的统计周期是200ms,一个滑动窗口有5个小滑动窗口
那么怎么做具体的统计呢?
如上,
- 每个小窗口都是一个具体的数据结构,里面做一些统计相关的结构设计,用户可以自定义这些结构
- 每个小窗口都有1个开始时间和1个结束时间,事件发生的时间落在哪个小窗口格子的起始区间内,那么对事件的统计就要落在这个小窗口内
接下来就是怎么计算事件落在哪个小窗口内呢?
假设事件发生的时间是now, 小窗口的统计周期也就是长度是bucketLength, 滑动窗口的统计周期也就是长度是windowLength, 那么小窗口的index计算如下:
举例如下:
如上图所示的滑动事件窗口参数,bucketLength=200ms, windowLength=1000ms,
- 当前时间是1ms, 按上述公式计算得index=0, 也就是会落入第1个小窗口格子内;
- 当前时间是300ms, 按上述公式计算得index=1,也就是会落入第2个小窗口格子内;
- 当前时间1001ms, 按上述公式计算得index=0,也就是会落入第1个小窗口格子内;
换句话说,我们知道事件发生的时间,就能知道事件落入哪个格子内,那么就能对事件做出相应统计计算。
从上述的计算不难看出,随着时间的推移,这个格子的计算就是在长度固定的数组内循环移动,或者在一个环形队列上循环移动。
那么如果已经过了第一遍循环,新的时间格子循环移动到前面,也就是进入到新的统计周期后,要做哪些操作呢?
- 把整个滑动窗口的起始时间设置为新的起始时间
- 把小窗口内数据结构重置后再进行新的统计
滑动时间窗口两个参数的实际意义
通过上述描述,我们已经知道滑动时间窗口的运行原理和使用方法,那么滑动时间窗口的两个参数对实际运行结果会产生怎样的影响呢?这对我们如何设置两个参数会产生决定作用。
滑动时间窗口的两个参数:
- 小窗口格子统计周期长度:bucketLength
- 滑动窗口统计周期长度: windowLength
小窗口格子越小,也就是实际记录事件的时间划分越细致,那我们得到的统计结果就会越精确,但与此同时,我们就会面对更大的存储计算和并发压力;
滑动窗口越大,也就是我们的统计周期变得更长,那么得到的统计结果就更平均,也就是图线就越平滑,相反的,窗口越小则观察统计周期就越小,那么统计结果的差异就会越大,也就是脉冲和毛刺就会越强烈,但是与实际情况会更贴近。
所以实际实现和运行中,我们要综合考虑系统的抗脉冲能力和并发能力,做出合理的设置。
代码示例
参考sentinel滑动窗口代码,简化最基础的实现部分并注释如下。
组件由两部分组成:
- 定长数组来表示滑动窗口,数组每个元素就是一个小窗口格子,格子是一个抽象对象,可以存储任意统计结构。
- 模拟时间滑动的算法,保证数据按照时间推移而做出正确滑动
package sliding_window
import (
"errors"
"sync"
"sync/atomic"
)
// 小窗口格式
type Bucket struct {
// 小窗口起始统计位置
Start uint64
// 存储实际自定义统计结构
Value atomic.Value
}
// 用于模拟滑动窗口的定长数组
type BucketArray struct {
// 数组长度
length int
// 实际存储的小窗口数组
data []*Bucket
// 保证并发安全的锁
mutex sync.Mutex
}
// 初始化滑动窗口数组
func NewBucketArray(sampleCount int, bucketLength uint, now uint64, gen BucketItemGenerator) *BucketArray {
ret := &BucketArray{
length: sampleCount,
data: make([]*Bucket, sampleCount),
}
// 这里首先会根据当前时间now计算出其所对在的格子在数组中的下标idx以及该格子的统计起始时间startTime
// startTime的意义是对齐时间
idx := int((now / uint64(bucketLength)) % uint64(sampleCount))
startTime := now - (now % uint64(bucketLength))
// 从[idx, sampleCount-1] 会预先分配每个格子结构*Bucket,并且基于计算出的startTime依次往后填入对应格子的开始时间
for i := idx; i <= sampleCount-1; i {
b := &Bucket{
Start: startTime,
Value: atomic.Value{},
}
b.Value.Store(gen.NewEmptyBucket())
ret.data[i] = b
startTime = uint64(bucketLength)
}
// 从[0,idx-1] 这个区间也会预先分配每个格子结构*Bucket,不过需要注意的是,[0,idx-1] 里面的格子预先分配的时间也是未来的时间
for i := 0; i < idx; i {
b := &Bucket{
Start: startTime,
Value: atomic.Value{},
}
b.Value.Store(gen.NewEmptyBucket())
ret.data[i] = b
startTime = uint64(bucketLength)
}
return ret
}
// 根据索引获取bucket
func (ba *BucketArray) get(idx int) *Bucket {
ba.mutex.Lock()
defer ba.mutex.Unlock()
return ba.data[idx]
}
// 基于时间做位置划分的滑动窗口
type SlidingWindowArray struct {
// bucket长度,这里是基于时间划分,那么bucket长度就是时间间隔,例如200ms
bucketLength uint
// 一个滑动窗口包含的小窗口个数,bucket长度和包含的小窗口个数决定了滑动窗口的统计周期也就是长度
sampleCount uint
// 一个滑动时间窗口长度,由上述两个参数确定,如sampleCount=5,bucketLength=200ms, 则interval=1000ms
interval uint
// 实际存储bucket的数组结构
array *BucketArray
// 用于并发安全操作的锁
mutex sync.Mutex
}
// 初始化滑动时间窗口
func NewSlidingWindowArray(sampleCount uint, bucketLength uint,
nowInMs uint64, gen BucketItemGenerator) *SlidingWindowArray {
return &SlidingWindowArray{
bucketLength: bucketLength,
sampleCount: sampleCount,
interval: bucketLength * sampleCount,
array: NewBucketArray(int(sampleCount), bucketLength, nowInMs, gen),
}
}
// 给定一个时间点,返回所属的bucket,从而可以对bucket做事件统计记录或者查询, 这个实际上就是模拟了全部的滑动过程
func (swa *SlidingWindowArray) CurrentBucketOfTime(now uint64, bg BucketItemGenerator) (*Bucket, error) {
timeId := now / uint64(swa.bucketLength)
idx := int(timeId) % swa.array.length
bucketStart := now - (now % uint64(swa.bucketLength))
old := swa.array.get(idx)
if bucketStart == atomic.LoadUint64(&old.Start) {
return old, nil
} else if bucketStart > atomic.LoadUint64(&old.Start) {
// 实际上已经进入新的时间格子循环了
swa.mutex.Lock()
// 进入新的循环是要清空原先统计数据的
old = bg.ResetBucket(old, bucketStart)
swa.mutex.Unlock()
return old, nil
} else {
// 获取到未来时间格子,暂时不可能发生,记做失败
return nil, errors.New("future bucket")
}
}
func (swa *SlidingWindowArray) ValuesConditional(now uint64, predicate func(uint64) bool) []*Bucket {
if now <= 0 {
return make([]*Bucket, 0)
}
ret := make([]*Bucket, 0, swa.array.length)
for i := 0; i < swa.array.length; i {
sb := swa.array.get(i)
if sb == nil || (now-atomic.LoadUint64(&sb.Start)) > uint64(swa.interval) || !predicate(atomic.
LoadUint64(&sb.Start)) {
continue
}
ret = append(ret, sb)
}
return ret
}
// Bucket实际结构的生成接口
type BucketItemGenerator interface {
// 生成格子里的实际统计结构
NewEmptyBucket() interface{}
// 用于格子统计数据的重置清空
ResetBucket(b *Bucket, startTime uint64) *Bucket
}
使用示例:
代码语言:javascript复制// 统计qps
// 用于统计QPS的实际结构
type MetricBucketItem struct {
Counter int64
}
type SlidingWindowMetric struct {
data *SlidingWindowArray
}
func (swm *SlidingWindowMetric) NewEmptyBucket() interface{} {
return &MetricBucketItem{Counter: 0}
}
func (swm *SlidingWindowMetric) ResetBucket(bw *Bucket, startTime uint64) *Bucket {
atomic.StoreUint64(&bw.Start, startTime)
bw.Value.Store(&MetricBucketItem{Counter: 0})
return bw
}
func NewSlidingWindowMetric(sampleCount uint32, bucketLength uint32) *SlidingWindowMetric {
nowInMs := time.Now().UnixNano() / 1e6
swm := &SlidingWindowMetric{}
swm.data = NewSlidingWindowArray(uint(sampleCount), uint(bucketLength), uint64(nowInMs), swm)
return swm
}
// 有访问就添加计数
func (swm *SlidingWindowMetric) AddCount(count int64) {
nowInMs := time.Now().UnixNano() / 1e6
// 获取当前时间应当落入的格子
curBucket, err := swm.data.CurrentBucketOfTime(uint64(nowInMs), swm)
if err != nil {
fmt.Printf("get bucket from SlidingWindowArray err, %v", err)
return
}
if curBucket == nil {
fmt.Printf("current bucket is nil")
return
}
mb := curBucket.Value.Load()
if mb == nil {
fmt.Printf("nil bucket")
return
}
b, ok := mb.(*MetricBucketItem)
if !ok {
fmt.Printf("fail to type assert")
return
}
// 将相应统计结构计数做累加
atomic.AddInt64(&b.Counter, count)
}
func (swm *SlidingWindowMetric) GetQPS() float64 {
now := uint64(time.Now().UnixNano() / 1e6)
startTime := now - (now % uint64(swm.data.bucketLength))
end := startTime
start := end - uint64(swm.data.interval) uint64(swm.data.bucketLength)
satisfiedBuckets := swm.data.ValuesConditional(now, func(ws uint64) bool {
return ws >= start && ws <= end
})
// 获取符合条件的bucket内所有计数
ret := int64(0)
for _, sb := range satisfiedBuckets {
mb := sb.Value.Load()
if mb == nil {
fmt.Printf("nil BucketWrap")
continue
}
counter, ok := mb.(*MetricBucketItem)
if !ok {
fmt.Printf("type assert failed")
continue
}
ret = counter.Counter
}
// 假设interval 长度设为ms
return float64(ret) / (float64(swm.data.interval) / 1000.00)
}