万字图解| 深入揭秘Golang锁结构:Mutex(上)

2024-01-25 16:05:40 浏览数 (2)

大家好,我是「云舒编程」,今天我们来聊聊Golang锁结构:Mutex。

文章首发于微信公众号:云舒编程

一、前言

   Golang的Mutex算是在日常开发中最常见的组件了,并且关于锁的知识也是面试官最喜欢问的。    曾经在一次腾讯面试中,被面试官问得体无完肤。    虽然Golang Mutex只有短短的200多行,但是已经是一个极其丰富、精炼的组件,有极其复杂的状态控制。我已经看过很多次Mutex的源码,但是总是过段时间就会又处于懵逼状态,不得其道。分析下来,猜测是缺少“历史背景”,一上来就看到的是已经经过好几轮优化的代码,但是不清楚这么优化的背景,同时也缺少一些场景,就会导致无法理解一些设计。    其实如果我们去追溯 Mutex 的演进历史,会发现,Mutex最开始是一个非常简单的实现,简单到难以置信的地步,是Go开发者们经过了好几轮的优化才变成了现在这么一个非常复杂的数据结构,这是一个逐步完善的过程。    于是我想如果我们是设计者,我们会怎么去设计去优化一个锁的实现呢?    下面我将结合我曾经的腾讯面试经历 加上 代入“设计者”的角度出发,结合Mutex 的演进历史,去分析如何设计一个功能完备的锁。希望经过本文的分析,你也可以从零设计出属于你的「Mutex」。    友情提醒:文章很长,但是绝对值得一读。

二、面试中遇到Mutex

   这里大致还原下之前的面试流程,由于当初并没有答上来,所以会加入一些下来学习后的思考,从而让剧情可以顺利开展。 面试官:对Mutex熟悉吗? :熟悉,Mutex是Golang 中的锁,主要是控制并发访问资源,保护共享资源。 面试官:那如果让你去实现Mutex,你会怎么设计? :简单,定义一个int变量,0代表没有加锁,1代表加锁了,初始状态为0。然后使用一个For循环去尝试加锁,加锁时使用Atomic的CAS尝试将值从0改为1,如果成功了就获取锁,如果没有成功就继续For循环也就是自旋尝试获取锁,直到成功为止。拿到锁的线程想解锁的话也是通过Atomic的CAS尝试将值从1改为0。

代码语言:javascript复制
type Mutex struct {
	key  int32
}

func (m *Mutex) Lock() {
	for {
		if atomic.CompareAndSwapInt32(&m.key, 0, 1) {
			return
		}
	}
}

func (m *Mutex) Unlock() {
	atomic.CompareAndSwapInt32(&m.key, 1, 0)
}

面试官:那你觉得你的这个设计有没有问题? :性能会比较差,由于加锁是通过自旋实现的,如果有很多Goroutine在竞争锁的话,会导致CPU资源被打满,同时也是浪费资源。 面试官:那怎么优化呢? :可以利用信号量去实现Goroutine的睡眠和唤醒,避免自旋浪费CPU。

代码语言:javascript复制
type Mutex struct {
	key  int32
	sema uint32
}

func (m *Mutex) Lock() {
	if atomic.AddInt32(&m.key, 1) ==  1 {  //值从0变为1,则证明加锁成功
		return
	}
	runtime.Semacquire(&m.sema) //等待信号量
}

func (m *Mutex) Unlock() {
	if atomic.AddInt32(&m.key, -1) == 0 { //值-1后,变为0证明没有协程等待锁了,可以直接返回
		return
	}
	runtime.Semrelease(&m.sema) //唤起信号量上的协程
}

面试官:回答不错,那你觉得这版的实现还有其他问题吗? :首先这版的实现解决了自旋的性能问题,其次由于信号量有一个先进先出的等待队列,所以也可以保证竞争锁的Goroutine是先到先得的,保证了公平。但是这样的公平其实在调度层面又是效率不高的。 面试官:这里的「效率不高」从何说起? :假设有三个协程分别是G1,G2,G3。G1首先加锁成功了,然后执行业务逻辑。期间G2想加锁发现加不上,就进入了信号量的等待队列,这个时候G2可能已经被调度器从M上调走了。然后G1解锁,这个时候G3想加锁发现由于G2在他前面进行了等待,所以导致G3加不上。这种情况由于G2没有获得CPU时间片,但是G3已经获得了CPU时间片,所以直接把锁给G3从整体上来说,效率会更加高些。 面试官:非常好,那应该怎么实现呢? :这里的本质是想给处于运行态的Goroutine 直接获得锁的机会,将上面的代码修改为: 这样新来的Goroutine可以有一次直接获取锁的机会

