libuv源码分析之stream第一篇

2020-05-07 16:05:10 浏览数 (2)

流的实现在libuv里占了很大篇幅,今天分析一下流的实现。首先看数据结构。流在libuv里用uv_stream_s表示,他属于handle族。继承于uv_handle_s。

代码语言:c 复制
struct uv_stream_s {
  // uv_handle_s的字段
  void* data;        
  // 所属事件循环   
  uv_loop_t* loop;  
  // handle类型    
  uv_handle_type type;  
  // 关闭handle时的回调
  uv_close_cb close_cb; 
  // 用于插入事件循环的handle队列
  void* handle_queue[2];
  union {               
    int fd;             
    void* reserved[4];  
  } u;      
  // 用于插入事件循环的closing阶段对应的队列
  uv_handle_t* next_closing; 
  // 各种标记 
  unsigned int flags;
  // 流拓展的字段
  // 用户写入流的字节大小,流缓存用户的输入,然后等到可写的时候才做真正的写
  size_t write_queue_size; 
  // 分配内存的函数,内存由用户定义,主要用来保存读取的数据               
  uv_alloc_cb alloc_cb;  
  // 读取数据的回调                 
  uv_read_cb read_cb; 
  // 连接成功后,执行connect_req的回调(connect_req在uv__xxx_connect中赋值)
  uv_connect_t *connect_req; 
  // 关闭写端的时候,发送完缓存的数据,执行shutdown_req的回调(shutdown_req在uv_shutdown的时候赋值)    
  uv_shutdown_t *shutdown_req;
  // 流对应的io观察者,即文件描述符 一个文件描述符事件触发时执行的回调   
  uv__io_t io_watcher;  
  // 流缓存下来的,待写的数据         
  void* write_queue[2];       
  // 已经完成了数据写入的队列   
  void* write_completed_queue[2];
  // 完成三次握手后,执行的回调
  uv_connection_cb connection_cb;
  // 操作流时出错码
  int delayed_error;  
  // accept返回的通信socket对应的文件描述符           
  int accepted_fd;    
  // 同上,用于缓存更多的通信socket对应的文件描述符           
  void* queued_fds;
}

流的实现中,最核心的字段是io观察者,其余的字段是和流的性质相关的。io观察者封装了流对应的文件描述符和文件描述符事件触发时的回调。比如读一个流,写一个流,关闭一个流,连接一个流,监听一个流,在uv_stream_s中都有对应的字段去支持。但是本质上是靠io观察者去驱动的。 1 读一个流,就是io观察者中的文件描述符。可读事件触发时,执行用户的读回调。 2 写一个流,先把数据写到流中,然后io观察者中的文件描述符。可写事件触发时,执行最后的写入,并执行用户的写完成回调。 3 关闭一个流,就是io观察者中的文件描述符。可写事件触发时,如果待写的数据已经写完(比如发送完),然后执行关闭流的写端。接着执行用户的回调。 4 连接一个流,比如作为客户端去连接服务器。就是io观察者中的文件描述符。可读事件触发时(建立三次握手成功),执行用户的回调。 5 监听一个流,就是io观察者中的文件描述符。可读事件触发时(有完成三次握手的连接),执行用户的回调。 今天我们具体分析一下流读写操作的实现。首先我们看一下如何初始化一个流。

代码语言:c 复制
// 初始化流
void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;
  // 记录handle的类型
  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;
  // 这个逻辑看起来是为了拿到一个备用的文件描述符,如果以后触发UV_EMFILE错误(打开的文件太多)时,使用这个备用的fd
  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }
  // 初始化io观察者,把文件描述符(这里还没有,所以是-1)和回调uv__stream_io记录在io_watcher上
  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

我们看到流的初始化没有太多逻辑。主要是初始化一些字段。接着我们看一下如何打开(激活)一个流。

代码语言:c 复制
// 关闭nagle,开启长连接,保存fd 
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {

  // 还没有设置fd或者设置的同一个fd则继续,否则返回busy
  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  // 设置流的标记
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    // 关闭nagle算法
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    // 开启SO_KEEPALIVE,使用tcp长连接,一定时间后没有收到数据包会发送心跳包
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }
   // 保存socket对应的文件描述符到io观察者中,libuv会在io poll阶段监听该文件描述符
  stream->io_watcher.fd = fd;

  return 0;
}

