滑动时间窗口设计

2022-02-18 18:06:29 浏览数 (1)

滑动时间窗口设计方法

导语:系统做出一系列调度要基于系统运行的统计指标,例如熔断(基于请求数、并发数、请求延迟、异常比例等),本文解析基于滑动时间窗口的统计结构设计办法。

什么是滑动时间窗口

固定窗口:一个固定长度的格子,这个格子里的所有事件元素就是统计目标

滑动窗口:滑动窗口将固定窗口等分为多个小的窗口,统计时可以圈定若干个连续窗口,统计落入其内的事件元素。显然滑动窗口可以做更细粒度上的统计。

滑动时间窗口:应用指标统计很重要一点是要与时间对齐,比如流控可能希望的是拿到前一秒的失败请求比例,所以在我们统计的指标都是需要与时间对齐。滑动时间窗口就是把一段时间片分为多个窗口,然后计算对应的时间落在那个窗口上,来对数据统计。

滑动时间窗口怎么运行

通过上面对滑动事件窗口的描述,我们可以知道滑动时间窗口有如下特点:

  1. 每个小窗口的大小均等
  2. 滑动窗口的个数及大小可以根据实际应用进行控制

那么对应的滑动时间窗口有两个重要设置:

  1. 滑动窗口的统计周期:表示滑动窗口的统计周期,一个滑动窗口包含有一个或多个小窗口
  2. 滑动窗口中每个小窗口长度:每个小窗口的统计周期

如上图,滑动窗口的统计周期是1000ms,每个小窗口的统计周期是200ms,一个滑动窗口有5个小滑动窗口

那么怎么做具体的统计呢?

如上,

  1. 每个小窗口都是一个具体的数据结构,里面做一些统计相关的结构设计,用户可以自定义这些结构
  2. 每个小窗口都有1个开始时间和1个结束时间,事件发生的时间落在哪个小窗口格子的起始区间内,那么对事件的统计就要落在这个小窗口内

接下来就是怎么计算事件落在哪个小窗口内呢?

假设事件发生的时间是now, 小窗口的统计周期也就是长度是bucketLength, 滑动窗口的统计周期也就是长度是windowLength, 那么小窗口的index计算如下:

举例如下:

如上图所示的滑动事件窗口参数,bucketLength=200ms, windowLength=1000ms,

  1. 当前时间是1ms, 按上述公式计算得index=0, 也就是会落入第1个小窗口格子内;
  2. 当前时间是300ms, 按上述公式计算得index=1,也就是会落入第2个小窗口格子内;
  3. 当前时间1001ms, 按上述公式计算得index=0,也就是会落入第1个小窗口格子内;

换句话说,我们知道事件发生的时间,就能知道事件落入哪个格子内,那么就能对事件做出相应统计计算。

从上述的计算不难看出,随着时间的推移,这个格子的计算就是在长度固定的数组内循环移动,或者在一个环形队列上循环移动。

那么如果已经过了第一遍循环,新的时间格子循环移动到前面,也就是进入到新的统计周期后,要做哪些操作呢?

  1. 把整个滑动窗口的起始时间设置为新的起始时间
  2. 把小窗口内数据结构重置后再进行新的统计

滑动时间窗口两个参数的实际意义

通过上述描述,我们已经知道滑动时间窗口的运行原理和使用方法,那么滑动时间窗口的两个参数对实际运行结果会产生怎样的影响呢?这对我们如何设置两个参数会产生决定作用。

滑动时间窗口的两个参数:

  1. 小窗口格子统计周期长度:bucketLength
  2. 滑动窗口统计周期长度: windowLength

小窗口格子越小,也就是实际记录事件的时间划分越细致,那我们得到的统计结果就会越精确,但与此同时,我们就会面对更大的存储计算和并发压力;

滑动窗口越大,也就是我们的统计周期变得更长,那么得到的统计结果就更平均,也就是图线就越平滑,相反的,窗口越小则观察统计周期就越小,那么统计结果的差异就会越大,也就是脉冲和毛刺就会越强烈,但是与实际情况会更贴近。

所以实际实现和运行中,我们要综合考虑系统的抗脉冲能力和并发能力,做出合理的设置。

