internal包下也封装了一个同名的pollDesc对象,不过是一个指针(关于internal有个细节就是这个包是不能被外部调用):
代码语言:javascript复制type pollDesc struct {
runtimeCtx uintptr
}
其实最终都是对runtime底下的调用,只不过封装了一些易用的方法,比如read,write,做了一些抽象化的处理。
代码语言:javascript复制func runtime_pollServerInit() //初始化
func runtime_pollOpen(fd uintptr) (uintptr, int) //打开
func runtime_pollClose(ctx uintptr) //关闭
func runtime_pollWait(ctx uintptr, mode int) int //等待
func runtime_pollWaitCanceled(ctx uintptr, mode int) int //等待并(失败时)退出
func runtime_pollReset(ctx uintptr, mode int) int //重置状态,复用
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) //设置读/写超时时间
func runtime_pollUnblock(ctx uintptr) // 解锁
func runtime_isPollServerDescriptor(fd uintptr) bool
// 这里的ctx实际上是一个io fd,不是上下文
// mod 是 r 或者 w ,io事件毕竟只有有这两种
// d 意义和time.d差不多,就是关于时间的
这些方法的具体实现都在runtime下,我们挑几个重要的看看:
代码语言:javascript复制//将就绪好得io事件,写入就绪的grotion对列
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r' 'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r' 'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r' 'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
代码语言:javascript复制//轮询时调用的方法,如果io就绪了返回ok,如果没就绪,返回flase
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == 0 {
//gopark是很重要得一个方法,本质上是让出当前协程执行权,一般是返回到g0让g0重新调度
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
//获取到当前io所在的协程,如果协程已关闭,直接返回nil
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set pdReady for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
if atomic.Casuintptr(gpp, old, new) {
if old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}
思考:
- a、b两个协程,b io阻塞,完成后,一直没有获取到调度权,会出现什么后果。
- a、b两个协程,b io阻塞,2s time out,但是a一直占用执行权,b一直没有获取到调度权,5s后才获得到,b对使用端已经超时,这时候是超时还是不超时
所以设置的timeout,不一定是真实的io waiting,可能是没有获取到执行权。