我怎么从来没见过 sync.Cond

2022-09-01 15:08:51 浏览数 (1)

sync.Cond 作为 go 标准库提供的一个并发原语,但是可能你从来没听过,可见它使用场景挺少的,但是我们需要有这个知识储备,只有储备了之后才能在需要用的时候用出来。

其实如果你之前和我一样接触过 java,那么其实对于这个并发原语其实应该很熟悉,其实就是常说的等待通知机制,也就是 wait 方法和 notify 方法。

使用

我们首先从使用的角度的出发,先来看看 cond 是如何使用的

三个方法

首先我用最白话的方式描述一下 cond 的三个方法

  • Wait 当前调用者等待执行,直到被唤醒,调用该方法时需要加锁
  • Signal 唤醒一个调用者
  • Broadcast 唤醒所有调用者

一把锁一个队列

cond 初始化需要传入一个锁,用于并发控制,调用 wait 的时候需要加锁

cond 内部维护着一个队列,等待调用者排队等待

使用

我们创建两个 goroutine 使用 cond 等待执行任务,然后使用 signal 方法唤醒试试

代码语言:javascript复制
package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   cond := sync.NewCond(&sync.Mutex{})
   go func() {
      cond.L.Lock()
      fmt.Println("a is waiting...")
      cond.Wait()
      fmt.Println("a was awakened")
      cond.L.Unlock()
   }()
   go func() {
      cond.L.Lock()
      fmt.Println("b is waiting...")
      cond.Wait()
      fmt.Println("b was awakened")
      cond.L.Unlock()
   }()
   time.Sleep(time.Second)
   cond.Signal()
   time.Sleep(time.Second)
   cond.Signal()
   time.Sleep(time.Second)
}
代码语言:javascript复制
output: 
a is waiting...
b is waiting...
a was awakened
b was awakened

当然你也可以使用功能 Broadcast 方法全部一次性唤醒,输出也是一样的。

这里埋一个伏笔,我们这里两个 goroutine 都 阻塞在了 wait 方法,都没有 unlock 这里的互斥锁,但是我们看到 waiting 都打印出来了,那为什么可以这样做呢?

这个使用的给你的感觉是什么?我第一次看到 cond 的时候就给我的感觉是 waitgroup 的反向操作。

我们知道 waitgroup 可以描述为将一个大任务拆分成多个小任务,每次拆成一个任务就 add 一次,每一次任务完成就 done 一次,然后有人 wait 直到所有的任务都完成。而 cond 是不是刚好反了一下,是一堆人在等着执行,等着被唤醒执行,但是好像又不太一样。

源码分析

在看源码之前还是带着几个问题去看:

  1. wait 之前为什么需要 lock?
  2. signal 次数大于当前等待对象数量会有问题吗?
  3. broadcast 之后还能继续 wait 吗?

结构

代码语言:javascript复制
type Cond struct {
   noCopy noCopy

   // L is held while observing or changing the condition
   L Locker

   notify  notifyList
   checker copyChecker
}

type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait uint32

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31  goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

可以看到结构非常简单,noCopy 和 checker 保证 cond 不能被 copy,否则会 panic,而且是个运行时检查。

剩下的就是一把锁一个队列了

方法

代码语言:javascript复制
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
   return &Cond{L: l}
}

创建没啥好说的,就是传入一个锁赋值就可以了

代码语言:javascript复制
func (c *Cond) Signal() {
   c.checker.check()
   runtime_notifyListNotifyOne(&c.notify)
}
代码语言:javascript复制
func (c *Cond) Broadcast() {
   c.checker.check()
   runtime_notifyListNotifyAll(&c.notify)
}

Signal 和 Broadcast 都是 check 一下 cond 有没有被复制,然后就直接通过 sema 的 notify 方法将队列传入唤醒了

代码语言:javascript复制
func (c *Cond) Wait() {
   c.checker.check()
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()
}

wait 方法也是类似,不过这里需要注意的一点是,这里首先 unlock 了一次,然后再开始 wait,这也就是解释了之前那个伏笔,并且也引出了为什么 wait 之前必须 lock,因为不 lock 的话直接 unlock 肯定报错

runtime_notifyListWait

首先我们来看 runtime_notifyListAdd

代码语言:javascript复制
// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
   // This may be called concurrently, for example, when called from
   // sync.Cond.Wait while holding a RWMutex in read mode.
   return atomic.Xadd(&l.wait, 1) - 1
}

非常简单就是将 notifyList 的中的 wait 1,并且这是一个原子操作

runtime_notifyListWait

然后来看 runtime_notifyListWait 这里的第二个参数 t 就是上一个 Xadd 之后 -1 返回的结果

