分布式高并发系统限流原理与实践

2020-12-31 17:39:10 浏览数 (1)

分布式三大利器

随着业务的发展壮大,对后端服务的压力也会越来越大,为了打造高效稳定的系统, 产生了分布式,微服务等等系统设计,因为这个原因,设计复杂度也随之增加,基于此 诞生了高并发系统三大利器限流,缓存,降级/熔断

「限流:」 从系统的流量入口考虑,从进入的流量上进行限制,达到保护系统的作用;

「缓存:」 将数据库中的数据缓存起来,提升系统访问速度和并发度,保护数据库资源。

降级和熔断比较类似,都是属于过载保护机制,但是实现上有着如下区别

「降级:」 从系统内部的平级服务或者业务的维度考虑,流量大了,可以暂停或延迟一些非重要服务,如日志收集等等,保护其他正常使用;

「熔断:」 当某一服务出现了过载现象,为防止整个系统故障,直接关闭该服务或者保证部分请求成功,另一部分返回失败。比如10s内连续请求失败次数达到20次, 触发熔断机制,过滤60%请求。

限流的必要性

本文主要介绍第一利器限流,后续会有详细的文章介绍缓存和降级。

应对秒杀,大促等高性能压力的场景时,为了保证系统的平稳运行,必须针对超过预期的流量,通过预先设定的限流规则选择性的对某些请求进行限流。

在分布式系统中,其接口的调用来自多个系统,有的调用方可能会请求数量突增,过去争夺服务器资源, 而来自其他调用方的接口请求因此来不及响应而排队等待,微服务整体的请求响应时间变长甚至超时。 所以为了防止接口被过度调用,需要对每个调用方进行细粒度的访问限流。

限流方案

网关层限流

请求漏斗模型

上图是一个请求流量漏斗模型,执行过程依次是

  1. 用户请求经过网关将请求下发到服务层
  2. 服务层针对每条请求获取缓存数据
  3. 缓存没有命中,直接请求数据

网关作为服务的入口,承接了系统整个系统的所有流量,是整个访问链路的源头,具有"一夫当关,万夫莫开"的角色,所以是最适合做限流的的途径。

另外,引入网关层还可以解决分布式系统中的如下问题:

  • 客户端会多次请求不同的微服务,增加了客户端的复杂性
  • 存在跨域请求,在一定场景下处理相对复杂
  • 认证复杂,每个服务都需要独立认证
  • 难以重构,随着项目的迭代,可能需要重新划分微服务。例如,可能将多个服务合并成一个或者将一个服务拆分成多个。如果客户端直接与微服务通信, 那么重构将会很难实施
  • 某些微服务可能使用了防火墙 / 浏览器不友好的协议,直接访问会有一定的困难
  • 恶意请求,存在安全问题

目前主流的网关层有以软件为代表的Nginx,还有Spring Cloud中的Gateway和Zuul这类网关层组件,也有以硬件 软件为代表的F5(F5价钱贵到你怀疑人生)。 除此之外,还有许多大厂开发的组件,比如腾讯里约。

中间件限流

提到中间件首先想到的就是「消息队列(Message queue)」,简称MQ, 顾名思义,消息队列就是一个 存放“消息”的“队列”,有着 FIFO 的特性。这里的“消息”实际就是“数据”的意思,因此消息队列本身就是一个简单的数据结构——队列。

在设计模式上,「消息队列是“生产者-消费者”模式的一个经典实现」。 一般而言,用户就是请求的“生产者”,而后台服务就是请求的“消费者”,kafka、RabbitMQ,RocketMQ等等都是目前系统中常见的MQ, 在系统中可以达到异步解耦,削峰填谷的作用。

算法

限流算法常见有三种,分别是计数器、漏桶、令牌桶。

计数器(固定窗口)

计数器算法是限流算法里最简单也是最容易实现的一种算法。比如对于A接口来说,我们1分钟的访问次数不能超过100个。 我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于100并且该请求与第一个请求的间隔时间还在1分钟之内, 那么说明请求数过多;如果该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter。

在对计数器进行技术的问题,「要注意原子性,防止并发问题,导致计数不准」。 代码设计如下:

代码语言:javascript复制
// CounterLimit 计数器
type CounterLimit struct {
   counter      int64 //计数器
   limit        int64 //指定时间窗口内允许的最大请求数
   intervalNano int64 //指定的时间窗口
   unixNano     int64 //unix时间戳,单位为纳秒
}

// NewCounterLimit 初始化
func NewCounterLimit(interval time.Duration, limit int64) *CounterLimit {

   return &CounterLimit{
    counter:      0,
    limit:        limit,
    intervalNano: int64(interval),
    unixNano:     time.Now().UnixNano(),
   }
}

// Allow 判断当前时间窗口是否允许请求
func (c *CounterLimit) Allow() bool {

   now := time.Now().UnixNano()
   if now-c.unixNano > c.intervalNano { //如果当前过了当前的时间窗口,则重新进行计数
    atomic.StoreInt64(&c.counter, 0)
    atomic.StoreInt64(&c.unixNano, now)
    return true
   }

   atomic.AddInt64(&c.counter, 1)
   return c.counter < c.limit //判断是否要进行限流
}

