万字图解| 深入揭秘Golang锁结构:Mutex(下)

2024-01-25 16:06:19 浏览数 (2)

大家好,我是「云舒编程」,今天我们来聊聊Golang锁结构:Mutex。

文章首发于微信公众号:云舒编程

一、前言

    书接上回,在万字图解| 深入揭秘Golang锁结构:Mutex(上)一文中,我们已经研究了Golang mutex V1和V2版本的实现。接下来我们继续研究V3和V4版本的实现。

二、面试中遇到Mutex

    为了让剧情顺利发展,我们依旧使用万字图解| 深入揭秘Golang锁结构:Mutex(上)一文中的面试对话模式。 面试官:你现在实现的锁的确给了新来的Goroutine直接获取锁的机会,但是还不够优雅。比如说,新Goroutine尝试获取锁失败的那一刻,锁就被释放了,但是新Goroutine需要等到下一次信号量唤醒加调度才有机会再次获取锁,这样其实浪费了新Goroutine的CPU时间,你可以再优化下吗? :考虑到这种情况,可以尝试给新的Goroutine多次获取锁的机会,说白了就是允许自旋,但是需要给自旋加一些限制条件,避免最开始提到的性能问题。 面试官:需要加哪些限制条件呢? :首先需要限制自旋的次数,其次操作系统的处理器个数和Golang 调度的P个数都必须大于1,否则就会是串行,自旋就没有意义了。 面试官:不错,怎么实现呢? :我来写下:

代码语言:javascript复制
const (
    mutexLocked      = 1 // mutex is locked
    mutexWoken       = 2 
    mutexWaiterShift = 2
)

type Mutex struct {
    state int32
    sema  uint32
}

func (m *Mutex) Lock() {
    //给新来的协程直接加锁的机会
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    //上面没有加锁成功,尝试在接下来的唤醒中去竞争锁
    awoke := false //表示当前协程是不是被唤醒的
    iter := 0 //记录当前自旋的次数
    for {
        old := m.state
        new := old | mutexLocked // 设置锁标志位为1
        if old&mutexLocked != 0 {
            //判断是否满足自旋条件
            if runtime_canSpin(iter) {
    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
     atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
     awoke = true
    }

                //内部调用procyield函数,该函数也是汇编语言实现。
                //函数内部循环调用PAUSE指令。减少cpu的消耗,节省电量。
                //指令的本质功能:让加锁失败时cpu睡眠30个(about)clock,从而使得读操作的频率低很多。流水线重排的代价也会小很多。
    runtime_doSpin() 
    iter  
    continue
   }
            
            new = old   1<<mutexWaiterShift //锁没有释放,当前协程可能会阻塞在信号量上,先将waiter 1
        }
       ··· //剩下的不变
    }
}
代码语言:javascript复制
//判断是否可以自旋,同时满足以下4个条件才能自旋:
//1、自旋次数小于4次
//2、cpu核数大于1
//3、GOMAXPROCS>1
//4、running P > 1 并且 P队列为空
func sync_runtime_canSpin(i int) bool {
 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle sched.nmspinning) 1 {
  return false
 }
 if p := getg().m.p; p.runqhead != p.runqtail {
  return false
 }
 return true
}

解锁部分没有变化 面试官:通过自旋,对于临界区代码执行非常短的场景来说,这是一个非常好的优化。因为临界区的代码耗时很短,锁很快就能释放,而抢夺锁的 goroutine不用通过休眠唤醒方式等待调度,直接自旋几次,可能就获得了锁。但是这里也存在一个问题,你知道什么是【饥饿】吗? :因为新来的 goroutine 也参与竞争,有可能每次都会被新来的 goroutine 抢到获取锁的机会,在极端情况下,等待中的goroutine可能会一直获取不到锁,就会导致【饥饿】。 面试官:那怎么解决呢? :可以考虑给锁加一个标识,比如我们可以检测当一个老goroutine超过一定时间都没有获取到锁,那么他就给锁打上一个【饥饿】的标识,新来的goroutine发现存在该标识就不再通过自旋抢锁,而是直接进入信号量的等待队列的队尾。同时把老goroutine移动到信号量等待队列的队头,这样老goroutine下次就可以直接获取到锁了。 面试官:思路不错,写下代码吧。 :我们还是把state在拿出来一位表示锁是不是处于饥饿状态。

加锁部分:

代码语言:javascript复制
type Mutex struct {
 state int32
 sema  uint32
}

const (
 mutexLocked      = 1 // mutex is locked
 mutexWoken       = 2
 mutexWaiterShift = 3
 mutexStarving    = 4 //新增字段,标识
)

func (m *Mutex) Lock() {
 //给新来的协程直接加锁的机会
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  return
 }

 m.lockSlow()
}