代码语言:javascript复制
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
   lockWithRank(&l.lock, lockRankNotifyList)

   // Return right away if this ticket has already been notified.
   if less(t, l.notify) {
      unlock(&l.lock)
      return
   }

   // Enqueue itself.
   s := acquireSudog()
   s.g = getg()
   s.ticket = t
   s.releasetime = 0
   t0 := int64(0)
   if blockprofilerate > 0 {
      t0 = cputicks()
      s.releasetime = -1
   }
   if l.tail == nil {
      l.head = s
   } else {
      l.tail.next = s
   }
   l.tail = s
   goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
   if t0 != 0 {
      blockevent(s.releasetime-t0, 2)
   }
   releaseSudog(s)
}

不难,说几个要点:

  • 如果当前传入的 t < notify 的话,证明已经被唤醒了,所以直接解锁返回
  • 获取一个 sudog 用于挂起
  • s.ticket = t 注意这里后面会用到,这里将 sudog 里面的 ticket 标记为当前队列长度
  • 当 tail 为 nil 证明是空队列,直接 head 赋值为 s;如果 tail 不为 nil 证明队列有元素直接链到队尾,并且将当前节点作为新的队尾
  • 然后 gopark 等着被唤醒就可以
runtime_notifyListNotifyOne
代码语言:javascript复制
// notifyListNotifyOne notifies one entry in the list.
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
   // Fast-path: if there are no new waiters since the last notification
   // we don't need to acquire the lock at all.
   if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
      return
   }

   lockWithRank(&l.lock, lockRankNotifyList)

   // Re-check under the lock if we need to do anything.
   t := l.notify
   if t == atomic.Load(&l.wait) {
      unlock(&l.lock)
      return
   }

   // Update the next notify ticket number.
   atomic.Store(&l.notify, t 1)

   // Try to find the g that needs to be notified.
   // If it hasn't made it to the list yet we won't find it,
   // but it won't park itself once it sees the new notify number.
   //
   // This scan looks linear but essentially always stops quickly.
   // Because g's queue separately from taking numbers,
   // there may be minor reorderings in the list, but we
   // expect the g we're looking for to be near the front.
   // The g has others in front of it on the list only to the
   // extent that it lost the race, so the iteration will not
   // be too long. This applies even when the g is missing:
   // it hasn't yet gotten to sleep and has lost the race to
   // the (few) other g's that we find on the list.
   for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
      if s.ticket == t {
         n := s.next
         if p != nil {
            p.next = n
         } else {
            l.head = n
         }
         if n == nil {
            l.tail = p
         }
         unlock(&l.lock)
         s.next = nil
         readyWithTime(s, 4)
         return
      }
   }
   unlock(&l.lock)
}
  • wait 和 notify 数量一致就没有人等着了,直接返回
  • lock 之后 double check 一次,并发编程的常规操作了
  • notify 的数量在原有数量上 1,因为这次唤醒一个新的了
  • 只有当 ticket 为 t 的时候证明才是下一个需要被唤醒的 sudog (上面的注释解释了这里为什么使用循环,大多数情况下就是 head 就是需要被唤醒的 sudog 了)
  • 然后就是队列出队的基本操作了
  • 最后 readyWithTime 调用 goready 唤醒对应的 sudog 执行就可以了
runtime_notifyListNotifyAll
代码语言:javascript复制
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
   // Fast-path: if there are no new waiters since the last notification
   // we don't need to acquire the lock.
   if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
      return
   }

   // Pull the list out into a local variable, waiters will be readied
   // outside the lock.
   lockWithRank(&l.lock, lockRankNotifyList)
   s := l.head
   l.head = nil
   l.tail = nil

   // Update the next ticket to be notified. We can set it to the current
   // value of wait because any previous waiters are already in the list
   // or will notice that they have already been notified when trying to
   // add themselves to the list.
   atomic.Store(&l.notify, atomic.Load(&l.wait))
   unlock(&l.lock)

   // Go through the local list and ready all waiters.
   for s != nil {
      next := s.next
      s.next = nil
      readyWithTime(s, 4)
      s = next
   }
}

看完 notify 方法然后再看 notifyAll 方法就很简单了,其实就是遍历了整个队列,对每一个 sudog 都 ready 一次就可以了

总结

总的来说 cond 的实现还是很容易理解的,并没有想的很复杂,只需要在使用的时候多加注意:wait 之前需要加锁。

和 java 比较起来,我记得一开始学的时候 notify 还是随机唤醒一个,然后后来根据不同的 jvm 有了不同的实现,hotspot 实现还是队列。

最后是使用,为什么我这么晚才写这个 cond 呢..其实拖延了很久了,因为在实际中没用过,就在最近在处理一个并发场景的时候偶发的用上了一下,就想着来补一下了。所以在实际中,可能你永远也用不到它,但是知道它,当个知识储备以防不时之需吧。

0 人点赞