Cond
sync.Cond 是基于互斥锁/读写锁实现的条件变量,用来协调想要访问共享资源的那些 Goroutine。当共享资源状态发生变化时,sync.Cond 可以用来通知等待条件发生而阻塞的 Goroutine。
假如有一个协程正在接收数据,其他协程必须等待这个协程接收完数据,才能读取到正确的数据。上述情形下,如果单纯的使用 channel 或者互斥锁,只能有一个协程可以等待,并读取到数据,没办法通知其他协程也读取数据。这个时候怎么办?
1)可以用一个全局变量标识第一个协程是否接收数据完毕,剩下的协程反复检查该变量的值,直到读取到数据。
2)也可创建多个 channel, 每个协程阻塞在一个 Channel 上,由接收数据的协程在数据接收完毕后,挨个通知,类似于msgbus实现。
然后 Go 中其实内置来一个 sync.Cond 来解决这个问题。
使用
下面的例子实现了通Cond实现通知协程的流程:
代码语言:go复制func TestCond(t *testing.T) {
var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
for i := 0; i < 10; i {
go func(x int) {
cond.L.Lock() //获取锁
defer cond.L.Unlock() //释放锁
cond.Wait() //等待通知,阻塞当前goroutine
fmt.Println(x)
}(i)
}
time.Sleep(time.Second * 1)
fmt.Println("Signal...")
cond.Signal() // 下发一个通知给已经获取锁的goroutine
time.Sleep(time.Second * 1)
cond.Signal() // 3秒之后 下发一个通知给已经获取锁的goroutine
time.Sleep(time.Second * 3)
cond.Broadcast() //3秒之后 下发广播给所有等待的goroutine
fmt.Println("Broadcast...")
select {}
}
运行结果:
=== RUN TestCond
Signal...
0
1
Broadcast...
2
6
4
5
7
3
9
8
Cond的主要方法有:
1)NewCond(l Locker) *Cond
NewCond 创建实例需要关联一个锁,使用方式为:
cond := sync.NewCond(&sync.Mutex{})
2)Wait()
Wait阻塞当前的 goroutine,等待唤起,在调用Wait前需要Lock,执行完Wait后的逻辑之后需要调用Unlock。
3)Signal()
Signal唤醒一个阻塞的goroutine,执行前不需要调用Lock。
4)Broadcast()
Broadcast唤起所有阻塞的 goroutine,执行前不需要调用Lock。
实现原理
数据结构
我们来看下sync.Cond的结构体,它的代码在 /sr/sync/cond.go下:
代码语言:go复制type Cond struct {
noCopy noCopy // 不可复制
L Locker // 锁
notify notifyList // 通知唤起列表
checker copyChecker // 复制检测,禁止第一次使用的Cond被复制拷贝
}
type notifyList struct {
wait uint32 // 当前wait的index
notify uint32 // 当前notify的index
lock uintptr // 锁
head unsafe.Pointer // 队头
tail unsafe.Pointer // 队尾
}
每个Cond实例都会关联一个锁 L(互斥锁 Mutex,或读写锁 RWMutex),当调用Wait方法时,必须加锁。
Wait方法
Wait方法的实现为:
代码语言:go复制func (c *Cond) Wait() {
c.checker.check() // copy 检查
t := runtime_notifyListAdd(&c.notify) // 向通知列表列表中加入该通知
c.L.Unlock() // 暂时解锁
runtime_notifyListWait(&c.notify, t) //通知操作
c.L.Lock() //加锁,还原状态
}
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
// 更新wait的index
return atomic.Xadd(&l.wait, 1) - 1
}
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// 判断需要等待notify的index是否合法
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 获取当前的sudog
s := acquireSudog()
s.g = getg()
// sudog设置ticket
s.ticket = t
// 加入到队尾
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// gopark阻塞等待被唤醒
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
Wait方法首先调用runtime_notifyListAdd方法将wait索引 1,然后调用runtime_notifyListWait将自己加入到等待队列中,然后释放锁,等待其他协程的唤醒。需要注意的是,Wait的使用方式最好是:
代码语言:go复制c.L.Lock()
for !condition() {
c.Wait()
}
// ... make use of condition ...
c.L.Unlock()
强制调用Wait方法前需要先获取该锁。这里的原因在于调用Wait方法如果不加锁,有可能会出现竞态条件。这里假设多个协程都处于等待状态,然后一个协程调用了Broadcast唤醒了其中一个或多个协程,此时这些协程都会被唤醒。
如下,假设调用Wait方法前没有加锁的话,那么所有协程都会去调用condition方法去判断是否满足条件,然后都通过验证,执行后续操作:
代码语言:go复制for !condition() {
c.Wait()
}
c.L.Lock()
// 满足条件情况下,执行的逻辑
c.L.Unlock()
此时会出现的情况为,本来是需要在满足condition方法的前提下,才能执行的操作。现在有可能的效果,为前面一部分协程执行时,还是满足condition条件的;但是后面的协程,尽管不满足condition条件,还是执行了后续操作,可能导致程序出错。
正常的用法应该是,在调用Wait方法前便加锁,只会有一个协程判断是否满足condition条件,然后执行后续操作。这样子就不会出现即使不满足条件,也会执行后续操作的情况出现。
Signal方法
Signal的实现为:
代码语言:go复制func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// 所有wait已经全部被notify
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
// Lock之后再次检查是否还有wait需要notify.
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// 更新notify的index.
atomic.Store(&l.notify, t 1)
// 遍历notifyList中的sudog队列
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
// 根据wait设置的ticket找到对应的sudog
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
// 执行goready,唤醒对应的sudog
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
Signal方法notify一个wait的goroutine,获取当前的notify的index,然后遍历队列,根据index找到对应的sudog,唤醒这个sudog,wait等待的goroutine就可以继续执行了。
Broadcast方法
Broadcast方法的实现为:
代码语言:go复制func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// 检查是否所有的wait都已经被唤醒
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
// broadcast是唤醒所有的waits,可以清除等待的队列
s := l.head
l.head = nil
l.tail = nil
// 直接更新notify的index为wait,表示全部已经被notify
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// 唤醒所有的sudog
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
broadcast方法是唤醒全部wait的goroutine,实现也比较简单,就是直接循环wait队列,全部执行goready,更新notify的index为wait的index,表示全部wait的goroutine都被唤醒了。