【Go】RWMutex 源码分析

2022-10-26 15:12:32 浏览数 (2)

RWMutex

读写锁相较于互斥锁有更低的粒度,它允许并发读,因此在读操作明显多于写操作的场景下能减少锁竞争的次数,提高程序效率。

代码语言:javascript复制
type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // semaphore for writers to wait for completing readers
    readerSem   uint32 // semaphore for readers to wait for completing writers
    readerCount int32  // number of pending readers
    readerWait  int32  // number of departing readers
}

RWMutex 结构体中包含五个字段,分别表示:

  • w: 复用互斥锁
  • writerSem 和 readerSem: 用于写等待读和读等待写的信号量
  • readerCount:
  • readerWait: 等待写锁释放的读者数量

读锁

RLock

当有 goroutine 写时,是不允许读的,这时会把 readerCount 设置为负,这时读 goroutine 应该被阻塞

代码语言:javascript复制
func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // 阻塞读
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}
RUnlock

读锁解锁时只需要将 readerCount - 1, 如果结果小于零,说明:

  1. 原来 readerCount == 0 || readerCount == -rwmutexMaxReaders, 对未加锁的对象执行了解锁操作
  2. 原来 readerCount < 0, 有正在执行的写操作
代码语言:javascript复制
func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r 1 == 0 || r 1 == -rwmutexMaxReaders {
        race.Enable()
        throw("sync: RUnlock of unlocked RWMutex")
    }
    // 所有读操作结束后,触发写的写信号量
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

写锁

Lock
代码语言:javascript复制
func (rw *RWMutex) Lock() {
    // 获取互斥写锁
    rw.w.Lock()
    // 阻塞读
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)   rwmutexMaxReaders
    // 如果有人在读,需要等待读锁释放
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        // 阻塞等待读锁释放
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

Lock 会先通过互斥锁阻塞写操作,然后禁止读锁获取,等待已经持有读锁的 goroutine 释放读锁,这样可以避免连续的写操作使读陷入饥饿。

Unlock
代码语言:javascript复制
func (rw *RWMutex) Unlock() {
    // 重新允许读锁获取
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    // 触发等待中的写锁的信号量
    for i := 0; i < int(r); i   {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // 互斥锁解锁
    rw.w.Unlock()
}

总结

在极端情况下:

  • 如果完全没有写,读锁加锁只是将 readerCount 加一,解锁只是将其减一,不存在锁竞争。
  • 如果只有写,加锁和解锁都比互斥锁多了一个对 readerCount 取反操作

在一般情况下,读锁在获取锁前会检查 readerCount, 如果为负,说明有人在写,则进入阻塞状态,等待 readerSem 的信号,写锁获取锁在得到互斥锁后会先设置 readerCount 为负,阻止新的读者获取读锁,然后需要等待所有已经持有读锁的 goroutine 释放读锁,这里使用的是 readerWait ,当 readerCount 为负时,如果读锁被释放,该量就会减一,当 readerWait == 0 时,则说明所有在写锁获取之前获得的读锁都被释放了,最后一个释放的读锁会通过 writerSem 通知写对象。

写锁释放时,需要通过 readerSem 信号触发所有阻塞中的写对象。

0 人点赞