internal包下也封装了一个同名的pollDesc对象

2021-10-08 14:04:50 浏览数 (2)

internal包下也封装了一个同名的pollDesc对象,不过是一个指针(关于internal有个细节就是这个包是不能被外部调用):

代码语言:javascript复制
type pollDesc struct {
	runtimeCtx uintptr
}

其实最终都是对runtime底下的调用,只不过封装了一些易用的方法,比如read,write,做了一些抽象化的处理。

代码语言:javascript复制
func runtime_pollServerInit()  //初始化
func runtime_pollOpen(fd uintptr) (uintptr, int)  //打开
func runtime_pollClose(ctx uintptr)   //关闭
func runtime_pollWait(ctx uintptr, mode int) int //等待
func runtime_pollWaitCanceled(ctx uintptr, mode int) int  //等待并(失败时)退出
func runtime_pollReset(ctx uintptr, mode int) int  //重置状态,复用
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) //设置读/写超时时间
func runtime_pollUnblock(ctx uintptr)  // 解锁 
func runtime_isPollServerDescriptor(fd uintptr) bool  
// 这里的ctx实际上是一个io fd,不是上下文
// mod 是 r 或者 w  ,io事件毕竟只有有这两种
// d 意义和time.d差不多,就是关于时间的

这些方法的具体实现都在runtime下,我们挑几个重要的看看:

代码语言:javascript复制
//将就绪好得io事件,写入就绪的grotion对列
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r' 'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r' 'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r' 'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}
代码语言:javascript复制
//轮询时调用的方法,如果io就绪了返回ok,如果没就绪,返回flase
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	// need to recheck error states after setting gpp to pdWait
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
	if waitio || netpollcheckerr(pd, mode) == 0 {
	  //gopark是很重要得一个方法,本质上是让出当前协程执行权,一般是返回到g0让g0重新调度
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

//获取到当前io所在的协程,如果协程已关闭,直接返回nil
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

思考:

  1. a、b两个协程,b io阻塞,完成后,一直没有获取到调度权,会出现什么后果。
  2. a、b两个协程,b io阻塞,2s time out,但是a一直占用执行权,b一直没有获取到调度权,5s后才获得到,b对使用端已经超时,这时候是超时还是不超时

所以设置的timeout,不一定是真实的io waiting,可能是没有获取到执行权。

0 人点赞