代码示例

参考sentinel滑动窗口代码,简化最基础的实现部分并注释如下。

组件由两部分组成:

  1. 定长数组来表示滑动窗口,数组每个元素就是一个小窗口格子,格子是一个抽象对象,可以存储任意统计结构。
  2. 模拟时间滑动的算法,保证数据按照时间推移而做出正确滑动
代码语言:javascript复制
package sliding_window

import (
   "errors"
   "sync"
   "sync/atomic"
)

// 小窗口格式
type Bucket struct {
   // 小窗口起始统计位置
   Start uint64
   // 存储实际自定义统计结构
   Value atomic.Value
}

// 用于模拟滑动窗口的定长数组
type BucketArray struct {
   // 数组长度
   length int
   // 实际存储的小窗口数组
   data []*Bucket
   // 保证并发安全的锁
   mutex sync.Mutex
}

// 初始化滑动窗口数组
func NewBucketArray(sampleCount int, bucketLength uint, now uint64, gen BucketItemGenerator) *BucketArray {
   ret := &BucketArray{
      length: sampleCount,
      data:   make([]*Bucket, sampleCount),
   }
   // 这里首先会根据当前时间now计算出其所对在的格子在数组中的下标idx以及该格子的统计起始时间startTime
   // startTime的意义是对齐时间
   idx := int((now / uint64(bucketLength)) % uint64(sampleCount))
   startTime := now - (now % uint64(bucketLength))

   // 从[idx, sampleCount-1] 会预先分配每个格子结构*Bucket,并且基于计算出的startTime依次往后填入对应格子的开始时间
   for i := idx; i <= sampleCount-1; i   {
      b := &Bucket{
         Start: startTime,
         Value: atomic.Value{},
      }
      b.Value.Store(gen.NewEmptyBucket())
      ret.data[i] = b
      startTime  = uint64(bucketLength)
   }

   // 从[0,idx-1] 这个区间也会预先分配每个格子结构*Bucket,不过需要注意的是,[0,idx-1] 里面的格子预先分配的时间也是未来的时间
   for i := 0; i < idx; i   {
      b := &Bucket{
         Start: startTime,
         Value: atomic.Value{},
      }
      b.Value.Store(gen.NewEmptyBucket())
      ret.data[i] = b
      startTime  = uint64(bucketLength)
   }

   return ret
}

// 根据索引获取bucket
func (ba *BucketArray) get(idx int) *Bucket {
   ba.mutex.Lock()
   defer ba.mutex.Unlock()

   return ba.data[idx]
}

// 基于时间做位置划分的滑动窗口
type SlidingWindowArray struct {
   // bucket长度,这里是基于时间划分,那么bucket长度就是时间间隔,例如200ms
   bucketLength uint
   // 一个滑动窗口包含的小窗口个数,bucket长度和包含的小窗口个数决定了滑动窗口的统计周期也就是长度
   sampleCount uint
   // 一个滑动时间窗口长度,由上述两个参数确定,如sampleCount=5,bucketLength=200ms, 则interval=1000ms
   interval uint
   // 实际存储bucket的数组结构
   array *BucketArray
   // 用于并发安全操作的锁
   mutex sync.Mutex
}

// 初始化滑动时间窗口
func NewSlidingWindowArray(sampleCount uint, bucketLength uint,
   nowInMs uint64, gen BucketItemGenerator) *SlidingWindowArray {
   return &SlidingWindowArray{
      bucketLength: bucketLength,
      sampleCount:  sampleCount,
      interval:     bucketLength * sampleCount,
      array:        NewBucketArray(int(sampleCount), bucketLength, nowInMs, gen),
   }
}

// 给定一个时间点,返回所属的bucket,从而可以对bucket做事件统计记录或者查询, 这个实际上就是模拟了全部的滑动过程
func (swa *SlidingWindowArray) CurrentBucketOfTime(now uint64, bg BucketItemGenerator) (*Bucket, error) {
   timeId := now / uint64(swa.bucketLength)
   idx := int(timeId) % swa.array.length

   bucketStart := now - (now % uint64(swa.bucketLength))

   old := swa.array.get(idx)
   if bucketStart == atomic.LoadUint64(&old.Start) {
      return old, nil
   } else if bucketStart > atomic.LoadUint64(&old.Start) {
      // 实际上已经进入新的时间格子循环了
      swa.mutex.Lock()
      // 进入新的循环是要清空原先统计数据的
      old = bg.ResetBucket(old, bucketStart)
      swa.mutex.Unlock()
      return old, nil
   } else {
      // 获取到未来时间格子,暂时不可能发生,记做失败
      return nil, errors.New("future bucket")
   }
}

