与其他语言的网络IO强调异步非阻塞不同,GOLANG里的网络IO模型是:创建多个goroutine,每个goroutine的网络IO都是阻塞的,这样的代码非常直观
但低层,所有的网络IO实际上都是非阻塞的
以net.Dial为例子,其他的Read/Write机制类似
Read的原理:
for {
fd.read()
if err == EAGAIN {
pollserver.WaitRead(fd)
continue
}
break }
网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,我们选择忽略所有的错误。这样可以使得代码看起来更为简单。 而且我们只关注tcp协议实现,udp和unix socket不是我们关心的。
代码语言:javascript复制func Listen(net, laddr string) (Listener, error) {
la, err := resolveAddr("listen", net, laddr, noDeadline)
......
switch la := la.toAddr().(type) {
case *TCPAddr:
l, err = ListenTCP(net, la)
case *UnixAddr:
......
}
......
}
// 对于tcp协议,返回的的是TCPListener
func ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) {
......
fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")
......
return &TCPListener{fd}, nil
}
func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) {
......
return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)
}
func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) {
// 创建底层socket,设置属性为O_NONBLOCK
s, err := sysSocket(family, sotype, proto)
......
setDefaultSockopts(s, family, sotype, ipv6only)
// 创建新netFD结构
fd, err = newFD(s, family, sotype, net)
......
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
// 调用底层listen监听创建的套接字
fd.listenStream(laddr, listenerBacklog)
return fd, nil
case syscall.SOCK_DGRAM:
......
}
}
}
// 最终调用该函数来创建一个socket
// 并且将socket属性设置为O_NONBLOCK
func sysSocket(family, sotype, proto int) (int, error) {
syscall.ForkLock.RLock()
s, err := syscall.Socket(family, sotype, proto)
if err == nil {
syscall.CloseOnExec(s)
}
syscall.ForkLock.RUnlock()
if err != nil {
return -1, err
}
if err = syscall.SetNonblock(s, true); err != nil {
syscall.Close(s)
return -1, err
}
return s, nil
}
func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {
if err := setDefaultListenerSockopts(fd.sysfd)
if lsa, err := laddr.sockaddr(fd.family); err != nil {
return err
} else if lsa != nil {
// Bind绑定至该socket
if err := syscall.Bind(fd.sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
}
// 监听该socket
if err := syscall.Listen(fd.sysfd, backlog);
// 这里非常关键:初始化socket与异步IO相关的内容
if err := fd.init(); err != nil {
return err
}
lsa, _ := syscall.Getsockname(fd.sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
我们这里看到了如何实现Listen。流程基本都很简单,但是因为我们使用了异步编程,因此,我们在Listen完该socket后,还必须将其添加到监听队列中,以后该socket有事件到来时能够及时通知到。
对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那我们看对一个监听socket,是如何将其添加到epoll的监听队列中呢?
代码语言:javascript复制func (fd *netFD) init() error {
if err := fd.pd.Init(fd); err != nil {
return err
}
return nil
}
func (pd *pollDesc) Init(fd *netFD) error {
// 利用了Once机制,保证一个进程只会执行一次
// runtime_pollServerInit:
// TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0
// JMP runtime·netpollServerInit(SB)
serverInit.Do(runtime_pollServerInit)
// runtime_pollOpen:
// TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0
// JMP runtime·netpollOpen(SB)
ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
if errno != 0 {
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
这里就是socket异步编程的关键:
netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;
代码语言:javascript复制func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd >= 0 {
return
}
epfd = epollcreate(1024)
if epfd >= 0 {
closeonexec(epfd)
return
}
......
}
netpollOpen则在socket被创建出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen。
代码语言:javascript复制func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
OK,看到这里,我们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,然后将该socket添加到 epoll的事件监听队列中。
Accept
既然我们描述的重点的tcp协议,因此,我们看看TCPListener的Accept方法是怎么实现的:
代码语言:javascript复制func (l *TCPListener) Accept() (Conn, error) {
c, err := l.AcceptTCP()
......
}
func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
......
fd, err := l.fd.accept()
......
// 返回给调用者一个新的TCPConn
return newTCPConn(fd), nil
}
func (fd *netFD) accept() (netfd *netFD, err error) {
// 为什么对该函数加读锁?
if err := fd.readLock(); err != nil {
return nil, err
}
defer fd.readUnlock()
......
for {
// 这个accept是golang包装的系统调用
// 用来处理跨平台
s, rsa, err = accept(fd.sysfd)
if err != nil {
if err == syscall.EAGAIN {
// 如果没有可用连接,WaitRead()阻塞该协程
// 后面会详细分析WaitRead.
if err = fd.pd.WaitRead(); err == nil {
continue
}
} else if err == syscall.ECONNABORTED {
// 如果连接在Listen queue时就已经被对端关闭
continue
}
}
break
}
netfd, err = newFD(s, fd.family, fd.sotype, fd.net)
......
// 这个前面已经分析,将该fd添加到epoll队列中
err = netfd.init()
......
lsa, _ := syscall.Getsockname(netfd.sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
从前面的编程事例中我们知道,一般在主协程中会accept新的connection,使用异步编程我们知道,如果没有 新连接到来,该协程会一直被阻塞,直到新连接到来有人唤醒了该协程。
一般在主协程中调用accept,如果返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来时被唤醒,WaitRead以及唤醒过程我们会在后面仔细分析。
Read
代码语言:javascript复制
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
return c.fd.Read(b)
}
func (fd *netFD) Read(p []byte) (n int, err error) {
// 为什么对函数调用加读锁
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
// 这个又是干嘛?
if err := fd.pd.PrepareRead(); err != nil {
return 0, &OpError{"read", fd.net, fd.raddr, err}
}
for {
n, err = syscall.Read(int(fd.sysfd), p)
if err != nil {
n = 0
// 如果返回EAGIN,阻塞当前协程直到有数据可读被唤醒
if err == syscall.EAGAIN {
if err = fd.pd.WaitRead(); err == nil {
continue
}
}
}
// 检查错误,封装io.EOF
err = chkReadErr(n, err, fd)
break
}
if err != nil && err != io.EOF {
err = &OpError{"read", fd.net, fd.raddr, err}
}
return
}
func chkReadErr(n int, err error, fd *netFD) error {
if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
return io.EOF
}
return err
}
每次Read不能保证可以读到想读的那么多内容,比如缓冲区大小是10,而实际可能只读到5,应用程序需要能够处理这种情况。
Write
代码语言:javascript复制func (fd *netFD) Write(p []byte) (nn int, err error) {
// 为什么这里加写锁
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
// 这个是干什么?
if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
// nn记录总共写入的数据量,每次Write可能只能写入部分数据
for {
var n int
n, err = syscall.Write(int(fd.sysfd), p[nn:])
if n > 0 {
nn = n
}
// 如果数组数据已经全部写完,函数返回
if nn == len(p) {
break
}
// 如果写入数据时被block了,阻塞当前协程
if err == syscall.EAGAIN {
if err = fd.pd.WaitWrite(); err == nil {
continue
}
}
if err != nil {
n = 0
break
}
// 如果返回值为0,代表了什么?
if n == 0 {
err = io.ErrUnexpectedEOF
break
}
}
if err != nil {
err = &OpError{"write", fd.net, fd.raddr, err}
}
return nn, err
}
注意Write语义与Read不一样的地方:
Write尽量将用户缓冲区的内容全部写入至底层socket,如果遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时立即返回,可能会导致读取的数据量少于用户缓冲区的大小; 为什么会在实现上有此不同,我想可能read的优先级比较高吧,应用程序可能一直在等着,我们不能等到数据一直读完才返回,会阻塞用户。 而写不一样,优先级相对较低,而且用户一般也不着急写立即返回,所以可以将所有的数据全部写入,而且这样 也能简化应用程序的写法。
当系统调用返回EAGAIN时,会调用WaitRead/WaitWrite来阻塞当前协程,现在我们接着聊。
WaitRead/WaitWrite
代码语言:javascript复制func (pd *pollDesc) Wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res)
}
func (pd *pollDesc) WaitRead() error {
return pd.Wait('r')
}
func (pd *pollDesc) WaitWrite() error {
return pd.Wait('w')
}
最终runtime_pollWait走到下面去了:
代码语言:javascript复制TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0
JMP runtime·netpollWait(SB)
我们仔细考虑应该明白:netpollWait的主要作用是:等待关心的socket是否有事件(其实后面我们知道只是等待一个标记位是否发生改变),如果没有事件,那么就将当前的协程挂起,直到有通知事件发生,我们接下来看看到底如何实现:
代码语言:javascript复制func netpollWait(pd *pollDesc, mode int) int {
// 先检查该socket是否有error发生(如关闭、超时等)
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris uses level-triggered IO.
if GOOS == "solaris" {
onM(func() {
netpollarm(pd, mode)
})
}
// 循环等待netpollblock返回值为true
// 如果返回值为false且该socket未出现任何错误
// 那该协程可能被意外唤醒,需要重新被挂起
// 还有一种可能:该socket由于超时而被唤醒
// 此时netpollcheckerr就是用来检测超时错误的
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 首先将轮询状态设置为pdWait
// 为什么要使用for呢?因为casuintptr使用了自旋锁
// 为什么使用自旋锁就要加for循环呢?
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
gothrow("netpollblock: double wait")
}
// 将socket轮询相关的状态设置为pdWait
if casuintptr(gpp, 0, pdWait) {
break
}
}
// 如果未出错将该协程挂起,解锁函数是netpollblockcommit
if waitio || netpollcheckerr(pd, mode) == 0 {
f := netpollblockcommit
gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
}
// 可能是被挂起的协程被唤醒
// 或者由于某些原因该协程压根未被挂起
// 获取其当前状态记录在old中
old := xchguintptr(gpp, 0)
if old > pdWait {
gothrow("netpollblock: corrupted state")
}
return old == pdReady
}
从上面的分析我们看到,如果无法读写,golang会将当前协程挂起,在协程被唤醒的时候,该标记位应该会被置位。 我们接下来看看这些挂起的协程何时会被唤醒。
事件通知
golang运行库在系统运行过程中存在socket事件检查点,目前,该检查点主要位于以下几个地方:
runtime·startTheWorldWithSema(void):在完成gc后;
findrunnable():这个暂时不知道何时会触发?
sysmon:golang中的监控协程,会周期性检查就绪socket
TODO: 为什么是在这些地方检查socket就绪事件呢?
接下来我们看看如何检查socket就绪事件,在socket就绪后又是如何唤醒被挂起的协程?主要调用函数runtime-netpoll()
我们只关注epoll的实现,对于epoll,上面的方法具体实现是netpoll_epoll.go中的netpoll
代码语言:javascript复制func netpoll(block bool) (gp *g) {
if epfd == -1 {
return
}
waitms := int32(-1)
if !block {
// 如果调用者不希望block
// 设置waitsm为0
waitms = 0
}
var events [128]epollevent
retry:
// 调用epoll_wait获取就绪事件
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
...
}
goto retry
}
for i := int32(0); i < n; i {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode = 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode = 'w'
}
// 对每个事件,调用了netpollready
// pd主要记录了与该socket关联的等待协程
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
}
}
// 如果调用者同步等待且本次未获取到就绪socket
// 继续重试
if block && gp == nil {
goto retry
}
return gp
}
这个函数主要调用epoll_wait(当然,golang封装了系统调用)来获取就绪socket fd,对每个就绪的fd,调用netpollready()作进一步处理。这个函数的最终返回值就是一个已经就绪的协程(g)链表。
netpollready主要是将该socket fd标记为IOReady,并唤醒等待在该fd上的协程g,将其添加到传入的g链表中。
代码语言:javascript复制// make pd ready, newly runnable goroutines (if any) are returned in rg/wg
func netpollready(gpp **g, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r' 'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r' 'w' {
wg = netpollunblock(pd, 'w', true)
}
// 将就绪协程添加至链表中
if rg != nil {
rg.schedlink = *gpp
*gpp = rg
}
if wg != nil {
wg.schedlink = *gpp
*gpp = wg
}
}
// 将pollDesc的状态置为pdReady并返回就绪协程
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
return nil
}
var new uintptr
if ioready {
new = pdReady
}
if casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}
疑问:一个fd会被多个协程同时进行IO么?比如一个协程读,另外一个协程写?或者多个协程同时读?此时返回的是哪个协程就绪呢?
一个socket fd可支持并发读写,因为对于tcp协议来说,是全双工。读写操作的是不同缓冲区,但是不支持并发读和并发写,因为这样会错乱的。所以上面的netFD.RWLock()就是干这个作用的。
runtime中的epoll事件驱动抽象层其实在进入net库后,又被封装了一次,这一次封装从代码上看主要是为了方便在纯Go语言环境进行操作,net库中的这次封装实现在poll/fd_poll_runtime.go文件中,主要是通过pollDesc对象来实现的: (ps: 这里对应的版本是go1.9.1 的版本)
代码语言:javascript复制type pollDesc struct {
runtimeCtx uintptr
}
注意:此处的pollDesc对象不是上文提到的runtime中的PollDesc,相反此处pollDesc对象的runtimeCtx成员才是指向的runtime的PollDesc实例。pollDesc对象主要就是将runtime的事件驱动抽象层给再封装了一次,供网络fd对象使用。
代码语言:javascript复制func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
pollDesc对象最需要关注的就是其Init方法,这个方法通过一个sync.Once变量来调用了runtime_pollServerInit函数,也就是创建epoll实例的函数。 意思就是runtime_pollServerInit函数在整个进程生命周期内只会被调用一次,也就是只会创建一次epoll实例。epoll实例被创建后,会调用runtime_pollOpen函数将fd添加到epoll中。
网络编程中的所有socket fd都是通过netFD对象实现的,netFD是对网络IO操作的抽象,linux的实现在文件net/fd_unix.go中。netFD对象实现有自己的init方法,还有完成基本IO操作的Read和Write方法,当然除了这三个方法以外,还有很多非常有用的方法供用户使用。
/src/net/fd_unix.go
代码语言:javascript复制// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool
net string
laddr Addr
raddr Addr
}
通过netFD对象的定义可以看到每个fd都关联了一个pollDesc实例,通过上文我们知道pollDesc对象最终是对epoll的封装。
代码语言:javascript复制func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
netFD对象的init函数仅仅是调用了pollDesc实例的Init函数,作用就是将fd添加到epoll中,如果这个fd是第一个网络socket fd的话,这一次init还会担任创建epoll实例的任务。要知道在Go进程里,只会有一个epoll实例来管理所有的网络socket fd,这个epoll实例也就是在第一个网络socket fd被创建的时候所创建。
/src/net/fd_unix.go Read()函数:
代码语言:javascript复制// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
重点关注这个for循环中的syscall.Read调用的错误处理。当有错误发生的时候,会检查这个错误是否是syscall.EAGAIN,如果是,则调用WaitRead将当前读这个fd的goroutine给park住,直到这个fd上的读事件再次发生为止。 当这个socket上有新数据到来的时候,WaitRead调用返回,继续for循环的执行。这样的实现,就让调用netFD的Read的地方变成了同步“阻塞”方式编程,不再是异步非阻塞的编程方式了。netFD的Write方法和Read的实现原理是一样的,都是在碰到EAGAIN错误的时候将当前goroutine给park住直到socket再次可写为止。
本文只是将网络库的底层实现给大体上引导了一遍,知道底层代码大概实现在什么地方,方便结合源码深入理解。Go语言中的高并发、同步阻塞方式编程的关键其实是”goroutine和调度器”,针对网络IO的时候,我们需要知道EAGAIN这个非常关键的调度点,掌握了这个调度点,即使没有调度器,自己也可以在epoll的基础上配合协程等用户态线程实现网络IO操作的调度,达到同步阻塞编程的目的。
最后,为什么需要同步阻塞的方式编程?只有看多、写多了异步非阻塞代码的时候才能够深切体会到这个问题。真正的高大上绝对不是——“别人不会,我会;别人写不出来,我写得出来。”
EAGAIN:
代码语言:javascript复制ET还是LT?
LT的处理过程:
. accept一个连接,添加到epoll中监听EPOLLIN事件
. 当EPOLLIN事件到达时,read fd中的数据并处理
. 当需要写出数据时,把数据write到fd中;如果数据较大,无法一次性写出,那么在epoll中监听EPOLLOUT事件
. 当EPOLLOUT事件到达时,继续把数据write到fd中;如果数据写出完毕,那么在epoll中关闭EPOLLOUT事件
ET的处理过程:
. accept一个一个连接,添加到epoll中监听EPOLLIN|EPOLLOUT事件
. 当EPOLLIN事件到达时,read fd中的数据并处理,read需要一直读,直到返回EAGAIN为止
. 当需要写出数据时,把数据write到fd中,直到数据全部写完,或者write返回EAGAIN
. 当EPOLLOUT事件到达时,继续把数据write到fd中,直到数据全部写完,或者write返回EAGAIN
从ET的处理过程中可以看到,ET的要求是需要一直读写,直到返回EAGAIN,否则就会遗漏事件。而LT的处理过程中,直到返回EAGAIN不是硬性要求,但通常的处理过程都会读写直到返回EAGAIN,但LT比ET多了一个开关EPOLLOUT事件的步骤
LT的编程与poll/select接近,符合一直以来的习惯,不易出错
ET的编程可以做到更加简洁,某些场景下更加高效,但另一方面容易遗漏事件,容易产生bug