那么select的实现在go的源码包runtime中,路径为:./src/runtime/select.go。下面,我们先来看select的使用方式:
select {
case c1 <-1:
// TODO somethings
case <-c2:
// TODO somethings
default:
// TODO somethings
}
其实上述代码真正做了下面几件事:
创建select –> 注册case –> 执行select –> 释放select
然后,我们先说一说,select主要是用到的函数:
runtime.reflect_rselect()
runtime.newselect()
runtime.selectsend()
runtime.selectrecv()
runtime.selectdefault()
runtime.selectgo()
先来看定义的几个常量:
const (
// scase.kind
caseNil = iota // 0 :表示case 为nil;在send 或者 recv 发生在一个 nil channel 上,就有可能出现这种情况
caseRecv // 1 : 表示case 为接收通道 <- ch
caseSend // 2 :表示case 为发送通道 ch <-
caseDefault // 3 :表示 default 语句块
)
然后接着来看几个重要的结构体:
/**
定义select 结构
*/
type hselect struct {
tcase uint16 // total count of scase[] 总的case数目
ncase uint16 // currently filled scase[] 目前已经注册的case数目
pollorder *uint16 // case poll order 【超重要】 轮询的case序号
lockorder *uint16 // channel lock order 【超重要】chan的锁定顺序
// case 数组,为了节省一个指针的 8 个字节搞成这样的结构
// 实际上要访问后面的值,还是需要进行指针移动
// 指针移动使用 runtime 内部的 add 函数
scase [1]scase // one per case (in order of appearance) 【超重要】保存当前case操作的chan (按照轮询顺序)
}
/**
select 中每一个case的定义
*/
type scase struct {
elem unsafe.Pointer // data element 数据指针
c *hchan // chan 当前case所对应的chan引用
pc uintptr // return pc (for race detector / msan) 和汇编中的pc同义,表示 程序计数器,用于指示当前将要执行的下一条机器指令的内存地址
kind uint16 // 通道的类型
receivedp *bool // pointer to received bool, if any
releasetime int64
}
我们可以看出,hselect中scase字段其实是一个单元素的数组,那么我们肯定很奇怪。只用来保存当前case所操作的case?那么其他case操作的chan都在哪里呢?通过参考文章:https://ninokop.github.io/2017/11/07/Go-Select的实现/ 及 http://www.debugrun.com/a/vJjDJmZ.html
那么具体是什么时候去做上面我们所说的:创建select –> 注册case –> 执行select –> 释放select ?我们会看到下面
//go:linkname reflect_rselect reflect.rselect
func reflect_rselect(cases []runtimeSelect) (chosen int, recvOK bool) {
// flagNoScan is safe here, because all objects are also referenced from cases.
size := selectsize(uintptr(len(cases)))
sel := (*hselect)(mallocgc(size, nil, true))
newselect(sel, int64(size), int32(len(cases)))
r := new(bool)
for i := range cases {
rc := &cases[i]
switch rc.dir {
case selectDefault:
selectdefault(sel)
case selectSend:
selectsend(sel, rc.ch, rc.val)
case selectRecv:
selectrecv(sel, rc.ch, rc.val, r)
}
}
chosen = selectgo(sel)
recvOK = *r
return
}
可以看得出来,其实该函数的真正外部触发是在 reflect包中的rselct() 函数【具体为什么请自行研究下 //go:linkname 的使用】。而当前这个函数呢其实是先实例化一个 select 实例,然后根据 入参的 cases 逐个的去注册对应的 chan操作,最后调用 selectgo(sel) 去执行case。下面我们再来看看实例化select:
实例化select:
func newselect(sel *hselect, selsize int64, size int32) {
if selsize != int64(selectsize(uintptr(size))) {
print("runtime: bad select size ", selsize, ", want ", selectsize(uintptr(size)), "n")
throw("bad select size")
}
sel.tcase = uint16(size)
sel.ncase = 0
// hselect 这个结构体的是长下面这样:
// 【超重要】 header |scase[0]|scase[1]|scase[2]|lockorder[0]|lockorder[1]|lockorder[2]|pollorder[0]|pollorder[1]|pollorder[2]
// 各个 channel 加锁的顺序
sel.lockorder = (*uint16)(add(unsafe.Pointer(&sel.scase), uintptr(size)*unsafe.Sizeof(hselect{}.scase[0])))
// case 轮询的顺序初始化
sel.pollorder = (*uint16)(add(unsafe.Pointer(sel.lockorder), uintptr(size)*unsafe.Sizeof(*hselect{}.lockorder)))
if debugSelect {
print("newselect s=", sel, " size=", size, "n")
}
}
可以看出来,newselect就是在内存上创建一个选择器。其实就是初始化了某些内存空间。然后紧接着就是遍历所有的case通过判断反射得到的case中的类型去注册相应的注册函数。
具体的我们再来看看hselect,hselect的最后是一个[1]scase表示select中只保存了一个case的空间,说明hselect在内存只是个头部,select后面保存了所有的scase,这段Scases的大小就是tcase。在 go runtime实现中经常看到这种 头部 连续内存 的方式。
此图就是整个具备 6 个case 的选择器的内存模型了,这一整块内存结构其实也是由 头部结构 数据结构 组成,头部就是 Select那一部分,对应上面提到的 hselect,数据结构就是 由数组组成。
可以看到 lockorder、pollorder以及scase 【黑色部分】都是6个单元的数组,黑色部分的scase的第一个单元位于Select头部结构内存空间中,这个单元就是 hselect结构体中的 [1]scase的内容了。在开辟上图的这块内存时,从头部开始这一整块内存是由一次类malloc(为什么是类malloc,因为Go有自己的内存管理接口,不是采用的普通malloc)调用分配的,然后再将Select头部结构中的lockorder和pollorder两个指针分别指向正确的位置即可。当然,在一口气分配这块内存前,是事先算好了所有需要的内存的大小的。在malloc分配这块内存的时候,scase就只需要分配一个单元就可以了,所以上图中可以看出只是多加了5个scase的 存储单元。这样一来,scase字段和lockorder、pollorder就没有什么两样了,殊途同归。然后这三者的内容的填充都是在注册case中完成。
好了,现在我们知道其实在go内部是用了连续内存的方式来存储select及case中的内容。
注册case:
select中注册case channel有三种,分别是:selectsend、selectrecv、selectdefault 分别对应着不同的case。他们的注册方式一致,都是 ncase 1 (记录目前已经注册的case数目),然后按照当前的 index 填充scases域的scase数组的相关字段,主要是用case中的chan和case类型填充c和kind字段。代码如下:
/**
主要针对 case send 通道
如:
select {
case ch<-1:
}
*/
func selectsend(sel *hselect, c *hchan, elem unsafe.Pointer) {
pc := getcallerpc(unsafe.Pointer(&sel))
i := sel.ncase
if i >= sel.tcase {
throw("selectsend: too many cases")
}
// 注册数量加一
sel.ncase = i 1
if c == nil {
return
}
// 初始化对应的 scase 结构
cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
cas.pc = pc
cas.c = c
cas.kind = caseSend
cas.elem = elem
if debugSelect {
print("selectsend s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "n")
}
}
/**
主要针对 case recv 通道
如:
select {
case <-ch: ==> 这时候就会调用 selectrecv; case ,ok <- ch: 也可以这样写
}
在 ch 被关闭时,这个 case 每次都可能被轮询到
*/
func selectrecv(sel *hselect, c *hchan, elem unsafe.Pointer, received *bool) {
pc := getcallerpc(unsafe.Pointer(&sel))
i := sel.ncase
if i >= sel.tcase {
throw("selectrecv: too many cases")
}
// 注册数量加一
sel.ncase = i 1
if c == nil {
return
}
// 初始化对应的 scase 结构
cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
cas.pc = pc
cas.c = c
cas.kind = caseRecv
cas.elem = elem
cas.receivedp = received
if debugSelect {
print("selectrecv s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "n")
}
}
/**
主要针对 default
如:
select {
default:
}
*/
func selectdefault(sel *hselect) {
pc := getcallerpc(unsafe.Pointer(&sel))
i := sel.ncase
if i >= sel.tcase {
throw("selectdefault: too many cases")
}
// 注册数量加一
sel.ncase = i 1
// 初始化对应的 scase 结构
cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
cas.pc = pc
cas.c = nil
cas.kind = caseDefault
if debugSelect {
print("selectdefault s=", sel, " pc=", hex(cas.pc), "n")
}
}
select的执行:
pollorder:保存的是scase的序号,乱序是为了之后执行时的随机性。
lockorder:保存了所有case中channel的地址,这里按照地址大小堆排了一下lockorder对应的这片连续内存。对chan排序是为了去重,保证之后对所有channel上锁时不会重复上锁。
select 语句执行时会对整个chanel加锁
select 语句会创建select对象 如果放在for循环中长期执行可能会频繁的分配内存
select执行过程总结如下:
通过pollorder的序号,遍历scase找出已经准备好的case。如果有就执行普通的chan读写操作。其中准备好的case是指 可以不阻塞完成读写chan的case,或者读已经关闭的chan的case。
如果没有准备好的case,则尝试defualt case。
如果以上都没有,则把当前的 G (协程)封装好挂到scase所有chan的阻塞链表中,按照chan的操作类型挂到sendq或recvq中。
这个 G 被某个chan唤醒,遍历scase找到目标case,放弃当前G在其他chan中的等待,返回。
在说执行之前我们先来看一下加锁解锁:
// 对所有 case 对应的 channel 加锁
// 需要按照 lockorder 数组中的元素索引来搞
// 否则可能有循环等待的死锁
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != nil && c0 != c {
c = c0
// 其实也是用对应的hchan中的mutex去对当前hchan操作
// hchan 被定义在 ./src/runtime/chan.go 中,就是channel的定义
lock(&c.lock)
}
}
}
// 解锁,比较简单
func selunlock(scases []scase, lockorder []uint16) {
// We must be very careful here to not touch sel after we have unlocked
// the last lock, because sel can be freed right after the last unlock.
// Consider the following situation.
// First M calls runtime·park() in runtime·selectgo() passing the sel.
// Once runtime·park() has unlocked the last lock, another M makes
// the G that calls select runnable again and schedules it for execution.
// When the G runs on another M, it locks all the locks and frees sel.
// Now if the first M touches sel, it will access freed memory.
/**
我们必须非常小心,在我们解锁最后一个锁之后不要触摸sel,因为sel可以在最后一次解锁后立即释放。
考虑以下情况:
第一个M在运行时调用runtime·park()·selectgo()传递sel。
一旦runtime·park()解锁了最后一个锁,另一个M使得再次调用select runnable的G并安排它执行。
当G在另一个M上运行时,它会锁定所有锁并释放sel。
现在,如果第一个M接触sel,它将访问释放的内存。
*/
for i := len(scases) - 1; i >= 0; i-- {
c := scases[lockorder[i]].c
if c == nil {
break
}
if i > 0 && c == scases[lockorder[i-1]].c {
continue // will unlock it on the next iteration
}
// 其实也是用对应的hchan中的mutex去对当前hchan操作
// hchan 被定义在 ./src/runtime/chan.go 中,就是channel的定义
unlock(&c.lock)
}
}
/**
和 gopark 相关的
*/
func selparkcommit(gp *g, _ unsafe.Pointer) bool {
// This must not access gp's stack (see gopark). In
// particular, it must not access the *hselect. That's okay,
// because by the time this is called, gp.waiting has all
// channels in lock order.
/**
这不能访问gp的堆栈(参见gopark)。 特别是,它不能访问* hselect。 没关系,因为在调用它的时候,gp.waiting的所有通道都处于锁定状态。
*/
var lastc *hchan
for sg := gp.waiting; sg != nil; sg = sg.waitlink {
if sg.c != lastc && lastc != nil {
// As soon as we unlock the channel, fields in
// any sudog with that channel may change,
// including c and waitlink. Since multiple
// sudogs may have the same channel, we unlock
// only after we've passed the last instance
// of a channel.
/**
一旦我们解锁频道,任何具有该频道的sudog中的字段都可能发生变化,包括c和 waitlink。 由于多个sudog可能具有相同的通道,因此只有在我们通过了通道的最后一个实例后才会解锁。
*/
unlock(&lastc.lock)
}
lastc = sg.c
}
if lastc != nil {
unlock(&lastc.lock)
}
return true
}
下面我们来说一说,如何执行select吧,具体代码如下:
// selectgo 是在初始化完成之后执行 select 逻辑的函数
// 返回值是要执行的 scase 的 index
func selectgo(sel *hselect) int {
if debugSelect {
print("select: sel=", sel, "n")
}
if sel.ncase != sel.tcase {
throw("selectgo: case count mismatch")
}
// len 和 cap 就是 scase 的总数
// 这时候 ncase 和 tcase 已经是相等的了
scaseslice := slice{unsafe.Pointer(&sel.scase), int(sel.ncase), int(sel.ncase)}
scases := *(*[]scase)(unsafe.Pointer(&scaseslice))
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
for i := 0; i < int(sel.ncase); i {
scases[i].releasetime = -1
}
}
// The compiler rewrites selects that statically have
// only 0 or 1 cases plus default into simpler constructs.
// The only way we can end up with such small sel.ncase
// values here is for a larger select in which most channels
// have been nilled out. The general code handles those
// cases correctly, and they are rare enough not to bother
// optimizing (and needing to test).
/**
编译器将静态只有0或1个案例的选择重写为更简单的构造。 我们在这里得到如此小的sel.ncase值的唯一方法是选择大多数通道已被填充的更大选择。 通用代码正确处理这些情况,并且它们非常罕见,不会打扰优化(并且需要测试)。
*/
// generate permuted order
// 生成置换顺序
pollslice := slice{unsafe.Pointer(sel.pollorder), int(sel.ncase), int(sel.ncase)}
pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))
/**
洗牌
*/
for i := 1; i < int(sel.ncase); i {
j := fastrandn(uint32(i 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
/**
通过Hchan地址对案例进行排序以获得锁定顺序。
简单的堆排序,以保证n log n时间和不断的堆栈占用。
*/
// 按 hchan 的地址来进行排序,以生成加锁顺序
// 用堆排序来保证 nLog(n) 的时间复杂度
lockslice := slice{unsafe.Pointer(sel.lockorder), int(sel.ncase), int(sel.ncase)}
lockorder := *(*[]uint16)(unsafe.Pointer(&lockslice))
for i := 0; i < int(sel.ncase); i {
// 初始化 lockorder 数组
j := i
// Start with the pollorder to permute cases on the same channel.
// 从pollorder开始,在同一频道上置换案例。
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
for i := int(sel.ncase) - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 1
if k >= i {
break
}
if k 1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k 1]].c.sortkey() {
k
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
/*
for i := 0; i 1 < int(sel.ncase); i {
if scases[lockorder[i]].c.sortkey() > scases[lockorder[i 1]].c.sortkey() {
print("i=", i, " x=", lockorder[i], " y=", lockorder[i 1], "n")
throw("select: broken sort")
}
}
*/
// lock all the channels involved in the select
// 锁定选择中涉及的所有通道
// 对涉及到的所有 channel 都加锁
sellock(scases, lockorder)
var (
gp *g
done uint32
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
/**
走到这里,说明各种准备工作已经完成,要开始真正干活了
*/
loop:
// pass 1 - look for something already waiting
// 第一部分: 寻找已经等待的东西
var dfli int
var dfl *scase
var casi int
var cas *scase
// 虽然看着是一个 for 循环
// 但实际上有 case ready 的时候
// 直接就用 goto 跳出了
for i := 0; i < int(sel.ncase); i {
// 按 pollorder 的顺序进行遍历
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch cas.kind {
case caseNil:
continue
case caseRecv:
// <- ch 的情况
// 根据有没有等待的 goroutine 队列执行不同的操作
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
case caseSend:
// ch <- 1 的情况,也是一些基本的 channel 操作
if raceenabled {
racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)
}
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
case caseDefault:
dfli = casi
dfl = cas
}
}
// 这里是在前面进了 caseDefault 才会走到
if dfl != nil {
selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}
// pass 2 - enqueue on all chans
// 第二部分:在所有的chan上排队
// 没有任何一个 case 满足,且没有 default
// 这种情况下需要把当前的 goroutine 入队所有 channel 的等待队列里
gp = getg()
done = 0
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
// 按照加锁的顺序把 gorutine 入每一个 channel 的等待队列
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {
continue
}
c = cas.c
sg := acquireSudog()
sg.g = gp
// Note: selectdone is adjusted for stack copies in stack1.go:adjustsudogs
// selectdone在stack1.go:adjustsudogs中针对堆栈副本进行了调整
sg.selectdone = (*uint32)(noescape(unsafe.Pointer(&done)))
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
// 在gp.waiting上分配elem和排队sg之间没有堆栈分割,copystack可以在其中找到它。
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
// 按锁定顺序构造等待列表。
*nextp = sg
nextp = &sg.waitlink
switch cas.kind {
case caseRecv:
// recv 的情况进 recvq
c.recvq.enqueue(sg)
case caseSend:
// send 的情况进 sendq
c.sendq.enqueue(sg)
}
}
// wait for someone to wake us up
// 等待有人叫醒我们
gp.param = nil
// 当前 goroutine 进入休眠,同时解锁channel,等待被唤醒(selparkcommit这里实现的)
gopark(selparkcommit, nil, "select", traceEvGoBlockSelect, 1)
// While we were asleep, some goroutine came along and completed
// one of the cases in the select and woke us up (called ready).
// As part of that process, the goroutine did a cas on done above
// (aka *sg.selectdone for all queued sg) to win the right to
// complete the select. Now done = 1.
//
// If we copy (grow) our own stack, we will update the
// selectdone pointers inside the gp.waiting sudog list to point
// at the new stack. Another goroutine attempting to
// complete one of our (still linked in) select cases might
// see the new selectdone pointer (pointing at the new stack)
// before the new stack has real data; if the new stack has done = 0
// (before the old values are copied over), the goroutine might
// do a cas via sg.selectdone and incorrectly believe that it has
// won the right to complete the select, executing a second
// communication and attempting to wake us (call ready) again.
//
// Then things break.
//
// The best break is that the goroutine doing ready sees the
// _Gcopystack status and throws, as in #17007.
// A worse break would be for us to continue on, start running real code,
// block in a semaphore acquisition (sema.go), and have the other
// goroutine wake us up without having really acquired the semaphore.
// That would result in the goroutine spuriously running and then
// queue up another spurious wakeup when the semaphore really is ready.
// In general the situation can cascade until something notices the
// problem and causes a crash.
//
// A stack shrink does not have this problem, because it locks
// all the channels that are involved first, blocking out the
// possibility of a cas on selectdone.
//
// A stack growth before gopark above does not have this
// problem, because we hold those channel locks (released by
// selparkcommit).
//
// A stack growth after sellock below does not have this
// problem, because again we hold those channel locks.
//
// The only problem is a stack growth during sellock.
// To keep that from happening, run sellock on the system stack.
//
// It might be that we could avoid this if copystack copied the
// stack before calling adjustsudogs. In that case,
// syncadjustsudogs would need to recopy the tiny part that
// it copies today, resulting in a little bit of extra copying.
//
// An even better fix, not for the week before a release candidate,
// would be to put space in every sudog and make selectdone
// point at (say) the space in the first sudog.
// 有故事发生,被唤醒,再次该select下全部channel加锁
systemstack(func() {
sellock(scases, lockorder)
})
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
// 被唤醒后执行下面的代码
/**
从不成功的chans出发,否则他们在安静的频道上叠加记录成功案例,如果有的话。 我们以锁定顺序单独链接SudoGs。
*/
casi = -1
cas = nil
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
// 在从gp.waiting取消链接之前清除所有元素。
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.selectdone = nil
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
for _, casei := range lockorder {
k = &scases[casei]
if k.kind == caseNil {
continue
}
if sglist.releasetime > 0 {
k.releasetime = sglist.releasetime
}
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
} else {
c = k.c
if k.kind == caseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
// 如果还是没有的话再次走 loop 逻辑
if cas == nil {
// We can wake up with gp.param == nil (so cas == nil)
// when a channel involved in the select has been closed.
// It is easiest to loop and re-run the operation;
// we'll see that it's now closed.
// Maybe some day we can signal the close explicitly,
// but we'd have to distinguish close-on-reader from close-on-writer.
// It's easiest not to duplicate the code and just recheck above.
// We know that something closed, and things never un-close,
// so we won't block again.
goto loop
}
c = cas.c
if debugSelect {
print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "n")
}
if cas.kind == caseRecv {
if cas.receivedp != nil {
*cas.receivedp = true
}
}
if raceenabled {
if cas.kind == caseRecv && cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
} else if cas.kind == caseSend {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
}
if msanenabled {
if cas.kind == caseRecv && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
} else if cas.kind == caseSend {
msanread(cas.elem, c.elemtype.size)
}
}
selunlock(scases, lockorder)
goto retc
bufrecv:
// can receive from buffer
if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
}
raceacquire(chanbuf(c, c.recvx))
racerelease(chanbuf(c, c.recvx))
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
if cas.receivedp != nil {
*cas.receivedp = true
}
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
bufsend:
// can send to buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
racerelease(chanbuf(c, c.sendx))
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount
selunlock(scases, lockorder)
goto retc
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncrecv: sel=", sel, " c=", c, "n")
}
if cas.receivedp != nil {
*cas.receivedp = true
}
goto retc
rclose:
// read at end of closed channel
selunlock(scases, lockorder)
if cas.receivedp != nil {
*cas.receivedp = false
}
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
goto retc
send:
// can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: sel=", sel, " c=", c, "n")
}
goto retc
retc:
if cas.releasetime > 0 {
blockevent(cas.releasetime-t0, 1)
}
return casi
sclose:
// send on closed channel
// 向关闭 channel 发送数据
// 直接 panic
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
在上述代码中具体做了些什么事呢?下面我们来分析分析。
case洗牌 -> case堆排去重 -> 所有channel 上锁 -> 遍历(洗牌后的)case找出第一个可以执行的case -> 如果没有就执行default -> 如果都没有就把当前G挂起到所有case 所对应的chan等待唤醒,同时解锁所有channel -> 被唤醒后再次对所有channel上锁 -> 最后一次循环操作,获取可执行case,其余全部出队列丢弃 -> 没有的话再次走 loop 逻辑
可以用一张图表示:(图参考自:https://blog.csdn.net/u011957758/article/details/82230316)
最后:
select对所有涉及到的channel都加锁。
在对所有的channel枷锁之前,有一个对所有的channel做了一次堆排的动作,目的为了去重channel
select并不是随机选择case去执行,而是事先将所有的case进行洗牌,然后再从头到尾去遍历,选择出第一个可以执行的case。
在for {} 结构中的 select 每一次for 都会经历上述的 4各阶段,创建 -> 注册 -> 执行 -> 释放;所以select的执行是有代价的而且代价不低。