func (swa *SlidingWindowArray) ValuesConditional(now uint64, predicate func(uint64) bool) []*Bucket {
   if now <= 0 {
      return make([]*Bucket, 0)
   }
   ret := make([]*Bucket, 0, swa.array.length)
   for i := 0; i < swa.array.length; i   {
      sb := swa.array.get(i)
      if sb == nil || (now-atomic.LoadUint64(&sb.Start)) > uint64(swa.interval) || !predicate(atomic.
         LoadUint64(&sb.Start)) {
         continue
      }
      ret = append(ret, sb)
   }
   return ret
}

// Bucket实际结构的生成接口
type BucketItemGenerator interface {
   // 生成格子里的实际统计结构
   NewEmptyBucket() interface{}
   // 用于格子统计数据的重置清空
   ResetBucket(b *Bucket, startTime uint64) *Bucket
}

使用示例:

代码语言:javascript复制
// 统计qps

// 用于统计QPS的实际结构
type MetricBucketItem struct {
   Counter int64
}

type SlidingWindowMetric struct {
   data *SlidingWindowArray
}

func (swm *SlidingWindowMetric) NewEmptyBucket() interface{} {
   return &MetricBucketItem{Counter: 0}
}

func (swm *SlidingWindowMetric) ResetBucket(bw *Bucket, startTime uint64) *Bucket {
   atomic.StoreUint64(&bw.Start, startTime)
   bw.Value.Store(&MetricBucketItem{Counter: 0})
   return bw
}

func NewSlidingWindowMetric(sampleCount uint32, bucketLength uint32) *SlidingWindowMetric {
   nowInMs := time.Now().UnixNano() / 1e6

   swm := &SlidingWindowMetric{}
   swm.data = NewSlidingWindowArray(uint(sampleCount), uint(bucketLength), uint64(nowInMs), swm)
   return swm
}

// 有访问就添加计数
func (swm *SlidingWindowMetric) AddCount(count int64) {
   nowInMs := time.Now().UnixNano() / 1e6
   // 获取当前时间应当落入的格子
   curBucket, err := swm.data.CurrentBucketOfTime(uint64(nowInMs), swm)

   if err != nil {
      fmt.Printf("get bucket from SlidingWindowArray err, %v", err)
      return
   }
   if curBucket == nil {
      fmt.Printf("current bucket is nil")
      return
   }
   mb := curBucket.Value.Load()
   if mb == nil {
      fmt.Printf("nil bucket")
      return
   }
   b, ok := mb.(*MetricBucketItem)
   if !ok {
      fmt.Printf("fail to type assert")
      return
   }

   // 将相应统计结构计数做累加
   atomic.AddInt64(&b.Counter, count)
}

func (swm *SlidingWindowMetric) GetQPS() float64 {
   now := uint64(time.Now().UnixNano() / 1e6)
   startTime := now - (now % uint64(swm.data.bucketLength))
   end := startTime
   start := end - uint64(swm.data.interval)   uint64(swm.data.bucketLength)
   satisfiedBuckets := swm.data.ValuesConditional(now, func(ws uint64) bool {
      return ws >= start && ws <= end
   })

   // 获取符合条件的bucket内所有计数
   ret := int64(0)
   for _, sb := range satisfiedBuckets {
      mb := sb.Value.Load()
      if mb == nil {
         fmt.Printf("nil BucketWrap")
         continue
      }
      counter, ok := mb.(*MetricBucketItem)
      if !ok {
         fmt.Printf("type assert failed")
         continue
      }
      ret  = counter.Counter
   }
   // 假设interval 长度设为ms
   return float64(ret) / (float64(swm.data.interval) / 1000.00)
}

0 人点赞