流的实现在libuv里占了很大篇幅,今天分析一下流的实现。首先看数据结构。流在libuv里用uv_stream_s表示,他属于handle族。继承于uv_handle_s。
代码语言:c 复制struct uv_stream_s {
// uv_handle_s的字段
void* data;
// 所属事件循环
uv_loop_t* loop;
// handle类型
uv_handle_type type;
// 关闭handle时的回调
uv_close_cb close_cb;
// 用于插入事件循环的handle队列
void* handle_queue[2];
union {
int fd;
void* reserved[4];
} u;
// 用于插入事件循环的closing阶段对应的队列
uv_handle_t* next_closing;
// 各种标记
unsigned int flags;
// 流拓展的字段
// 用户写入流的字节大小,流缓存用户的输入,然后等到可写的时候才做真正的写
size_t write_queue_size;
// 分配内存的函数,内存由用户定义,主要用来保存读取的数据
uv_alloc_cb alloc_cb;
// 读取数据的回调
uv_read_cb read_cb;
// 连接成功后,执行connect_req的回调(connect_req在uv__xxx_connect中赋值)
uv_connect_t *connect_req;
// 关闭写端的时候,发送完缓存的数据,执行shutdown_req的回调(shutdown_req在uv_shutdown的时候赋值)
uv_shutdown_t *shutdown_req;
// 流对应的io观察者,即文件描述符 一个文件描述符事件触发时执行的回调
uv__io_t io_watcher;
// 流缓存下来的,待写的数据
void* write_queue[2];
// 已经完成了数据写入的队列
void* write_completed_queue[2];
// 完成三次握手后,执行的回调
uv_connection_cb connection_cb;
// 操作流时出错码
int delayed_error;
// accept返回的通信socket对应的文件描述符
int accepted_fd;
// 同上,用于缓存更多的通信socket对应的文件描述符
void* queued_fds;
}
流的实现中,最核心的字段是io观察者,其余的字段是和流的性质相关的。io观察者封装了流对应的文件描述符和文件描述符事件触发时的回调。比如读一个流,写一个流,关闭一个流,连接一个流,监听一个流,在uv_stream_s中都有对应的字段去支持。但是本质上是靠io观察者去驱动的。 1 读一个流,就是io观察者中的文件描述符。可读事件触发时,执行用户的读回调。 2 写一个流,先把数据写到流中,然后io观察者中的文件描述符。可写事件触发时,执行最后的写入,并执行用户的写完成回调。 3 关闭一个流,就是io观察者中的文件描述符。可写事件触发时,如果待写的数据已经写完(比如发送完),然后执行关闭流的写端。接着执行用户的回调。 4 连接一个流,比如作为客户端去连接服务器。就是io观察者中的文件描述符。可读事件触发时(建立三次握手成功),执行用户的回调。 5 监听一个流,就是io观察者中的文件描述符。可读事件触发时(有完成三次握手的连接),执行用户的回调。 今天我们具体分析一下流读写操作的实现。首先我们看一下如何初始化一个流。
代码语言:c 复制// 初始化流
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;
// 记录handle的类型
uv__handle_init(loop, (uv_handle_t*)stream, type);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
stream->close_cb = NULL;
stream->connection_cb = NULL;
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
stream->write_queue_size = 0;
// 这个逻辑看起来是为了拿到一个备用的文件描述符,如果以后触发UV_EMFILE错误(打开的文件太多)时,使用这个备用的fd
if (loop->emfile_fd == -1) {
err = uv__open_cloexec("/dev/null", O_RDONLY);
if (err < 0)
err = uv__open_cloexec("/", O_RDONLY);
if (err >= 0)
loop->emfile_fd = err;
}
// 初始化io观察者,把文件描述符(这里还没有,所以是-1)和回调uv__stream_io记录在io_watcher上
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
我们看到流的初始化没有太多逻辑。主要是初始化一些字段。接着我们看一下如何打开(激活)一个流。
代码语言:c 复制// 关闭nagle,开启长连接,保存fd
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
// 还没有设置fd或者设置的同一个fd则继续,否则返回busy
if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
return UV_EBUSY;
// 设置流的标记
stream->flags |= flags;
if (stream->type == UV_TCP) {
// 关闭nagle算法
if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
return UV__ERR(errno);
// 开启SO_KEEPALIVE,使用tcp长连接,一定时间后没有收到数据包会发送心跳包
if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
uv__tcp_keepalive(fd, 1, 60)) {
return UV__ERR(errno);
}
}
// 保存socket对应的文件描述符到io观察者中,libuv会在io poll阶段监听该文件描述符
stream->io_watcher.fd = fd;
return 0;
}
打开一个流,本质上就是给这个流关联一个文件描述符。还有一些属性的设置。有了文件描述符,后续就可以操作这个流了。下面我们逐个操作分析。 1 读 我们在一个流上执行uv_read_start。流的数据(如果有的话)就会源源不断地流向调用方。
代码语言:c 复制int uv_read_start(uv_stream_t* stream,
uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
// 流已经关闭,不能读
if (stream->flags & UV_HANDLE_CLOSING)
return UV_EINVAL;
// 流不可读,说明可能是只写流
if (!(stream->flags & UV_HANDLE_READABLE))
return -ENOTCONN;
// 标记正在读
stream->flags |= UV_HANDLE_READING;
// 记录读回调,有数据的时候会执行这个回调
stream->read_cb = read_cb;
// 分配内存函数,用于存储读取的数据
stream->alloc_cb = alloc_cb;
// 注册读事件
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
// 激活handle,有激活的handle,事件循环不会退出
uv__handle_start(stream);
return 0;
}
执行uv_read_start本质上是给流对应的文件描述符在epoll中注册了一个可读事件。并且给一些字段赋值,比如读回调函数,分配内存的函数。打上正在做读取操作的标记。然后在可读事件触发的时候,读回调就会被执行,这个逻辑我们后面分析。除了开始读取数据,还有一个读操作就是停止读取。对应的函数是uv_read_stop。
代码语言:c 复制int uv_read_stop(uv_stream_t* stream) {
// 是否正在执行读取操作,如果不是,则没有必要停止
if (!(stream->flags & UV_HANDLE_READING))
return 0;
// 清除 正在读取 的标记
stream->flags &= ~UV_HANDLE_READING;
// 撤销 等待读事件
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
// 对写事件也不感兴趣,停掉handle。允许事件循环退出
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
return 0;
}
和start相反,start是注册等待可读事件和打上正在读取这个标记,stop就是撤销等待可读事件和清除这个标记。另外还有一个辅助函数,判断流是否设置了可读属性。
代码语言:c 复制int uv_is_readable(const uv_stream_t* stream) {
return !!(stream->flags & UV_HANDLE_READABLE);
}
2 写 我们在流上执行uv_write就可以往流中写入数据。
代码语言:c 复制int uv_write(
// 一个写请求,记录了需要写入的数据和信息。数据来自下面的const uv_buf_t bufs[]
uv_write_t* req,
// 往哪个流写
uv_stream_t* handle,
// 需要写入的数据
const uv_buf_t bufs[],
// 个数
unsigned int nbufs,
// 写完后执行的回调
uv_write_cb cb
) {
return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
uv_write是直接调用uv_write2。第四个参数是NULL。代表是一般的写数据,不传递文件描述符。
代码语言:c 复制int uv_write2(
uv_write_t* req,
uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs,
// 需要传递的文件描述符所在的流,这里不分析,在分析unix的时候再分析
uv_stream_t* send_handle,
uv_write_cb cb
)
{
int empty_queue;
// 是不可写流
if (!(stream->flags & UV_HANDLE_WRITABLE))
return -EPIPE;
// 流中缓存的数据大小是否为0
empty_queue = (stream->write_queue_size == 0);
// 初始化一个写请求
uv__req_init(stream->loop, req, UV_WRITE);
// 写完后执行的回调
req->cb = cb;
// 往哪个流写
req->handle = stream;
// 写出错的错误码,初始化为0
req->error = 0;
QUEUE_INIT(&req->queue);
// 默认buf
req->bufs = req->bufsml;
// 不够则扩容
if (nbufs > ARRAY_SIZE(req->bufsml))
req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
// 把需要写入的数据填充到req中
memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
// 需要写入的buf个数
req->nbufs = nbufs;
// 目前写入的buf个数,初始化是0
req->write_index = 0;
// 更新流中待写数据的总长度,就是每个buf的数据大小加起来
stream->write_queue_size = uv__count_bufs(bufs, nbufs);
// 插入待写队列
QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
/*
stream->connect_req非空说明是作为客户端,并且正在建立三次握手,建立成功会置connect_req为NULL。
这里非空说明还没有建立成功或者不是作为客户端(不是连接流)。即没有用到connect_req这个字段。
*/
if (stream->connect_req) {
/* Still connecting, do nothing. */
}
else if (empty_queue) {
// 待写队列为空,则直接触发写动作,即操作文件描述符
uv__write(stream);
}
else {
/*
队列非空,说明往底层写,uv__write中不一样会注册等待可写事件,所以这里注册一下
给流注册等待可写事件,触发的时候,把数据消费掉
*/
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
}
return 0;
}
uv_write2的主要逻辑就是封装一个写请求,插入到流的待写队列。然后根据当前流的情况。看是直接写入还是等待会再写入。架构大致如下。
我们继续看真正的写操作。
代码语言:c 复制static void uv__write(uv_stream_t* stream) {
struct iovec* iov;
QUEUE* q;
uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
int err;
start:
// 待写队列为空,没得写
if (QUEUE_EMPTY(&stream->write_queue))
return;
// 遍历待写队列,把每个节点的数据写入底层
q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
/*
struct iovec {
ptr_t iov_base; // 数据首地址
size_t iov_len; // 数据长度
};
iovec和bufs结构体的定义一样
*/
// 转成iovec格式发送
iov = (struct iovec*) &(req->bufs[req->write_index]);
// 待写的buf个数,nbufs是总数,write_index是当前已写的个数
iovcnt = req->nbufs - req->write_index;
// 最多能写几个
iovmax = uv__getiovmax();
// 取最小值
if (iovcnt > iovmax)
iovcnt = iovmax;
// 有需要传递的描述符
if (req->send_handle) {
// 需要传递文件描述符的逻辑,分析unix域的时候再分析
} else { // 单纯发送数据,则直接写
do {
if (iovcnt == 1) {
n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
} else {
n = writev(uv__stream_fd(stream), iov, iovcnt);
}
} while (n == -1 && errno == EINTR);
}
// 发送出错
if (n < 0) {
// 发送失败的逻辑,我们不具体分析
} else {
// 写成功,n是写成功的字节数
while (n >= 0) {
// 本次待写数据的首地址
uv_buf_t* buf = &(req->bufs[req->write_index]);
// 某个buf的数据长度
size_t len = buf->len;
// len如果大于n说明本buf的数据部分被写入
if ((size_t)n < len) {
// 更新指针,指向下次待发送的数据首地址
buf->base = n;
// 更新待发送数据的长度
buf->len -= n;
// 更新待写数据的总长度
stream->write_queue_size -= n;
n = 0;
// 设置了一直写标记,则继续写
if (stream->flags & UV_HANDLE_BLOCKING_WRITES) {
goto start;
} else {
// 否则等待可写事件触发的时候再写
break;
}
} else {
// 本buf的数据完成被写入,更新下一个待写入的buf位置
req->write_index ;
n -= len;
// 更新待写数据总长度
stream->write_queue_size -= len;
// 如果写完了全部buf,触发回调
if (req->write_index == req->nbufs) {
// 写完了本请求的数据,做后续处理
uv__write_req_finish(req);
return;
}
}
}
}
// 到这说明数据还没有完全被写入,注册等待可写事件,等待继续写
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
return;
error:
// 写出错
req->error = err;
uv__write_req_finish(req);
// 撤销等待可写事件
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
// 没有注册了等待可读事件,则停掉流
if (!uv__io_active(&stream->io_watcher, POLLIN))
uv__handle_stop(stream);
}
我们看一下写完一个请求后,libuv如何处理他。逻辑在uv__write_req_finish函数。
代码语言:c 复制// 把buf的数据写入完成或写出错后触发的回调
static void uv__write_req_finish(uv_write_t* req) {
uv_stream_t* stream = req->handle;
// 移出队列
QUEUE_REMOVE(&req->queue);
// 写入成功了
if (req->error == 0) {
/*
bufsml是默认的buf数,如果不够,则bufs指向新的内存,
然后再储存数据。两者不等说明申请了额外的内存,需要free掉
*/
if (req->bufs != req->bufsml)
uv__free(req->bufs);
req->bufs = NULL;
}
// 插入写完成队列
QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
// 插入pending队列,在pending阶段执行回调
uv__io_feed(stream->loop, &stream->io_watcher);
}
uv__write_req_finish的逻辑比较简单,就是把节点从待写队列中移除。然后插入写完成队列。最后把io 观察者插入pending队列。在pending节点会知道io观察者的回调(uv__stream_io)。流模块的逻辑比较多,今天先分析到这里。后续继续分析其他操作。