并发编程--用SingleFlight合并重复请求

2020-12-31 10:13:07 浏览数 (1)

大家好啊,今天网管想给大家介绍一下Gosingleflight包,当然它不是直译过来的单飞的意思~~!SingleFlightGo语言sync扩展库提供的另一种并发原语,那么SingleFlight是用于解决什么问题的呢?官方文档里的解释是:

Package singleflight provides a duplicate function call suppression mechanism. 翻译过来就是:singleflight包提供了一种抑制重复函数调用的机制。

具体到Go程序运行的层面来说,SingleFlight的作用是在处理多个goroutine同时调用同一个函数的时候,只让一个goroutine去实际调用这个函数,等到这个goroutine返回结果的时候,再把结果返回给其他几个同时调用了相同函数的goroutine,这样可以减少并发调用的数量。在实际应用中也是,它能够在一个服务中减少对下游的并发重复请求。还有一个比较常见的使用场景是用来防止缓存击穿。

Go提供的SingleFlight

Go扩展库里用singleflight.Group结构体类型提供了SingleFlight并发原语的功能。

singleflight.Group类型提供了三个方法:

代码语言:javascript复制
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

func (g *Group) Forget(key string)
  • Do方法,接受一个字符串Key和一个待调用的函数,会返回调用函数的结果和错误。使用Do方法的时候,它会根据提供的Key判断是否去真正调用fn函数。同一个 key,在同一时间只有第一次调用Do方法时才会去执行fn函数,其他并发的请求会等待调用的执行结果。
  • DoChan方法:类似Do方法,只不过是一个异步调用。它会返回一个通道,等fn函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。
  • Forget方法:在SingleFlight中删除一个Key。这样一来,之后这个Key的Do方法调用会执行fn函数,而不是等待前一个未完成的fn 函数的结果。

应用场景

了解了Go语言提供的 SingleFlight并发原语都有哪些方法可以调用后 ,下面介绍两个它的应用场景。

查询DNS记录

Go语言的net标准库里使用的lookupGroup结构,就是Go扩展库提供的原语singleflight.Group

代码语言:javascript复制
type Resolver struct {
  ......
 // 源码地址 https://github.com/golang/go/blob/master/src/net/lookup.go#L151
 // lookupGroup merges LookupIPAddr calls together for lookups for the same
 // host. The lookupGroup key is the LookupIPAddr.host argument.
 // The return values are ([]IPAddr, error).
 lookupGroup singleflight.Group
}

它的作用是将对相同域名的DNS记录查询合并成一个查询,下面是net库提供的DNS记录查询方法LookupIp使用lookupGroup这个SingleFlight进行合并查询的相关源码,它使用的是异步查询的方法DoChan

代码语言:javascript复制
func LookupIP(host string) ([]IP, error) {
 addrs, err := DefaultResolver.LookupIPAddr(context.Background(), host)
  ......
}

func (r *Resolver) lookupIPAddr(ctx context.Context, network, host string) ([]IPAddr, error) {
  ......
  // 使用SingleFlight的DoChan合并多个查询请求
 ch, called := r.getLookupGroup().DoChan(lookupKey, func() (interface{}, error) {
  defer dnsWaitGroup.Done()
  return testHookLookupIP(lookupGroupCtx, resolverFunc, network, host)
 })
 if !called {
  dnsWaitGroup.Done()
 }
 
 select {
 case <-ctx.Done():
  ......
 case r := <-ch:
  lookupGroupCancel()
  if trace != nil && trace.DNSDone != nil {
   addrs, _ := r.Val.([]IPAddr)
   trace.DNSDone(ipAddrsEface(addrs), r.Shared, r.Err)
  }
  return lookupIPReturn(r.Val, r.Err, r.Shared)
 }
}

上面的源码做了很多删减,只留了SingleFlight合并查询的部分,如果有兴趣可以去GitHub上看一下完整的源码,访问链接https://github.com/golang/go/blob/master/src/net/lookup.go#L261 ,可直接定位到这部分的源码。

网管是不是很贴心,记得三连啊~!

防止缓存击穿

在项目里使用缓存时,一个常见的用法是查询一个数据先去查询缓存,如果没有就去数据库里查到数据并缓存到Redis里。那么缓存击穿问题是指,高并发的系统中,大量的请求同时查询一个缓存Key 时,如果这个 Key 正好过期失效,就会导致大量的请求都打到数据库上,这就是缓存击穿。用 SingleFlight 来解决缓存击穿问题再合适不过,这个时候只要这些对同一个 Key 的并发请求的其中一个到数据库中查询就可以了,这些并发的请求可以共享同一个结果。

下面是一个模拟用SingleFlight并发原语合并查询Redis缓存的程序,你可以自己动手测试一下,开10个goroutine去查询一个固定的Key,观察一下返回结果就会发现最终只执行了一次Redis查询。

代码语言:javascript复制
// 模拟一个Redis客户端
type client struct {
 // ... 其他的配置省略
 requestGroup singleflight.Group
}