代码语言:javascript复制
type Mutex struct {
	key  int32
	sema uint32
}

func (m *Mutex) Lock() {
	if atomic.CompareAndSwapInt32(&m.key, 0, 1) {
		return
	}
	for {
		if atomic.CompareAndSwapInt32(&m.key, 0, 1) {
			return
		}
		runtime.Semacquire(&m.sema)
	}
}

func (m *Mutex) Unlock() {
	if atomic.CompareAndSwapInt32(&m.key, 1, 0) {
		runtime.Semrelease(&m.sema)
	}
}

面试官:这样写,有一个明显的问题,你知道「惊群效应」吗? :哦对,假设先有三个协程G1,G2,G3。G1加锁成功,G2,G3进入了信号量的等待队列。然后G1解锁,G2被唤醒,这个时候来了个新协程G4,G2和G4一起竞争锁,G4竞争成功。然后G4很快执行完进行解锁,然后G3就被唤醒了,这个时候就存在G2,G3一起竞争锁的场景。如果在高并发场景,这样的唤醒竞争还会更加激烈。同时也违背了G2,G3在信号量上先来先得的设计。 面试官:对,那应该怎么解决呢? :可以考虑增加一个标识,如果发现已经有协程被唤醒了,后来的协程就只是解锁,但是不唤醒协程。 面试官:还有一个问题,如果是最后一个协程,那他解锁的时候还需要进行runtime.Semrelease(&m.sema)吗? :哦对,这里也需要考虑。总结下我们的锁还需要以下额外能力:          1、信号量:阻塞唤醒协程          2、知道当前有多少协程阻塞在信号量上          3、避免「惊群效应」 面试官:嗯,差不多了。说说你的设计思路吧 :按照一般的设计我们需要使用三个变量控制,类似下面这样

代码语言:javascript复制
type Mutex struct {
	key  int32  //控制是否加锁成功,1:加锁 0:未加锁
    woken bool  //是否已经有协程被唤醒了
    waiter int32 //记录当前有多少协程阻塞在信号量上
	sema uint32 //信号量
}

面试官:可以这样设计,不过可以优化下吗?很多框架组件设计都会充分利用变量的高位低位,你这里可以这样设计吗? :哦对,我再优化下,定义int32的变量state将其分为三部分:

代码语言:javascript复制
type Mutex struct {
	state int32
	sema  uint32
}

那么加锁逻辑就变为:

代码语言:javascript复制
const (
	mutexLocked      = 1 // mutex is locked
	mutexWoken       = 2 
	mutexWaiterShift = 2
)

type Mutex struct {
	state int32
	sema  uint32
}

func (m *Mutex) Lock() {
	//给新来的协程直接加锁的机会
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}

	//上面没有加锁成功,尝试在接下来的唤醒中去竞争锁
	awoke := false //表示当前协程是不是被唤醒的
	for {
		old := m.state
		new := old | mutexLocked // 设置锁标志位为1
		if old&mutexLocked != 0 {
			new = old   1<<mutexWaiterShift //锁没有释放,当前协程可能会阻塞在信号量上,先将waiter 1
		}
		if awoke { //尝试清除唤醒标志
			new &^= mutexWoken
		}

		//这里尝试将state从old设置为new。如果代码能够执行到这步,代表了可能发生以下几种情况的一种或者多种
		//1、当前协程尝试加锁
		//2、waiter 1
		//3、清除唤醒标志
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old & mutexLocked == 0{
				//成功获取到锁了,返回
				break
			}
			
			//没有获取到锁,阻塞在信号量上
			runtime.Semacquire(&m.sema)
			awoke = true //执行到这步,证明是被信号量唤醒的,设置唤醒标志
		}
	}
}

