golang sync.Cond使用和实现原理

2023-06-14 15:30:08 浏览数 (1)

Cond

sync.Cond 是基于互斥锁/读写锁实现的条件变量,用来协调想要访问共享资源的那些 Goroutine。当共享资源状态发生变化时,sync.Cond 可以用来通知等待条件发生而阻塞的 Goroutine。

假如有一个协程正在接收数据,其他协程必须等待这个协程接收完数据,才能读取到正确的数据。上述情形下,如果单纯的使用 channel 或者互斥锁,只能有一个协程可以等待,并读取到数据,没办法通知其他协程也读取数据。这个时候怎么办?

1)可以用一个全局变量标识第一个协程是否接收数据完毕,剩下的协程反复检查该变量的值,直到读取到数据。

2)也可创建多个 channel, 每个协程阻塞在一个 Channel 上,由接收数据的协程在数据接收完毕后,挨个通知,类似于msgbus实现。

然后 Go 中其实内置来一个 sync.Cond 来解决这个问题。

使用

下面的例子实现了通Cond实现通知协程的流程:

代码语言:go复制
func TestCond(t *testing.T) {
	var locker = new(sync.Mutex)
	var cond = sync.NewCond(locker)
	for i := 0; i < 10; i   {
		go func(x int) {
			cond.L.Lock()         //获取锁
			defer cond.L.Unlock() //释放锁
			cond.Wait()           //等待通知,阻塞当前goroutine
			fmt.Println(x)
		}(i)
	}

	time.Sleep(time.Second * 1)
	fmt.Println("Signal...")
	cond.Signal() // 下发一个通知给已经获取锁的goroutine
	time.Sleep(time.Second * 1)
	cond.Signal() // 3秒之后 下发一个通知给已经获取锁的goroutine
	time.Sleep(time.Second * 3)
	cond.Broadcast() //3秒之后 下发广播给所有等待的goroutine
	fmt.Println("Broadcast...")
	
	select {}
}

运行结果:
=== RUN   TestCond
Signal...
0
1
Broadcast...
2
6
4
5
7
3
9
8

Cond的主要方法有:

1)NewCond(l Locker) *Cond

NewCond 创建实例需要关联一个锁,使用方式为:

cond := sync.NewCond(&sync.Mutex{})

2)Wait()

Wait阻塞当前的 goroutine,等待唤起,在调用Wait前需要Lock,执行完Wait后的逻辑之后需要调用Unlock。

3)Signal()

Signal唤醒一个阻塞的goroutine,执行前不需要调用Lock。

4)Broadcast()

Broadcast唤起所有阻塞的 goroutine,执行前不需要调用Lock。

实现原理

数据结构

我们来看下sync.Cond的结构体,它的代码在 /sr/sync/cond.go下:

代码语言:go复制
type Cond struct {
    noCopy noCopy       // 不可复制
    L Locker            // 锁
    notify  notifyList  // 通知唤起列表
    checker copyChecker // 复制检测,禁止第一次使用的Cond被复制拷贝
}

type notifyList struct {
	wait   uint32         // 当前wait的index
	notify uint32         // 当前notify的index
	lock   uintptr        // 锁
	head   unsafe.Pointer // 队头
	tail   unsafe.Pointer // 队尾
}

每个Cond实例都会关联一个锁 L(互斥锁 Mutex,或读写锁 RWMutex),当调用Wait方法时,必须加锁。

Wait方法

Wait方法的实现为:

代码语言:go复制
func (c *Cond) Wait() {
	c.checker.check() // copy 检查
	t := runtime_notifyListAdd(&c.notify) // 向通知列表列表中加入该通知
	c.L.Unlock() // 暂时解锁
	runtime_notifyListWait(&c.notify, t) //通知操作
	c.L.Lock() //加锁,还原状态
}

//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// 更新wait的index
	return atomic.Xadd(&l.wait, 1) - 1
}

//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// 判断需要等待notify的index是否合法
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// 获取当前的sudog
	s := acquireSudog()
	s.g = getg()
    // sudog设置ticket
    s.ticket = t
    // 加入到队尾
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
    // gopark阻塞等待被唤醒
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	releaseSudog(s)
}

Wait方法首先调用runtime_notifyListAdd方法将wait索引 1,然后调用runtime_notifyListWait将自己加入到等待队列中,然后释放锁,等待其他协程的唤醒。需要注意的是,Wait的使用方式最好是:

代码语言:go复制
c.L.Lock()
for !condition() {
    c.Wait()
}
//    ... make use of condition ...
c.L.Unlock()

强制调用Wait方法前需要先获取该锁。这里的原因在于调用Wait方法如果不加锁,有可能会出现竞态条件。这里假设多个协程都处于等待状态,然后一个协程调用了Broadcast唤醒了其中一个或多个协程,此时这些协程都会被唤醒。

如下,假设调用Wait方法前没有加锁的话,那么所有协程都会去调用condition方法去判断是否满足条件,然后都通过验证,执行后续操作:

代码语言:go复制
for !condition() {
    c.Wait()
}
c.L.Lock()
// 满足条件情况下,执行的逻辑
c.L.Unlock()

此时会出现的情况为,本来是需要在满足condition方法的前提下,才能执行的操作。现在有可能的效果,为前面一部分协程执行时,还是满足condition条件的;但是后面的协程,尽管不满足condition条件,还是执行了后续操作,可能导致程序出错。

正常的用法应该是,在调用Wait方法前便加锁,只会有一个协程判断是否满足condition条件,然后执行后续操作。这样子就不会出现即使不满足条件,也会执行后续操作的情况出现。

Signal方法

Signal的实现为:

代码语言:go复制
func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// 所有wait已经全部被notify
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Lock之后再次检查是否还有wait需要notify.
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// 更新notify的index.
	atomic.Store(&l.notify, t 1)
    // 遍历notifyList中的sudog队列
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        // 根据wait设置的ticket找到对应的sudog
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
            // 执行goready,唤醒对应的sudog
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

Signal方法notify一个wait的goroutine,获取当前的notify的index,然后遍历队列,根据index找到对应的sudog,唤醒这个sudog,wait等待的goroutine就可以继续执行了。

Broadcast方法

Broadcast方法的实现为:

代码语言:go复制
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// 检查是否所有的wait都已经被唤醒
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)
    // broadcast是唤醒所有的waits,可以清除等待的队列
	s := l.head
	l.head = nil
	l.tail = nil

	// 直接更新notify的index为wait,表示全部已经被notify
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// 唤醒所有的sudog
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

broadcast方法是唤醒全部wait的goroutine,实现也比较简单,就是直接循环wait队列,全部执行goready,更新notify的index为wait的index,表示全部wait的goroutine都被唤醒了。

go

0 人点赞