滑动窗口

滑动窗口固定窗口的优化版本,主要解决临界问题。

如上图,假设0:59时,瞬间收到100个请求,并且1:00时候又瞬间收到了100个请求,那么其实这个服务在 1秒里面,收到了200个请求。 我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求,可以瞬间超过我们的速率限制,压垮我们的应用。

滑动窗口协议(Sliding Window Protocol),属于TCP协议的一种应用,用于网络数据传输时的流量控制,以避免拥塞的发生。 该协议允许发送方在停止并等待确认前发送多个数据分组。由于发送方不必每发一个分组就停下来等待确认。因此该协议可以加速数据的传输,提高网络吞吐量。

滑动窗口计数器是通过将窗口再细分,并且按照时间"滑动",这种算法避免了固定窗口计数器带来的双倍突发请求,但时间区间的精度越高,算法所需的空间容量就越大。

在上图中,整个红色的矩形框表示一个时间窗口,也就是一分钟。然后我们将时间窗口进行划分,每格代表的是10秒钟,总共6格。每过10秒钟,我们的时间窗口就会 往右滑动一格。每一个格子都有自己独立的计数器counter,假设一个请求 在0:25秒的时候到达,那么0:20~0:29对应的counter就会加1。

「滑动窗口怎么解决刚才的临界问题的呢?」

0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格子中。当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的 总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触发了限流。

当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

代码语言:javascript复制
var (
 limitCount  int        = 10 // 6s限频
 limitBucket int        = 6  // 滑动窗口个数
 curCount    int32      = 0  // 记录限频数量
 head        *ring.Ring      // 环形队列(链表)
)

func main() {
 tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:9090") //获取一个tcpAddr
 checkError(err)
 listener, err := net.ListenTCP("tcp", tcpAddr) //监听一个端口
 checkError(err)
 defer listener.Close()
 // 初始化滑动窗口
 head = ring.New(limitBucket)
 for i := 0; i < limitBucket; i   {
  head.Value = 0
  head = head.Next()
 }
 // 启动执行器
 go func() {
  timer := time.NewTicker(time.Second * 1)
  for range timer.C { // 定时每隔1秒刷新一次滑动窗口数据
   subCount := int32(0 - head.Value.(int))
   newCount := atomic.AddInt32(&curCount, subCount)

   arr := [6]int{}
   for i := 0; i < limitBucket; i   { // 这里是为了方便打印
    arr[i] = head.Value.(int)
    head = head.Next()
   }
   fmt.Println("move subCount,newCount,arr", subCount, newCount, arr)
   head.Value = 0
   head = head.Next()
  }
 }()

 for {
  conn, err := listener.Accept() // 在此处阻塞,每次来一个请求才往下运行handle函数
  if err != nil {
   fmt.Println(err)
   continue
  }
  go handle(&conn) // 起一个单独的协程处理,有多少个请求,就起多少个协程,协程之间共享同一个全局变量limiting,对其进行原子操作。
 }
}

func handle(conn *net.Conn) {
 defer (*conn).Close()
 n := atomic.AddInt32(&curCount, 1)
 //fmt.Println("handler n:", n)
 if n > int32(limitCount) { // 超出限频
  atomic.AddInt32(&curCount, -1) // add 1 by atomic,业务处理完毕,放回令牌
  (*conn).Write([]byte("HTTP/1.1 404 NOT FOUNDrnrnError, too many request, please try again."))
 } else {
  mu := sync.Mutex{}
  mu.Lock()
  pos := head.Prev()
  val := pos.Value.(int)
  val  
  pos.Value = val
  mu.Unlock()
  time.Sleep(1 * time.Second)                                             // 假设我们的应用处理业务用了1s的时间
  (*conn).Write([]byte("HTTP/1.1 200 OKrnrnI can change the world!")) // 业务处理结束后,回复200成功。
 }
}

// 异常报错的处理
func checkError(err error) {
 if err != nil {
  fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
  os.Exit(1)
 }
}

由于滑动窗口比较难于理解,打印了如下压测日志,压测工具地址 https://github.com/rakyll/hey

代码语言:javascript复制
$ ./hey -c 6 -n 300 -q 6 -t 80 http://localhost:9090

move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
move subCount,newCount,arr 0 6 [0 0 0 0 0 6]
move subCount,newCount,arr 0 10 [0 0 0 0 6 4]
move subCount,newCount,arr 0 10 [0 0 0 6 4 0]
move subCount,newCount,arr 0 10 [0 0 6 4 0 0]
move subCount,newCount,arr 0 10 [0 6 4 0 0 0]
move subCount,newCount,arr -6 4 [6 4 0 0 0 0]
move subCount,newCount,arr -4 6 [4 0 0 0 0 6]
move subCount,newCount,arr 0 10 [0 0 0 0 6 4]
move subCount,newCount,arr 0 10 [0 0 0 6 4 0]
move subCount,newCount,arr 0 10 [0 0 6 4 0 0]
move subCount,newCount,arr 0 10 [0 6 4 0 0 0]
move subCount,newCount,arr -6 4 [6 4 0 0 0 0]
move subCount,newCount,arr -4 3 [4 0 0 0 0 3]
move subCount,newCount,arr 0 3 [0 0 0 0 3 0]
move subCount,newCount,arr 0 3 [0 0 0 3 0 0]
move subCount,newCount,arr 0 3 [0 0 3 0 0 0]
move subCount,newCount,arr 0 3 [0 3 0 0 0 0]
move subCount,newCount,arr -3 0 [3 0 0 0 0 0]
move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
move subCount,newCount,arr 0 0 [0 0 0 0 0 0]