打开一个流,本质上就是给这个流关联一个文件描述符。还有一些属性的设置。有了文件描述符,后续就可以操作这个流了。下面我们逐个操作分析。 1 读 我们在一个流上执行uv_read_start。流的数据(如果有的话)就会源源不断地流向调用方。

代码语言:c 复制
int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
      stream->type == UV_TTY);
  // 流已经关闭,不能读
  if (stream->flags & UV_HANDLE_CLOSING)
    return UV_EINVAL;
  // 流不可读,说明可能是只写流
  if (!(stream->flags & UV_HANDLE_READABLE))
    return -ENOTCONN;
  // 标记正在读
  stream->flags |= UV_HANDLE_READING;
  // 记录读回调,有数据的时候会执行这个回调
  stream->read_cb = read_cb;
  // 分配内存函数,用于存储读取的数据
  stream->alloc_cb = alloc_cb;
  // 注册读事件
  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  // 激活handle,有激活的handle,事件循环不会退出
  uv__handle_start(stream);

  return 0;
}

执行uv_read_start本质上是给流对应的文件描述符在epoll中注册了一个可读事件。并且给一些字段赋值,比如读回调函数,分配内存的函数。打上正在做读取操作的标记。然后在可读事件触发的时候,读回调就会被执行,这个逻辑我们后面分析。除了开始读取数据,还有一个读操作就是停止读取。对应的函数是uv_read_stop。

代码语言:c 复制
int uv_read_stop(uv_stream_t* stream) {
  // 是否正在执行读取操作,如果不是,则没有必要停止
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;
  // 清除 正在读取 的标记
  stream->flags &= ~UV_HANDLE_READING;
  // 撤销 等待读事件
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  // 对写事件也不感兴趣,停掉handle。允许事件循环退出
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}

和start相反,start是注册等待可读事件和打上正在读取这个标记,stop就是撤销等待可读事件和清除这个标记。另外还有一个辅助函数,判断流是否设置了可读属性。

代码语言:c 复制
int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}

2 写 我们在流上执行uv_write就可以往流中写入数据。

代码语言:c 复制
int uv_write(
       // 一个写请求,记录了需要写入的数据和信息。数据来自下面的const uv_buf_t bufs[]
         uv_write_t* req,
         // 往哪个流写
       uv_stream_t* handle,
       // 需要写入的数据
       const uv_buf_t bufs[],
       // 个数
       unsigned int nbufs,
       // 写完后执行的回调
       uv_write_cb cb
) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}

uv_write是直接调用uv_write2。第四个参数是NULL。代表是一般的写数据,不传递文件描述符。

代码语言:c 复制
int uv_write2(
    uv_write_t* req,
    uv_stream_t* stream,
    const uv_buf_t bufs[],
    unsigned int nbufs,
    // 需要传递的文件描述符所在的流,这里不分析,在分析unix的时候再分析
    uv_stream_t* send_handle,
    uv_write_cb cb
) 
{
  int empty_queue;
  // 是不可写流
  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return -EPIPE;
  // 流中缓存的数据大小是否为0
  empty_queue = (stream->write_queue_size == 0);

  // 初始化一个写请求
  uv__req_init(stream->loop, req, UV_WRITE);
  // 写完后执行的回调
  req->cb = cb;
  // 往哪个流写
  req->handle = stream;
  // 写出错的错误码,初始化为0
  req->error = 0;
  QUEUE_INIT(&req->queue);
  // 默认buf
  req->bufs = req->bufsml;
  // 不够则扩容
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
  // 把需要写入的数据填充到req中
  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  // 需要写入的buf个数
  req->nbufs = nbufs;
  // 目前写入的buf个数,初始化是0
  req->write_index = 0;
  // 更新流中待写数据的总长度,就是每个buf的数据大小加起来
  stream->write_queue_size  = uv__count_bufs(bufs, nbufs);

  // 插入待写队列
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
  /*
   stream->connect_req非空说明是作为客户端,并且正在建立三次握手,建立成功会置connect_req为NULL。
   这里非空说明还没有建立成功或者不是作为客户端(不是连接流)。即没有用到connect_req这个字段。
  */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) { 
    // 待写队列为空,则直接触发写动作,即操作文件描述符
    uv__write(stream);
  }
  else {
    /*
        队列非空,说明往底层写,uv__write中不一样会注册等待可写事件,所以这里注册一下
        给流注册等待可写事件,触发的时候,把数据消费掉
    */
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  }

  return 0;
}

