Golang source code analysis (4): select

2022-08-02 16:20:58

The implementation of select lives in Go's runtime package, at ./src/runtime/select.go. Before digging into the source, let's look at how select is used:

select {

case c1 <- 1:

// TODO: something

case <-c2:

// TODO: something

default:

// TODO: something

}

Under the hood, the code above really does the following things:

create the select -> register the cases -> execute the select -> release the select
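
Schematically, the compiler lowers a select like the one above into a sequence of runtime calls. The sketch below is pseudo-Go for illustration only (sizes, allocation details, and argument plumbing are simplified, and v1 is a made-up variable for the value being sent); it is not the literal compiler output:

// pseudo-code sketch of the lowered form (illustrative only)
sel := (*hselect)(mallocgc(selectsize(3), nil, true)) // create: allocate the selector block for 3 cases
newselect(sel, int64(selectsize(3)), 3)               // create: initialize the header, lockorder, pollorder
selectsend(sel, c1, unsafe.Pointer(&v1))              // register: case c1 <- 1
selectrecv(sel, c2, nil, nil)                         // register: case <-c2
selectdefault(sel)                                    // register: default
switch selectgo(sel) {                                // execute: block (or not), pick one case, return its index
case 0: // body of "case c1 <- 1"
case 1: // body of "case <-c2"
case 2: // body of "default"
}
// release: the selector's memory is released after the chosen case has run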

The main runtime functions involved in select are:

runtime.reflect_rselect()

runtime.newselect()

runtime.selectsend()

runtime.selectrecv()

runtime.selectdefault()

runtime.selectgo()

First, let's look at the constants defined there:

const (

// scase.kind

caseNil = iota // 0: the case is nil; this can happen when the send or recv is on a nil channel

caseRecv // 1: the case is a receive, <-ch

caseSend // 2: the case is a send, ch <-

caseDefault // 3: the case is the default block

)
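
To make caseNil concrete: a receive (or send) on a nil channel can never proceed, so such a case is effectively disabled and select always ends up taking another branch. A small self-contained example (channel names are made up):

package main

import "fmt"

func main() {
	var nilCh chan int         // nil channel: its case can never fire (the caseNil situation)
	ready := make(chan int, 1) // buffered channel that is immediately ready
	ready <- 7

	select {
	case v := <-nilCh:
		fmt.Println("unreachable:", v) // a receive from a nil channel blocks forever
	case v := <-ready:
		fmt.Println("ready:", v) // always chosen here
	}
}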

Next, the important structs:

/**

The select structure.

*/

type hselect struct {

tcase uint16 // total count of scase[]

ncase uint16 // currently filled scase[]

pollorder *uint16 // case poll order [very important]: the polling order of the cases

lockorder *uint16 // channel lock order [very important]: the order in which the channels are locked

// The case array. It is declared with a single element to save the 8 bytes of a pointer;

// the remaining elements live directly after it in memory

// and are reached by pointer arithmetic using the runtime's internal add function.

scase [1]scase // one per case (in order of appearance) [very important]: the scases, each holding the chan its case operates on

}
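
The add function mentioned in the comments above is the runtime's tiny pointer-arithmetic helper; it is essentially the following (shown here for reference):

// add advances a pointer by x bytes.
func add(p unsafe.Pointer, x uintptr) unsafe.Pointer {
	return unsafe.Pointer(uintptr(p) + x)
}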

/**

The definition of each case inside a select.

*/

type scase struct {

elem unsafe.Pointer // data element: pointer to the data being sent or received

c *hchan // chan: the channel this case operates on

pc uintptr // return pc (for race detector / msan); same as the pc in assembly, i.e. the program counter, the address of the next machine instruction to execute

kind uint16 // the kind of this case (caseNil / caseRecv / caseSend / caseDefault)

receivedp *bool // pointer to received bool, if any

releasetime int64

}

Notice that the scase field in hselect is a single-element array, which looks odd at first: does it only hold the one case currently being operated on? Where do the channels of the other cases live? The answer (with help from these references: https://ninokop.github.io/2017/11/07/Go-Select的实现/ and http://www.debugrun.com/a/vJjDJmZ.html) becomes clear once we look at how the selector's memory is laid out below.

So when exactly do the steps we listed above happen: create the select -> register the cases -> execute the select -> release the select? Let's look at the following:

//go:linkname reflect_rselect reflect.rselect

func reflect_rselect(cases []runtimeSelect) (chosen int, recvOK bool) {

// flagNoScan is safe here, because all objects are also referenced from cases.

size := selectsize(uintptr(len(cases)))

sel := (*hselect)(mallocgc(size, nil, true))

newselect(sel, int64(size), int32(len(cases)))

r := new(bool)

for i := range cases {

rc := &cases[i]

switch rc.dir {

case selectDefault:

selectdefault(sel)

case selectSend:

selectsend(sel, rc.ch, rc.val)

case selectRecv:

selectrecv(sel, rc.ch, rc.val, r)

}

}

chosen = selectgo(sel)

recvOK = *r

return

}

As you can see, the real external trigger for this function is the rselect() function in the reflect package (look into how //go:linkname works if you want the details of that binding). The function itself first allocates an hselect instance, then walks the incoming cases and registers the corresponding channel operation for each one, and finally calls selectgo(sel) to actually run a case.
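
For context, ordinary user code reaches this path through the exported reflect.Select function. Below is a minimal, self-contained example; the channel and variable names are made up for illustration:

package main

import (
	"fmt"
	"reflect"
)

func main() {
	ch := make(chan int, 1)
	ch <- 42

	// Build the cases dynamically; reflect.Select ends up in
	// runtime.reflect_rselect via the go:linkname binding shown above.
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)},
		{Dir: reflect.SelectDefault},
	}
	chosen, v, recvOK := reflect.Select(cases)
	if chosen == 0 {
		fmt.Println("received:", v.Interface(), recvOK) // received: 42 true
	} else {
		fmt.Println("default case chosen")
	}
}

With that entry point in mind, let's look at how the select is instantiated.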

Instantiating the select:

func newselect(sel *hselect, selsize int64, size int32) {

if selsize != int64(selectsize(uintptr(size))) {

print("runtime: bad select size ", selsize, ", want ", selectsize(uintptr(size)), "n")

throw("bad select size")

}

sel.tcase = uint16(size)

sel.ncase = 0

// the hselect structure is laid out in memory like this:

// [very important] header |scase[0]|scase[1]|scase[2]|lockorder[0]|lockorder[1]|lockorder[2]|pollorder[0]|pollorder[1]|pollorder[2]

// the order in which the channels are locked

sel.lockorder = (*uint16)(add(unsafe.Pointer(&sel.scase), uintptr(size)*unsafe.Sizeof(hselect{}.scase[0])))

// initialize the polling order of the cases

sel.pollorder = (*uint16)(add(unsafe.Pointer(sel.lockorder), uintptr(size)*unsafe.Sizeof(*hselect{}.lockorder)))

if debugSelect {

print("newselect s=", sel, " size=", size, "n")

}

}

As you can see, newselect just sets up a selector in memory; it essentially initializes parts of an already-allocated block. Right after that, reflect_rselect iterates over all the cases and, based on each case's direction obtained via reflection, calls the corresponding registration function.

Looking at hselect again: it ends with a [1]scase, i.e. space for only a single case, which tells us that hselect is really just a header; all of the scases are stored contiguously right after it, and their count is tcase. This "header + contiguous memory" pattern shows up frequently in the Go runtime.

The memory model of a selector with 6 cases looks like this: the whole block is made up of a header part plus a data part, where the header is the Select portion (the hselect described above) and the data part consists of the arrays.

In that layout, lockorder, pollorder, and scase are each arrays of 6 slots. The first scase slot lives inside the Select header itself; that slot is exactly the [1]scase field of the hselect struct. The whole block, starting from the header, is allocated in a single malloc-like call (malloc-like because Go has its own memory-management interface rather than using plain malloc), and then the lockorder and pollorder pointers in the header are simply pointed at the right offsets. Of course, before allocating the block in one go, the total size needed is computed in advance. Because the header already contains one scase slot, the allocation only needs to add 5 more scase slots for a 6-case select. Seen this way, the scase field ends up being treated no differently from lockorder and pollorder. All three are then filled in during case registration.
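
For reference, the size computed up front looks roughly like the sketch below, reconstructed from the layout just described (the header already contains one scase, plus size-1 extra scases, plus size lockorder slots and size pollorder slots, rounded up for alignment). This is a reconstruction under those assumptions, not a verbatim copy of the runtime source:

func selectsize(size uintptr) uintptr {
	selsize := unsafe.Sizeof(hselect{}) +
		(size-1)*unsafe.Sizeof(hselect{}.scase[0]) + // the header already holds one scase
		size*unsafe.Sizeof(*hselect{}.lockorder) + // size uint16 lockorder slots
		size*unsafe.Sizeof(*hselect{}.pollorder) // size uint16 pollorder slots
	return round(selsize, sys.Int64Align) // round up so the whole block is aligned
}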

So now we know that internally Go uses a single block of contiguous memory to store the select and the contents of its cases.

Registering the cases:

There are three functions for registering a case channel in a select: selectsend, selectrecv, and selectdefault, one per kind of case. They all register the same way: increment ncase (the count of cases registered so far), then fill in the relevant fields of the scase array at the current index, mainly setting the c and kind fields from the case's channel and the case kind. The code is as follows:

/**

Registers a send case, e.g.:

select {

case ch <- 1:

}

*/

func selectsend(sel *hselect, c *hchan, elem unsafe.Pointer) {

pc := getcallerpc(unsafe.Pointer(&sel))

i := sel.ncase

if i >= sel.tcase {

throw("selectsend: too many cases")

}

// bump the number of registered cases

sel.ncase = i + 1

if c == nil {

return

}

// initialize the corresponding scase structure

cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))

cas.pc = pc

cas.c = c

cas.kind = caseSend

cas.elem = elem

if debugSelect {

print("selectsend s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "n")

}

}

/**

Registers a receive case, e.g.:

select {

case <-ch: // this triggers selectrecv; case v, ok := <-ch: is handled the same way

}

Once ch has been closed, this case may be selected on every poll.

*/

func selectrecv(sel *hselect, c *hchan, elem unsafe.Pointer, received *bool) {

pc := getcallerpc(unsafe.Pointer(&sel))

i := sel.ncase

if i >= sel.tcase {

throw("selectrecv: too many cases")

}

// bump the number of registered cases

sel.ncase = i + 1

if c == nil {

return

}

// initialize the corresponding scase structure

cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))

cas.pc = pc

cas.c = c

cas.kind = caseRecv

cas.elem = elem

cas.receivedp = received

if debugSelect {

print("selectrecv s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "n")

}

}

/**

Registers the default case, e.g.:

select {

default:

}

*/

func selectdefault(sel *hselect) {

pc := getcallerpc(unsafe.Pointer(&sel))

i := sel.ncase

if i >= sel.tcase {

throw("selectdefault: too many cases")

}

// bump the number of registered cases

sel.ncase = i + 1

// initialize the corresponding scase structure

cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))

cas.pc = pc

cas.c = nil

cas.kind = caseDefault

if debugSelect {

print("selectdefault s=", sel, " pc=", hex(cas.pc), "n")

}

}

Executing the select:

pollorder: holds the indices of the scases; they are shuffled so that execution later on is randomized.

lockorder: determines the order in which the channels of all the cases are locked; this contiguous block is heap-sorted by channel address. Sorting by address allows duplicates to be detected, guaranteeing that no channel is locked twice when all channels are locked later.

When a select statement executes, it locks every channel involved.

A select statement creates a select object; if it runs for a long time inside a for loop, it may allocate memory frequently.

The execution of select can be summarized as follows:

Walk the scases in pollorder order, looking for a case that is already ready. If one is found, perform the ordinary channel read or write. A "ready" case is one whose channel operation can complete without blocking, or a receive on a channel that has already been closed.

If no case is ready, try the default case.

If neither applies, wrap the current G (goroutine) and hang it on the wait queues of every channel in the scases, enqueuing it on sendq or recvq according to the operation of each case.

When this G is woken up by one of the channels, walk the scases to find the target case, cancel the G's waits on all the other channels, and return.
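
A quick, self-contained demo of the first two points (channel names are made up for illustration): both channels are kept ready, so repeated selects bounce between them pseudo-randomly thanks to the shuffled pollorder; if neither were ready, the default branch would run instead of blocking:

package main

import "fmt"

func main() {
	c1 := make(chan int, 1)
	c2 := make(chan int, 1)
	for i := 0; i < 5; i++ {
		c1 <- i
		c2 <- i
		select {
		case v := <-c1:
			fmt.Println("got from c1:", v)
			<-c2 // drain the other channel so the next round starts clean
		case v := <-c2:
			fmt.Println("got from c2:", v)
			<-c1
		default:
			// never reached here, since both channels are ready above;
			// with neither ready, select would take this branch instead of blocking
			fmt.Println("neither channel ready")
		}
	}
}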

Before walking through the execution itself, let's look at locking and unlocking:

// lock the channels of all the cases

// this has to follow the indices stored in the lockorder array

// otherwise a circular-wait deadlock is possible

func sellock(scases []scase, lockorder []uint16) {

var c *hchan

for _, o := range lockorder {

c0 := scases[o].c

if c0 != nil && c0 != c {

c = c0

// this just uses the mutex inside the hchan to lock the channel

// hchan is defined in ./src/runtime/chan.go; it is the channel's runtime representation

lock(&c.lock)

}

}

}

// unlocking; relatively straightforward

func selunlock(scases []scase, lockorder []uint16) {

// We must be very careful here to not touch sel after we have unlocked

// the last lock, because sel can be freed right after the last unlock.

// Consider the following situation.

// First M calls runtime·park() in runtime·selectgo() passing the sel.

// Once runtime·park() has unlocked the last lock, another M makes

// the G that calls select runnable again and schedules it for execution.

// When the G runs on another M, it locks all the locks and frees sel.

// Now if the first M touches sel, it will access freed memory.

for i := len(scases) - 1; i >= 0; i-- {

c := scases[lockorder[i]].c

if c == nil {

break

}

if i > 0 && c == scases[lockorder[i-1]].c {

continue // will unlock it on the next iteration

}

// this just uses the mutex inside the hchan to unlock the channel

// hchan is defined in ./src/runtime/chan.go; it is the channel's runtime representation

unlock(&c.lock)

}

}

/**

Used together with gopark: the commit/unlock function the goroutine passes when it parks.

*/

func selparkcommit(gp *g, _ unsafe.Pointer) bool {

// This must not access gp's stack (see gopark). In

// particular, it must not access the *hselect. That's okay,

// because by the time this is called, gp.waiting has all

// channels in lock order.

var lastc *hchan

for sg := gp.waiting; sg != nil; sg = sg.waitlink {

if sg.c != lastc && lastc != nil {

// As soon as we unlock the channel, fields in

// any sudog with that channel may change,

// including c and waitlink. Since multiple

// sudogs may have the same channel, we unlock

// only after we've passed the last instance

// of a channel.

unlock(&lastc.lock)

}

lastc = sg.c

}

if lastc != nil {

unlock(&lastc.lock)

}

return true

}

Now let's talk about how the select is actually executed. The code is as follows:

// selectgo runs the select logic once initialization is done

// the return value is the index of the scase to execute

func selectgo(sel *hselect) int {

if debugSelect {

print("select: sel=", sel, "n")

}

if sel.ncase != sel.tcase {

throw("selectgo: case count mismatch")

}

// len and cap are both the total number of scases

// at this point ncase and tcase are already equal

scaseslice := slice{unsafe.Pointer(&sel.scase), int(sel.ncase), int(sel.ncase)}

scases := *(*[]scase)(unsafe.Pointer(&scaseslice))

var t0 int64

if blockprofilerate > 0 {

t0 = cputicks()

for i := 0; i < int(sel.ncase); i++ {

scases[i].releasetime = -1

}

}

// The compiler rewrites selects that statically have

// only 0 or 1 cases plus default into simpler constructs.

// The only way we can end up with such small sel.ncase

// values here is for a larger select in which most channels

// have been nilled out. The general code handles those

// cases correctly, and they are rare enough not to bother

// optimizing (and needing to test).

// generate permuted order

pollslice := slice{unsafe.Pointer(sel.pollorder), int(sel.ncase), int(sel.ncase)}

pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))

/**

Shuffle: randomize the polling order.

*/

for i := 1; i < int(sel.ncase); i++ {

j := fastrandn(uint32(i + 1))

pollorder[i] = pollorder[j]

pollorder[j] = uint16(i)

}

// sort the cases by Hchan address to get the locking order.

// simple heap sort, to guarantee n log n time and constant stack footprint.

// sort by hchan address to produce the locking order

// heap sort keeps it at O(n log n) time with constant stack usage

lockslice := slice{unsafe.Pointer(sel.lockorder), int(sel.ncase), int(sel.ncase)}

lockorder := *(*[]uint16)(unsafe.Pointer(&lockslice))

for i := 0; i < int(sel.ncase); i++ {

// initialize the lockorder array

j := i

// Start with the pollorder to permute cases on the same channel.

c := scases[pollorder[i]].c

for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {

k := (j - 1) / 2

lockorder[j] = lockorder[k]

j = k

}

lockorder[j] = pollorder[i]

}

for i := int(sel.ncase) - 1; i >= 0; i-- {

o := lockorder[i]

c := scases[o].c

lockorder[i] = lockorder[0]

j := 0

for {

k := j*2 + 1

if k >= i {

break

}

if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {

k++

}

if c.sortkey() < scases[lockorder[k]].c.sortkey() {

lockorder[j] = lockorder[k]

j = k

continue

}

break

}

lockorder[j] = o

}

/*

for i := 0; i+1 < int(sel.ncase); i++ {

if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {

print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")

throw("select: broken sort")

}

}

*/

// lock all the channels involved in the select

sellock(scases, lockorder)

var (

gp *g

done uint32

sg *sudog

c *hchan

k *scase

sglist *sudog

sgnext *sudog

qp unsafe.Pointer

nextp **sudog

)

/**

At this point all the preparation is done; the real work starts here.

*/

loop:

// pass 1 - look for something already waiting

var dfli int

var dfl *scase

var casi int

var cas *scase

// although this looks like a plain for loop,

// as soon as a case is ready

// we jump straight out with a goto

for i := 0; i < int(sel.ncase); i++ {

// iterate in pollorder order

casi = int(pollorder[i])

cas = &scases[casi]

c = cas.c

switch cas.kind {

case caseNil:

continue

case caseRecv:

// the <-ch case

// take different paths depending on whether there are goroutines waiting on the channel

sg = c.sendq.dequeue()

if sg != nil {

goto recv

}

if c.qcount > 0 {

goto bufrecv

}

if c.closed != 0 {

goto rclose

}

case caseSend:

// the ch <- 1 case; again just the basic channel operations

if raceenabled {

racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)

}

if c.closed != 0 {

goto sclose

}

sg = c.recvq.dequeue()

if sg != nil {

goto send

}

if c.qcount < c.dataqsiz {

goto bufsend

}

case caseDefault:

dfli = casi

dfl = cas

}

}

// we only get here if a caseDefault was seen in the loop above

if dfl != nil {

selunlock(scases, lockorder)

casi = dfli

cas = dfl

goto retc

}

// pass 2 - enqueue on all chans

// no case was ready and there is no default;

// in that situation the current goroutine has to be enqueued on the wait queues of every channel

gp = getg()

done = 0

if gp.waiting != nil {

throw("gp.waiting != nil")

}

nextp = &gp.waiting

// enqueue the goroutine on every channel's wait queue, in locking order

for _, casei := range lockorder {

casi = int(casei)

cas = &scases[casi]

if cas.kind == caseNil {

continue

}

c = cas.c

sg := acquireSudog()

sg.g = gp

// Note: selectdone is adjusted for stack copies in stack1.go:adjustsudogs

sg.selectdone = (*uint32)(noescape(unsafe.Pointer(&done)))

// No stack splits between assigning elem and enqueuing

// sg on gp.waiting where copystack can find it.

sg.elem = cas.elem

sg.releasetime = 0

if t0 != 0 {

sg.releasetime = -1

}

sg.c = c

// Construct waiting list in lock order.

*nextp = sg

nextp = &sg.waitlink

switch cas.kind {

case caseRecv:

// a recv case goes on the recvq

c.recvq.enqueue(sg)

case caseSend:

// a send case goes on the sendq

c.sendq.enqueue(sg)

}

}

// wait for someone to wake us up

gp.param = nil

// the current goroutine goes to sleep; the channels are unlocked (that is what selparkcommit does) and we wait to be woken up

gopark(selparkcommit, nil, "select", traceEvGoBlockSelect, 1)

// While we were asleep, some goroutine came along and completed

// one of the cases in the select and woke us up (called ready).

// As part of that process, the goroutine did a cas on done above

// (aka *sg.selectdone for all queued sg) to win the right to

// complete the select. Now done = 1.

//

// If we copy (grow) our own stack, we will update the

// selectdone pointers inside the gp.waiting sudog list to point

// at the new stack. Another goroutine attempting to

// complete one of our (still linked in) select cases might

// see the new selectdone pointer (pointing at the new stack)

// before the new stack has real data; if the new stack has done = 0

// (before the old values are copied over), the goroutine might

// do a cas via sg.selectdone and incorrectly believe that it has

// won the right to complete the select, executing a second

// communication and attempting to wake us (call ready) again.

//

// Then things break.

//

// The best break is that the goroutine doing ready sees the

// _Gcopystack status and throws, as in #17007.

// A worse break would be for us to continue on, start running real code,

// block in a semaphore acquisition (sema.go), and have the other

// goroutine wake us up without having really acquired the semaphore.

// That would result in the goroutine spuriously running and then

// queue up another spurious wakeup when the semaphore really is ready.

// In general the situation can cascade until something notices the

// problem and causes a crash.

//

// A stack shrink does not have this problem, because it locks

// all the channels that are involved first, blocking out the

// possibility of a cas on selectdone.

//

// A stack growth before gopark above does not have this

// problem, because we hold those channel locks (released by

// selparkcommit).

//

// A stack growth after sellock below does not have this

// problem, because again we hold those channel locks.

//

// The only problem is a stack growth during sellock.

// To keep that from happening, run sellock on the system stack.

//

// It might be that we could avoid this if copystack copied the

// stack before calling adjustsudogs. In that case,

// syncadjustsudogs would need to recopy the tiny part that

// it copies today, resulting in a little bit of extra copying.

//

// An even better fix, not for the week before a release candidate,

// would be to put space in every sudog and make selectdone

// point at (say) the space in the first sudog.

// something happened and we were woken up; lock all of this select's channels again

systemstack(func() {

sellock(scases, lockorder)

})

sg = (*sudog)(gp.param)

gp.param = nil

// pass 3 - dequeue from unsuccessful chans

// otherwise they stack up on quiet channels

// record the successful case, if any.

// We singly-linked up the SudoGs in lock order.

// the code below runs after we have been woken up

casi = -1

cas = nil

sglist = gp.waiting

// Clear all elem before unlinking from gp.waiting.

for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {

sg1.selectdone = nil

sg1.elem = nil

sg1.c = nil

}

gp.waiting = nil

for _, casei := range lockorder {

k = &scases[casei]

if k.kind == caseNil {

continue

}

if sglist.releasetime > 0 {

k.releasetime = sglist.releasetime

}

if sg == sglist {

// sg has already been dequeued by the G that woke us up.

casi = int(casei)

cas = k

} else {

c = k.c

if k.kind == caseSend {

c.sendq.dequeueSudoG(sglist)

} else {

c.recvq.dequeueSudoG(sglist)

}

}

sgnext = sglist.waitlink

sglist.waitlink = nil

releaseSudog(sglist)

sglist = sgnext

}

// if we still have nothing, go through the loop logic again

if cas == nil {

// We can wake up with gp.param == nil (so cas == nil)

// when a channel involved in the select has been closed.

// It is easiest to loop and re-run the operation;

// we'll see that it's now closed.

// Maybe some day we can signal the close explicitly,

// but we'd have to distinguish close-on-reader from close-on-writer.

// It's easiest not to duplicate the code and just recheck above.

// We know that something closed, and things never un-close,

// so we won't block again.

goto loop

}

c = cas.c

if debugSelect {

print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "n")

}

if cas.kind == caseRecv {

if cas.receivedp != nil {

*cas.receivedp = true

}

}

if raceenabled {

if cas.kind == caseRecv && cas.elem != nil {

raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)

} else if cas.kind == caseSend {

raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)

}

}

if msanenabled {

if cas.kind == caseRecv && cas.elem != nil {

msanwrite(cas.elem, c.elemtype.size)

} else if cas.kind == caseSend {

msanread(cas.elem, c.elemtype.size)

}

}

selunlock(scases, lockorder)

goto retc

bufrecv:

// can receive from buffer

if raceenabled {

if cas.elem != nil {

raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)

}

raceacquire(chanbuf(c, c.recvx))

racerelease(chanbuf(c, c.recvx))

}

if msanenabled && cas.elem != nil {

msanwrite(cas.elem, c.elemtype.size)

}

if cas.receivedp != nil {

*cas.receivedp = true

}

qp = chanbuf(c, c.recvx)

if cas.elem != nil {

typedmemmove(c.elemtype, cas.elem, qp)

}

typedmemclr(c.elemtype, qp)

c.recvx++

if c.recvx == c.dataqsiz {

c.recvx = 0

}

c.qcount--

selunlock(scases, lockorder)

goto retc

bufsend:

// can send to buffer

if raceenabled {

raceacquire(chanbuf(c, c.sendx))

racerelease(chanbuf(c, c.sendx))

raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)

}

if msanenabled {

msanread(cas.elem, c.elemtype.size)

}

typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)

c.sendx++

if c.sendx == c.dataqsiz {

c.sendx = 0

}

c.qcount++

selunlock(scases, lockorder)

goto retc

recv:

// can receive from sleeping sender (sg)

recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)

if debugSelect {

print("syncrecv: sel=", sel, " c=", c, "n")

}

if cas.receivedp != nil {

*cas.receivedp = true

}

goto retc

rclose:

// read at end of closed channel

selunlock(scases, lockorder)

if cas.receivedp != nil {

*cas.receivedp = false

}

if cas.elem != nil {

typedmemclr(c.elemtype, cas.elem)

}

if raceenabled {

raceacquire(unsafe.Pointer(c))

}

goto retc

send:

// can send to a sleeping receiver (sg)

if raceenabled {

raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)

}

if msanenabled {

msanread(cas.elem, c.elemtype.size)

}

send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)

if debugSelect {

print("syncsend: sel=", sel, " c=", c, "n")

}

goto retc

retc:

if cas.releasetime > 0 {

blockevent(cas.releasetime-t0, 1)

}

return casi

sclose:

// send on closed channel

// sending on a closed channel

// panic immediately

selunlock(scases, lockorder)

panic(plainError("send on closed channel"))

}

So what exactly does the code above do? Let's break it down.

shuffle the cases -> heap-sort and deduplicate them by channel -> lock all channels -> walk the (shuffled) cases looking for the first one that can run -> if none, run default -> if there is no default either, park the current G on the channels of every case to wait to be woken, unlocking all channels at the same time -> once woken, lock all channels again -> do one final pass to find the runnable case, dequeuing and discarding the rest -> if still nothing, go back to the loop logic

This can be summarized in a diagram (see the figure in: https://blog.csdn.net/u011957758/article/details/82230316).

Finally:

select locks every channel involved.

Before locking all the channels, they are heap-sorted once, so that duplicate channels can be detected and never locked twice.

select does not pick a case at random in a naive sense; instead it shuffles all the cases up front and then scans them from start to end, choosing the first case that can run.

A select inside a for {} loop goes through the four phases above on every iteration: create -> register -> execute -> release; so executing a select has a cost, and it is not a small one. A sketch of this pattern follows.
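
As a concrete illustration of that last point, here is the common long-running pattern (the names jobs, done are made up): every trip through the loop pays the full create -> register -> execute -> release cost described above:

package main

import "fmt"

func main() {
	jobs := make(chan int)
	done := make(chan struct{})

	go func() {
		for i := 0; i < 3; i++ {
			jobs <- i
		}
		close(done)
	}()

	// Each iteration of this loop builds, runs, and tears down a selector.
	for {
		select {
		case j := <-jobs:
			fmt.Println("job", j)
		case <-done:
			return
		}
	}
}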