漏桶

漏桶算法概念如下:

  • 将每个请求视作"水滴"放入"漏桶"进行存储;
  • “漏桶"以固定速率向外"漏"出请求来执行如果"漏桶"空了则停止"漏水”;
  • 如果"漏桶"满了则多余的"水滴"会被直接丢弃。
代码语言:javascript复制
// BucketLimit 漏桶结构
type BucketLimit struct {
 rate       float64 //漏桶中水的漏出速率
 bucketSize float64 //漏桶最多能装的水大小
 unixNano   int64   //unix时间戳
 curWater   float64 //当前桶里面的水
}

// NewBucketLimit  初始化
func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit {
 return &BucketLimit{
  bucketSize: float64(bucketSize),
  rate:       rate,
  unixNano:   time.Now().UnixNano(),
  curWater:   0,
 }
}

// refresh 更新当前桶的容量
func (b *BucketLimit) refresh() {
 now := time.Now().UnixNano()
 //时间差, 把纳秒换成秒
 diffSec := float64(now-b.unixNano) / 1000 / 1000 / 1000
 b.curWater = math.Max(0, b.curWater-diffSec*b.rate)
 b.unixNano = now
 return
}

// Allow 允许请求,是否超过桶的容量
func (b *BucketLimit) Allow() bool {
 b.refresh()
 if b.curWater < b.bucketSize {
  b.curWater = b.curWater   1
  return true
 }

 return false
}

漏桶算法也存在着明显缺陷,当短时间内有大量的突发请求时,即便此时服务器没有任何负载,每个请求也都得在队列中等待一段时间才能被响应。

令牌桶

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。 令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

代码语言:javascript复制
// TokenBucket 令牌桶
type TokenBucket struct {
 rate      int // 令牌放入速度
 tokenSize int // 令牌桶的容量
 curNum    int // 当前桶中token
}

// NewTokenBucket 初始化
func NewTokenBucket(rate, tokenSize int) *TokenBucket {
 return &TokenBucket{
  rate:      rate,
  tokenSize: tokenSize,
 }
}

// PushToken 在桶中存放token
func (t *TokenBucket) PushToken() chan struct{} {
 tb := make(chan struct{}, t.tokenSize)
 ticker := time.NewTicker(time.Duration(1000) * time.Microsecond)
 //初始化token
 for i := 0; i < t.tokenSize; i   {
  tb <- struct{}{}
 }
 t.curNum = t.tokenSize

 // 指定速率放入token
 go func() {
  for {
   for i := 0; i < t.rate; i   {
    tb <- struct{}{}
   }
   t.curNum  = t.rate
   if t.curNum > t.tokenSize {
    t.curNum = t.tokenSize
   }
   <-ticker.C
  }
 }()
 return tb
}

// popToken 取出token
func (t *TokenBucket) PopToken(bucket chan struct{}, n int) {
 for i := 0; i < n; i   {
  _, ok := <-bucket
  if ok {
   t.curNum -= 1
   fmt.Println("get  token  success")
  } else {
   fmt.Println("get  token  fail")
  }
 }
}

四种算法比较

算法

确定参数

空间复杂度

时间复杂度

限制突发流量

平滑限流

分布式环境下实现难度

固定窗口

计数周期T、周期内最大访问数N

低O(1)(记录周期内访问次数及周期开始时间)

低O(1)

滑动窗口

计数周期T、周期内最大访问数N

高O(N)(记录每个小周期中的访问数量)

中O(N)

相对实现。滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑

漏桶

漏桶流出速度r、漏桶容量N

低O(1)(记录当前漏桶中容量)

高O(N)

令牌桶

令牌产生速度r、令牌桶容量N

低O(1)(记录当前令牌桶中令牌数)

高O(N)

总结

文中涉及代码已上传至代码库,参见 https://github.com/MerlinFeng/codenote/tree/main/rate_limit

在实际的系统设计中,令牌桶和漏桶算法的应用是比较广泛的,分别有着不同的使用场景。

令牌桶可以用来保护自己,主要用来对上游调用者频率进行限流,为的是让自己不被打垮。 所以如果自己本身有处理能力的时候,如果流量突发(实际消费能力强于配置的流量限制),那么实际处理速率可以超过配置的限制。

漏桶算法,用来保护他人,也就是保护他所调用的下游系统。主要场景是,当调用的第三方系统本身没有保护机制,或者有流量限制的时候, 我们的调用速度不能超过他的限制,由于我们不能更改第三方系统,所以只有在主调方控制。这个时候,即使流量突发,也必须舍弃。 因为消费能力是第三方决定的。

0 人点赞