读猿码系列——6.Golang中用幂等思路解决缓存击穿的方案:singleflight

2022-12-12 18:14:48 浏览数 (1)

今天我们来看一个日常工作中会遇到的问题:实际开发中常见的做法是在查数据库前先去查缓存,如果缓存Miss(未命中)就去数据库中查到数据并放到缓存里。这是正常情况,然而缓存击穿则是指在高并发系统中,大量请求同时查询一个缓存的key,假如这个key刚好过期就会导致大量的请求都打到数据库上。在绝大多数情况下,可以考虑使用singleflight来抑制重复函数调用。

https://pkg.go.dev/golang.org/x/sync@v0.0.0-20220929204114-8fcdb60fdcc0/singleflight

上面是singleflight包的地址,其中对外提供了以下几个方法:

代码语言:javascript复制
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

func (g *Group) Forget(key string)

我们跟到源码中看下这几个方法singleflight.go。其中Group是由一个互斥锁和一个映射表组成;call结构体中保存了当前调用对应的信息。

代码语言:javascript复制
type call struct {
    wg sync.WaitGroup
    val interface{}
    err error
    dups  int
    chans []chan<- Result
}

type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}

type Result struct {
    Val    interface{}
    Err    error
    Shared bool
}

Do()方法,传入字符串key和fn回调函数,如果key相同,在同一时间只有第一次调用Do方法时才会去执行fn回调函数, 其他请求等待释放锁及执行结果。返回值v表示fn的执行结果;err表示fn返回的err;shared表示key是否是共享的。

代码语言:javascript复制
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        // 存在相同的key, 增加计数
        c.dups  
        g.mu.Unlock()
        c.wg.Wait() // 等待这个key对应的fn调用完成

        if e, ok := c.err.(*panicError); ok {
            panic(e)
        } else if c.err == errGoexit {
            runtime.Goexit()
        }
        return c.val, c.err, true
    }
    c := new(call) // 不存在key是第一个请求, 创建一个call结构体
    c.wg.Add(1)
    g.m[key] = c // 加入到映射表中
    g.mu.Unlock()

    g.doCall(c, key, fn) // 调用方法
    return c.val, c.err, c.dups > 0
}

DoChan()方法和Do()方法类似,不同的是它维护了一个chan Result通道,这使得调用者不用阻塞等待调用返回,而是通过消费通道就可以拿到fn函数的执行结果。这个chan Result通道,在返回给调用者前会先放到call结构体的维护的通知队列里,待fn函数返回结果后DoChan方法会把结果发送给通知队列中的每个通道。

代码语言:javascript复制
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups  
        c.chans = append(c.chans, ch)
        g.mu.Unlock()
        return ch
    }
    c := &call{chans: []chan<- Result{ch}}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    go g.doCall(c, key, fn)

    return ch
}

我们通过例子来演示下singleflight包的使用方法。下面这段代码对比使用多个goroutine直接调用100次同一函数和使用singleflight包的Do()方法处理后再调用100次同一函数两者的耗时。

代码语言:javascript复制
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "golang.org/x/sync/singleflight"
)

var count int32

func main() {
    var wg sync.WaitGroup
    now := time.Now()
    sg := &singleflight.Group{}
    for i := 0; i < 100; i   {
      wg.Add(1)
      go func() {
        // Getcontent(1)
        SingleGetcontent(sg, 1)
        wg.Done()
      }()
    }
    wg.Wait()

    fmt.Printf("耗时:%s", time.Since(now))
}

func Getcontent(id int) (string, error) {
    atomic.AddInt32(&count, 1)
    time.Sleep(time.Duration(count) * time.Millisecond)
    return fmt.Sprintf("获取第 %d 个内容", id), nil
}

func SingleGetcontent(sg *singleflight.Group, id int) (string, error) {
    v, err, ok := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
      return Getcontent(id)
    })
    fmt.Println(ok)
    return v.(string), err
}

得到的结果是直接调用Getcontent 100次耗时100.486554ms;使用singleflight后耗时1.782066ms。我们还在SingleGetcontent中输出了sg.Do()方法的第三个返回值,就是之前提到的shared,表示返回数据是调用 fn 得到的还是其他相同 key 调用返回的,这里输出得到的结果都是true,如果我们将循环次数改为i<1表示没有其他协程共享,就会返回false。

我们再规范下写法,使用 Context 来控制超时,通常使用方式如下:

代码语言:javascript复制
package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"

    "golang.org/x/sync/singleflight"
)

type Result string

func find(ctx context.Context, query string) (Result, error) {
   return Result(fmt.Sprintf("result for %q", query)), nil
}