// 普通查询
func (c *client) Get(key string) (interface{}, error) {
 fmt.Println("Querying Database")
 time.Sleep(time.Second)
 v := "Content of key"   key
 return  v, nil
}

// SingleFlight查询
func (c *client) SingleFlightGet(key string) (interface{}, error) {
 v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
  return c.Get(key)

 })
 if err != nil {
  return nil, err
 }
 return v, err
}

完整的测试源码可以点击阅读原文,去我的GitHub仓库下载

实现原理

最后我们来看一下singleflight.Group的实现原理,通过它的源码也是能学到不少用Go语言编程的技巧的。singleflight.Group由一个互斥锁sync.Mutex和一个映射表组成,每一个 singleflight.call结构体都保存了当前调用对应的信息:

代码语言:javascript复制
type Group struct {
 mu sync.Mutex
 m  map[string]*call
}

type call struct {
 wg sync.WaitGroup

 val interface{}
 err error

 dups  int
 chans []chan<- Result
}

下面我们来看看 Do 和 DoChan 方法是怎么实现的。

Do方法

SingleFlight 定义一个call结构体,每个结构体都保存了fn调用对应的信息。

Do方法的执行逻辑是每次调用Do方法都会先去获取互斥锁,随后判断在映射表里是否已经有Key对应的fn函数调用信息的call结构体。

  • 当不存在时,证明是这个Key的第一次请求,那么会初始化一个call结构体指针,增加SingleFlight内部持有的sync.WaitGroup计数器到1。释放互斥锁,然后阻塞的等待doCall方法执行fn函数的返回结果
  • 当存在时,增加call结构体内代表fn重复调用次数的计数器dups,释放互斥锁,然后使用WaitGroup等待fn函数执行完成。

call结构体的valerr 两个字段只会在 doCall方法中执行fn有返回结果后才赋值,所以当 doCall方法 和 WaitGroup.Wait返回时,函数调用的结果和错误会返回给Do方法的所有调用者。

代码语言:javascript复制

  func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
      g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
      // 存在相同的key, 增加计数
      c.dups  
      g.mu.Unlock()
      c.wg.Wait() //等待这个key对应的fn调用完成
      return c.val, c.err, true // 返回fn调用的结果
    }
    c := new(call) // 不存在key, 是第一个请求, 创建一个call结构体
    c.wg.Add(1)
    g.m[key] = c //加入到映射表中
    g.mu.Unlock()
  

    g.doCall(c, key, fn) // 调用方法
    return c.val, c.err, c.dups > 0
  }

doCall方法会去实际调用fn函数,因为call结构体初始化后forgotten字段的默认值是falsefn调用有返回后,会把对应的Key删掉。这样这轮请求都返回后,下一轮使用同一的Key的请求会重新调用执行一次fn函数。

代码语言:javascript复制

  func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    c.val, c.err = fn()
    c.wg.Done()
  

    g.mu.Lock()
    if !c.forgotten { // 已调用完,删除这个key
      delete(g.m, key)
    }
    for _, ch := range c.chans {
      ch <- Result{c.val, c.err, c.dups > 0}
    }
    g.mu.Unlock()
  }

DoChan方法

SingleFlight还提供了异步调用DoChan方法,它的执行逻辑和Do方法类似,唯一不同的是调用者不用阻塞等待调用的返回, DoChan方法会创建一个chan Result通道返回给调用者,调用者通过这个通道就能接受到fn函数的结果。这个chan Result通道,在返回给调用者前会先放到call结构体的维护的通知队列里,待fn函数返回结果后DoChan方法会把结果发送给通知队列中的每个通道。

代码语言:javascript复制
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 ch := make(chan Result, 1)
 g.mu.Lock()
 if g.m == nil {
  g.m = make(map[string]*call)
 }
 if c, ok := g.m[key]; ok {
  c.dups  
  c.chans = append(c.chans, ch)
  g.mu.Unlock()
  return ch
 }
 c := &call{chans: []chan<- Result{ch}}
 c.wg.Add(1)
 g.m[key] = c
 g.mu.Unlock()

 go g.doCall(c, key, fn)

 return ch
}

type Result struct {
 Val    interface{}
 Err    error
 Shared bool
}

总结

学会SingleFlight这个并发原语后,下次在遇到类似在高并发情况下查询DNS记录、Redis缓存这样的场景的时候就可以应用上啦。最后我给你留个思考题吧,上面用SingleFlight查询Redis缓存的例子使用的是同步阻塞方法Do,你能不能改成使用异步非阻塞的DoChan方法呢?另外能不能给SingleFlightGet增加一个超时返回错误的功能呢?

提示一下,使用上下文对象,返回的错误是ctx.Err()

可以把在留言里写下你的解决方案,最好能附上源码链接,欢迎把文章分享给你更多的朋友,一起讨论。还没关注公众号「网管叨bi叨」的抓紧关注呀,每周都会有干货技术分享。

- END -

0 人点赞