uv_write2的主要逻辑就是封装一个写请求,插入到流的待写队列。然后根据当前流的情况。看是直接写入还是等待会再写入。架构大致如下。

我们继续看真正的写操作。

代码语言:c 复制
static void uv__write(uv_stream_t* stream) {
  struct iovec* iov;
  QUEUE* q;
  uv_write_t* req;
  int iovmax;
  int iovcnt;
  ssize_t n;
  int err;

start:
  // 待写队列为空,没得写
  if (QUEUE_EMPTY(&stream->write_queue))
    return;
  // 遍历待写队列,把每个节点的数据写入底层
  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  /*
    struct iovec {
        ptr_t iov_base; // 数据首地址
        size_t iov_len; // 数据长度
    };  
    iovec和bufs结构体的定义一样
  */
  // 转成iovec格式发送
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  // 待写的buf个数,nbufs是总数,write_index是当前已写的个数
  iovcnt = req->nbufs - req->write_index;
  // 最多能写几个
  iovmax = uv__getiovmax();

  // 取最小值
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  // 有需要传递的描述符
  if (req->send_handle) {
    // 需要传递文件描述符的逻辑,分析unix域的时候再分析
  } else { // 单纯发送数据,则直接写
    do {
      if (iovcnt == 1) {
        n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
      } else {
        n = writev(uv__stream_fd(stream), iov, iovcnt);
      }
    } while (n == -1 && errno == EINTR);
  }
  // 发送出错
  if (n < 0) {
    // 发送失败的逻辑,我们不具体分析
  } else {
    // 写成功,n是写成功的字节数
    while (n >= 0) {
      // 本次待写数据的首地址
      uv_buf_t* buf = &(req->bufs[req->write_index]);
      // 某个buf的数据长度
      size_t len = buf->len;
      // len如果大于n说明本buf的数据部分被写入
      if ((size_t)n < len) {
        // 更新指针,指向下次待发送的数据首地址
        buf->base  = n;
        // 更新待发送数据的长度
        buf->len -= n;
        // 更新待写数据的总长度
        stream->write_queue_size -= n;
        n = 0;
        // 设置了一直写标记,则继续写
        if (stream->flags & UV_HANDLE_BLOCKING_WRITES) {
          goto start;
        } else {
          // 否则等待可写事件触发的时候再写
          break;
        }
      } else {
        // 本buf的数据完成被写入,更新下一个待写入的buf位置
        req->write_index  ;
        n -= len;
        // 更新待写数据总长度
        stream->write_queue_size -= len;
        // 如果写完了全部buf,触发回调
        if (req->write_index == req->nbufs) {
          // 写完了本请求的数据,做后续处理
          uv__write_req_finish(req);
          return;
        }
      }
    }
  }
  // 到这说明数据还没有完全被写入,注册等待可写事件,等待继续写
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  return;

error:
  // 写出错
  req->error = err;
  uv__write_req_finish(req);
  // 撤销等待可写事件
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  // 没有注册了等待可读事件,则停掉流
  if (!uv__io_active(&stream->io_watcher, POLLIN))
    uv__handle_stop(stream);
}

我们看一下写完一个请求后,libuv如何处理他。逻辑在uv__write_req_finish函数。

代码语言:c 复制
// 把buf的数据写入完成或写出错后触发的回调
static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;
  // 移出队列
  QUEUE_REMOVE(&req->queue);
  // 写入成功了
  if (req->error == 0) {
    /*
      bufsml是默认的buf数,如果不够,则bufs指向新的内存,
      然后再储存数据。两者不等说明申请了额外的内存,需要free掉
    */ 
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }
  // 插入写完成队列
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  // 插入pending队列,在pending阶段执行回调
  uv__io_feed(stream->loop, &stream->io_watcher);
}

uv__write_req_finish的逻辑比较简单,就是把节点从待写队列中移除。然后插入写完成队列。最后把io 观察者插入pending队列。在pending节点会知道io观察者的回调(uv__stream_io)。流模块的逻辑比较多,今天先分析到这里。后续继续分析其他操作。

0 人点赞