func main() {
    var sg singleflight.Group
    const n = 5
    waited := int32(n)
    done := make(chan struct{})
    key := "http://cqcoding.site/#/"
    for i := 0; i < n; i   {
      go func(j int) {
        v, _, shared := sg.Do(key, func() (interface{}, error) {
          ret, err := find(context.Background(), key)
          return ret, err
        })
        if atomic.AddInt32(&waited, -1) == 0 {
          close(done)
        }
        fmt.Printf("index: %d, val: %v, shared: %vn", j, v, shared)
      }(i)
    }
    select {
    case <-done:
    case <-time.After(time.Second):
      fmt.Println("Do hangs")
    }
}

// index: 4, val: result for "http://cqcoding.site/#/", shared: false
// index: 0, val: result for "http://cqcoding.site/#/", shared: true
// index: 2, val: result for "http://cqcoding.site/#/", shared: true
// index: 3, val: result for "http://cqcoding.site/#/", shared: false
// index: 1, val: result for "http://cqcoding.site/#/", shared: false

如果函数执行一切正常,则所有请求都能顺利获得正确的数据。相反,如果函数执行遇到问题呢?由于 singleflight 是以阻塞读的方式来控制向下游请求的并发量,在第一个下游请求没有返回之前,所有请求都将被阻塞。

假设服务正常情况下处理能力为 1w QPS,每次请求会发起 3 次 下游调用,其中一个下游调用使用 singleflight 获取控制并发获取数据,请求超时时间为 3s。那么在出现请求超时的情况下,会出现以下几个问题:

  • 协程暴增,最小协程数为3w(1w/s * 3s)
  • 内存暴增,内存总大小为:协程内存大小 1w/s 3s (3 1)次 * (请求包 响应包)大小
  • 大量超时报错:1w/s * 3s
  • 后续请求耗时增加(调度等待)

如果类似问题出现在重要程度高的接口上,例如:读取游戏配置、获取博主信息等关键接口,那么问题将是非常致命的。出现该情况的根本原因有以下两点:

  • 阻塞读:缺少超时控制,难以快速失败;
  • 单并发:控制了并发量,但牺牲了成功率。

对于上述问题,其中阻塞读可以使用DoChan()方法异步调用,通过channel返回结果;使用select语句实现超时控制。

至于单并发问题,在一些对可用性要求极高的场景下,往往需要一定的请求饱和度来保证业务的最终成功率。一次请求还是多次请求,对于下游服务而言并没有太大区别,此时使用 singleflight 只是为了降低请求的数量级,那么使用 Forget() 提高下游请求的并发:

代码语言:javascript复制
v, _, shared := g.Do(key, func() (interface{}, error) {
    go func() {
        time.Sleep(10 * time.Millisecond)
        fmt.Printf("Deleting key: %vn", key)
        g.Forget(key)
    }()
    ret, err := find(context.Background(), key)
    return ret, err
})
代码语言:javascript复制
func (g *Group) Forget(key string) {
    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()
}

当有一个并发请求超过 10ms,那么将会有第二个请求发起,此时只有 10ms 内的请求最多发起一次请求,即最大并发:100 QPS。单次请求失败的影响大大降低。


除了缓存击穿这类缓存Miss缓解数据库压力的应用场景,singleflight还可以被用于查询DNS记录,Go语言的net标准库里使用的lookupGroup结构,就是Go扩展库提供的原语singleflight.Group。它的作用是将对相同域名的DNS记录查询合并成一个查询,使用的是异步查询的方法DoChan。如果有兴趣可以去GitHub上看一下完整的源码,访问链接可直接定位到这部分的源码:

https://github.com/golang/go/blob/master/src/net/lookup.go#L261

总结

SingleFlight的作用是在处理多个goroutine同时调用同一个函数的时候,只让一个goroutine去实际调用这个函数,等到这个goroutine返回结果的时,再把结果返回给其他几个同时调用了相同函数的goroutine,这样可以减少并发调用的数量。在实际应用中也是,它能够在一个服务中减少对下游的并发重复请求。对于单次的失败无法容忍的情况,在高并发的场景下更好的处理方案是:

  1. 放弃使用同步请求,牺牲数据更新的实时性。
  2. “缓存” 存储准实时的数据 “异步更新” 数据到缓存。

本文章涉及代码我放到了gitlab上:

https://gitlab.com/893376179/daily-golang-package/-/tree/main/singleflight

参考:

https://pkg.go.dev/golang.org/x/sync@v0.0.0-20220929204114-8fcdb60fdcc0/singleflight

https://www.cyningsun.com/01-11-2021/golang-concurrency-singleflight.html

https://tern.cc/9v6KQ5

END

0 人点赞