func (m *Mutex) lockSlow() {
 var waitStartTime int64 //表示等待了多久还没获取到锁
 starving := false       //表示当前锁是否处于饥饿状态
 awoke := false          //表示当前协程是不是被唤醒的
 iter := 0               //自旋次数
 old := m.state

 for {
  //满足以下条件才能进入自旋:
  //1、锁不是饥饿状态,并且没有获取到锁
  //2、满足自旋条件runtime_canSpin
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }

   //内部调用procyield函数,该函数也是汇编语言实现。
   //函数内部循环调用PAUSE指令。减少cpu的消耗,节省电量。
   //指令的本质功能:让加锁失败时cpu睡眠30个(about)clock,从而使得读操作的频率低很多。流水线重排的代价也会小很多。
   runtime_doSpin()
   iter  
   old = m.state
   continue
  }

  new := old

  //如果当前锁不是饥饿状态,尝试将加锁标志置为1
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }

  //锁没有释放,或者是饥饿模式,当前协程会阻塞在信号量上,将waiter 1
  if old&(mutexLocked|mutexStarving) != 0 {
   new = old   1<<mutexWaiterShift
  }

  //已经等待超过了1ms,且锁被其他协程占用,则进入饥饿模式
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }

  if awoke { //尝试清除唤醒标志
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
   new &^= mutexWoken
  }

  //这里尝试将state从old设置为new。如果代码能够执行到这步,代表了可能发生以下几种情况的一种或者多种
  //1、当前协程尝试加锁
  //2、waiter 1
  //3、清除唤醒标志
  //4、想将锁设置为饥饿模式
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
   if old&(mutexLocked|mutexStarving) == 0 {
    //不是饥饿状态,并且成功获取到锁了,返回
    break
   }

   //是否已经等待过了
   queueLifo := waitStartTime != 0
   if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
   }

   // 阻塞等待
   // queueLifo 为 true,表示加入到队列头。否则,加入到队列尾。
   // (首次加入队列加入到队尾,不是首次加入则加入队头,这样等待最久的 goroutine 优先能够获取到锁。)
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)

   // 从等待队列中唤醒,根据等待时间,判断锁是否应该进入饥饿模式。
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

   //如果锁已经是饥饿状态了,直接抢锁返回
   if old&mutexStarving != 0 {
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }

    //加锁并且等待者数量减一
    delta := int32(mutexLocked - 1<<mutexWaiterShift)

    // 清除饥饿状态的两种情况:
    //. 如果不需要进入饥饿模式(当前被唤醒的 goroutine 的等待时间小于 1ms)
    //. 原来的等待者数量为 1,说明是最后一个被唤醒的 goroutine。
    if !starving || old>>mutexWaiterShift == 1 {
     //清除饥饿状态
     delta -= mutexStarving
    }

    //设置状态,加锁
    atomic.AddInt32(&m.state, delta)
    break
   }
   // 设置唤醒标记,重新抢占锁(会与那些运行中的 goroutine 一起竞争锁)
   awoke = true
   iter = 0
  } else {
   old = m.state
  }
 }
}

解锁部分:

代码语言:javascript复制
func (m *Mutex) Unlock() {
 new := atomic.AddInt32(&m.state, -mutexLocked)
 //还有人阻塞在信号量上,需要进行唤醒
 if new != 0 {
  m.unlockSlow(new)
 }
}

func (m *Mutex) unlockSlow(new int32) {
 if (new mutexLocked)&mutexLocked == 0 {
  fatal("sync: unlock of unlocked mutex")
 }

 //非饥饿模式
 if new&mutexStarving == 0 {
  old := new
  for {
   //以下情况都直接结束,不继续往下:
   //1、如果没有人阻塞在信号量上了
   //2、其他人加锁了
   //3、已经有人唤醒协程了
   //4、锁又切换成饥饿模式了
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    return
   }

   //waiter-1 并且将唤醒标志置为1
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
    //如果cas执行成功就唤醒队头协程
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
   old = m.state
  }
 } else {
  //饥饿模式下直接唤醒队头,这里分为两种情况:
  //1、如果“mutexLocked”未设置,等待者在唤醒后会获取到锁。
  //2、如果只设置了“mutexStarving”,则仍然认为互斥锁已被锁定,因此新到来的goroutine不会获取它,唤醒的协程会获取到锁。
  runtime_Semrelease(&m.sema, true, 1)
 }
}

关于信号量相关接口:

代码语言:javascript复制
//lifo:true 加入等待队列的队头,否则加入队尾
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

//handoff=true:将当前G直接放到runq,调度器可以立即该G,并且会继承上一个G的时间片,
//这样的目的是为了使当前G可以得到更快的调度。
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

面试官:不错,这就是Golang mutex目前的设计,兼容了公平与效率的方案。

三、总结

1、互斥锁是一种常见的控制并发读写资源的手段,go 中的互斥锁实现是 sync.Mutex。 2、Mutex 的对外的接口:

  • Lock:同一时刻只能有一个 goroutine 可以获取到锁,其他goroutine会先通过自旋抢占锁,抢不到则阻塞等待。
  • Unlock:释放锁,释放锁之后,会唤醒等待队列中的下一个 goroutine。

3、使用 Mutex 需要注意以下几点:

  • Lock与Unlock需要成对出现,在 Unlock 之前,必须已经调用了 Lock,否则会 panic。
  • 不要将 Mutex 作为函数或方法的参数传递。
  • 不要在锁内部执行阻塞或耗时操作。
  • 可以通过 vet 这个命令行来检查上面的锁 copy 的问题。

4、go 的 Mutex 基于以下技术实现:

  • 信号量:操作系统层面的同步机制。
  • 队列:在协程抢锁失败后,会将这些协程放入一个 FIFO 队列中,下次唤醒会唤醒队列头的协程。
  • 原子操作:通过cas操作状态字段state,可以保证数据的完整性。

5、go Mutex 的两种模式:

  • 正常模式:运行中的 goroutine 有一定机会比等待队列中的 goroutine 先获取到锁,这种模式有更好的性能。
  • 饥饿模式:所有后来的 goroutine 都直接进入等待队列,会依次从等待队列头唤醒 goroutine。可以有效避免【饥饿】。等待队列中的 goroutine 超过了 1ms 还没有获取到锁,那么会进入饥饿模式

四、最后

    “长城不是一天建成的”,Mutex 的设计也是从简单设计到复杂处理逐渐演变的。最初Mutex设计非常简洁,不到20行代码。但是,随着使用者越来越多,需要处理的场景也越来越复杂,逐渐暴露出一些缺陷,为了弥补这些缺陷,Mutex不得不加入越来越多的设计,也逐步变得越来越复杂。

0 人点赞