对上面的一些运算进行解释: 1、new := old | mutexLocked 将state的最低位设置为1,即代表想加锁; 2、new = old 1<<mutexWaiterShift,首先会执行1<<mutexWaiterShift,即将1左移两位,移位后的值在加上old。1左移两位即避开了mutexLocked和mutexWoken,然后再跟old相加,就可以实现waiter 1。 3、new &^= mutexWoken,&^ 是一个位清除运算符(bit clear operator,它的作用是将第一个操作数(左操作数)中的位与第二个操作数(右操作数)相对应的位进行比较。如果右操作数的位为 1,则将左操作数的相应位清零(设置为 0)。如果右操作数的位为 0,则左操作数的相应位保持不变。 面试官:写的不错,解锁的部分呢? :别急,马上补上

代码语言:javascript复制
func (m *Mutex) Unlock() {
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if (new mutexLocked)&mutexLocked == 0 {
		panic("sync: unlock of unlocked mutex")
	}
	
	old := new
	for{
		//以下情况都直接结束,不继续往下:
		//1、如果没有人阻塞在信号量上了
		//2、其他人加锁了
		//3、已经有人唤醒协程了
		if old>>mutexWaiterShift == 0 || old & (mutexLocked|mutexWoken) != 0{
			return
		}
		
		new = (old - 1<<mutexWaiterShift) | mutexWoken //waiter-1 并且将唤醒标志置为1
		if atomic.CompareAndSwapInt32(&m.state,old,new){
			//如果cas执行成功就唤醒一个协程
			runtime.Semacquire(&m.sema)
			return
		}
		old = m.state
	}
}

面试官:不错,你的这个设计就是 2011年 Russ Cox 的第二版的Mutex实现逻辑。

三、golang 原版 Mutex

上面的分析过程就是golang Mutex的演进历史,给大家看下golang 原版 Mutex代码。 V1版本极其简单:github地址

代码语言:javascript复制
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

package func cas(val *int32, old, new int32) bool

export type Mutex struct {
	key int32;
	sema int32;
}

func xadd(val *int32, delta int32) (new int32) {
	for {
		v := *val;
		if cas(val, v, v delta) {
			return v delta;
		}
	}
	panic("unreached")
}

func (m *Mutex) Lock() {
	if xadd(&m.key, 1) == 1 {
		// changed from 0 to 1; we hold lock
		return;
	}
	sys.semacquire(&m.sema);
}

func (m *Mutex) Unlock() {
	if xadd(&m.key, -1) == 0 {
		// changed from 1 to 0; no contention
		return;
	}
	sys.semrelease(&m.sema);
}

V2版本增加了点细节:github地址

代码语言:javascript复制
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package sync provides basic synchronization primitives such as mutual
// exclusion locks.  Other than the Once and WaitGroup types, most are intended
// for use by low-level library routines.  Higher-level synchronization is
// better done via channels and communication.
//
// Values containing the types defined in this package should not be copied.
package sync

import "sync/atomic"

// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
	state int32
	sema  uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexWaiterShift = iota
)

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}

	awoke := false
	for {
		old := m.state
		new := old | mutexLocked
		if old&mutexLocked != 0 {
			new = old   1<<mutexWaiterShift
		}
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&mutexLocked == 0 {
				break
			}
			runtime_Semacquire(&m.sema)
			awoke = true
		}
	}
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if (new mutexLocked)&mutexLocked == 0 {
		panic("sync: unlock of unlocked mutex")
	}

	old := new
	for {
		// If there are no waiters or a goroutine has already
		// been woken or grabbed the lock, no need to wake anyone.
		if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
			return
		}
		// Grab the right to wake someone.
		new = (old - 1<<mutexWaiterShift) | mutexWoken
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			runtime_Semrelease(&m.sema)
			return
		}
		old = m.state
	}
}

四、结尾

Golang Mutex一共优化了四版,我们上面已经分析了V1和V2版,后续会继续分析为什么还需要V3,V4版,他们又分别解决了什么问题。 最后我想出几个问题,知道答案的兄弟们可以在评论区回复: 1、为什么不用old = atomic.LoadInt32(&m.state),而直接使用old := m.state,在并发情况下不会出现不一致吗? 2、Unlock中,为啥不直接这么判断

代码语言:javascript复制
if m.state != mutexLocked {
    panic("sync: unlock of unlocked mutex")
}

反而要这么判断:

代码语言:javascript复制
new := atomic.AddInt32(&m.state, -mutexLocked)
	if (new mutexLocked)&mutexLocked == 0 {
		panic("sync: unlock of unlocked mutex")
}

3、如果G1 通过Lock获取了锁, 在持有锁的期间,G2 调用Unlock释放这个锁, 会出现什么现象?

0 人点赞