nginx upstream模块完整逻辑源码分析

2021-01-25 10:34:02 浏览数 (1)

nginx访问上游服务器upstream分为几个阶段:

1.启动upstream。 2.连接上游服务器。 3.向上游发送请求。 4.接收上游响应(包头/包体)。 5.结束请求。

主要数据结构:

ngx_http_upstream_t

ngx_http_upstream_conf_t

流程图和数据结构

upstream处理流程图:

typedef struct ngx_http_upstream_s ngx_http_upstream_t;

struct ngx_http_upstream_s {

ngx_http_upstream_handler_pt read_event_handler; // 处理读事件的回调方法

ngx_http_upstream_handler_pt write_event_handler; // 处理写事件的回调方法

ngx_peer_connection_t peer; // 主动向上游发起的连接,稍后会详细分析

ngx_event_pipe_t *pipe; // 当开启缓存配置,会用pipe来转发响应,

需要http模块在使用upstream机制前构造pipe构体

ngx_chain_t *request_bufs; // 用链表将ngx_buf_t缓冲区链接起来,表示所有 需 要发送到上游的请求内容,

// create_request回调在于构造request_buf链表

ngx_output_chain_ctx_t output; // 向下游发送响应的方式,稍后会详细分析

ngx_chain_writer_ctx_t writer; // 向下游发送响应的方式,稍后会详细分析

ngx_http_upstream_conf_t *conf; // upstream相关的配置信息

#if (NGX_HTTP_CACHE)

ngx_array_t *caches; // 缓存数组,稍后会单独介绍缓存相关内容

#endif

ngx_http_upstream_headers_in_t headers_in; // 当直接转发时,process_header将解析的头部适 配为http头部,同时将包头信息放在headers_in中

ngx_http_upstream_resolved_t *resolved; //用于解析主机域名,后面会详细介绍

ngx_buf_t from_client;

ngx_buf_t buffer; // 接收上游服务器响应包头的缓存区,当不需要直 接响应或buffering为0时,也作为转发包体缓冲区

off_t length; //来自上游服务器的响应包体的长度

ngx_chain_t *out_bufs; //使用时再具体介绍,不同场景下有不同意义

ngx_chain_t *busy_bufs; //当buffering为0时,表示上一次向下游转发响应时

没有发送完成的内容

ngx_chain_t *free_bufs; //当buffering为0时,用于回收out_bufs中已经发送给

下游的ngx_buf_t结构体

ngx_int_t (*input_filter_init)(void *data); //处理包体前的初始化方法,其中data用于传递用户

数据结构,即下方的input_filter_ctx

ngx_int_t (*input_filter)(void *data, ssize_t bytes) //处理包体的方法,bytes表示本次接收到的

包体长度,data同上

void *input_filter_ctx; // 传递http模块的自定义的数据结构

#if (NGX_HTTP_CACHE)

ngx_int_t (*create_key)(ngx_http_request_t *r); // cache部分,后面再分析

#endif

ngx_int_t (*create_request)(ngx_http_request_t *r); // 用于构造发往上游服务器的请求

ngx_int_t (*reinit_request)(ngx_http_request_t *r); // 与上游通讯失败,需要重新发起连接时,用该方法 重新初始化请求信息

ngx_int_t (*process_header)(ngx_http_request_t *r); // 解析上游服务器返回响应的包头,

NGX_AGAIN接收不完整,NGX_OK解析到完整包头

void (*abort_request)(ngx_http_request_t *r); // 暂时没有用到

void (*finalize_request)(ngx_http_request_t *r,ngx_int_t rc); // 请求结束时会调用,目前没有实际作用

ngx_int_t (*rewrite_redirect)(ngx_http_request_t *r, ngx_table_elt_t *h, size_t prefix);// 上游返回响应中含Location或Refresh时,process_header会调用http模块实现的该方法

ngx_int_t (*rewrite_cookie)(ngx_http_request_t *r, // 同上,当响应中含Set-Cookie时,会调用http模块实现的该方法

ngx_table_elt_t *h);

ngx_msec_t timeout; // 暂时没有用到

ngx_http_upstream_state_t *state; // 用于表示上游响应的错误码、包体长度等信息

ngx_str_t method; // 用于文件缓存,稍后再进行分析

ngx_str_t schema; // 记录日志时使用

ngx_str_t uri; // 记录日志时使用

ngx_http_cleanup_pt *cleanup; // 用于标识是否需要清理资源,相当于一个标志位,实际不会调用该方法

unsigned store:1; // 是否指定文件缓存路径的标志位

unsigned cacheable:1; // 是否启用文件缓存

unsigned accel:1; // 目前没有用到

unsigned ssl:1; // 是否基于SSL协议访问上游服务器

unsigned buffering:1; // 向下游转发响应包体时,是否开启更大内存及临时磁盘文件用于缓存来不及发送到下游的响应包体

unsigned keepalive:1; // 标识与后端是否开启keepalive ?

unsigned upgrade:1; // 是否存在upgrade header

unsigned request_sent:1; // 是否向上游服务器发送了请求

unsigned header_sent:1; // 为1时,表示包头已经转发给客户端了

}

ngx_http_upstream_conf_t:指定了upstream的运行方式,必须在启动upstream之前设置。

typedef struct {

ngx_http_upstream_srv_conf_t *upstream; // 当上面没有实现resolved成员时,用该结构体定义上游服务器的配置

ngx_msec_t connect_timeout; // 建立tcp连接的超时时间,即写事件添加到定时器中设置的超时时间

ngx_msec_t send_timeout; // 发送请求的超时时间,即写事件添加到定时器中设置的超时时间

ngx_msec_t read_timeout; // 接收响应的超时时间,即读事件添加到定时器中设置的超时时间

ngx_msec_t timeout; // 暂时没有使用

ngx_msec_t next_upstream_timeout;

size_t send_lowat; // 发送缓存区的下限,即TCP的SO_SNOLOWAT选项

size_t buffer_size; // 指定接收头部缓冲区分配的内存大小,当buffering为0时,由于上述buffer同时用于接收包体,也表示接收包体缓冲区大小

size_t limit_rate;

size_t busy_buffers_size; // 当buffering为1,且向下游转发响应时生效,会设置到 ngx_event_pipe_t结构体的busy_size中

size_t max_temp_file_size; // 指定临时文件的大小,限制ngx_event_pipe_t中的temp_file

size_t temp_file_write_size; // 将缓冲区的响应写入临时文件时,一次写入字符流的最大长度

......

ngx_bufs_t bufs; // 以缓存响应的方式转发上游服务器的包体时所使用的内存大小

ngx_uint_t ignore_headers; // 以位图的形式标识在转发时需要忽略的headers

ngx_uint_t next_upstream; // 以位图的方式表示一些错误码,当处理上游响应时发现该错误码,选择下一个上游服务器重发请求

ngx_uint_t store_access; // 表示创建的临时目录和文件的权限

ngx_uint_t next_upstream_tries;

ngx_flag_t buffering; // 为1时表示打开缓存,尽量在内存和磁盘中缓存来自上游的响应,为0时则开辟固定大小内存块作为缓存来转发响应

......

ngx_flag_t ignore_client_abort; // 为1时,表示与上游服务器交互时不检查nginx与下游服务器是否断开,即使下游主动关闭连接,也不会中断与上游交互

ngx_flag_t intercept_errors; // 详见ngx_http_upstream_intercept_errors

ngx_flag_t cyclic_temp_file; // 为1时,会尝试复用临时文件中已经使用过的空间

......

ngx_path_t *temp_path; // buffering为1的情况下转发响应时,存放临时文件的路径

ngx_hash_t hide_headers_hash; // 不转发的头部,根据hide_headers和pass_headers动态数组构造出的需要隐藏的http头部散列表

ngx_array_t *hide_headers; // 当转发上游头部给下游时,如果不希望将某些头部转发给下游,则设置到该数组中

ngx_array_t *pass_headers; // 转发头部时upstream机制默认不会转发某些头部,当确定需要转发时,需要设置到该数组中

ngx_http_upstream_local_t *local; // 连接上游服务器时,需要使用的本机地址

ngx_array_t *store_lengths; // 当需要将上游响应缓存到文件中时,表示存放路径的长度

ngx_array_t *store_values; // 当需要将上游响应缓存到文件中时,表示存放路径

......

signed store:2; // 同ngx_http_upstream_t中的store

unsigned intercept_404:1; // 如果该值设为1,当上游返回404时直接转发该错误码给下游,而不会去与error_page进行比较

unsigned change_buffering:1; // 当为1时,根据上游服务器返回的响应头部,动态决定是以上游网速优先,还是下游网速优先

......

ngx_str_t module; // 使用upstream的模块名称,仅用于记录日志。

} ngx_http_upstream_conf_t

upstream创建和初始化

upstream创建:

upstream初始化:

启动upstream

当收到请求后,http的代理模块是ngx_http_proxy_module,其NGX_HTTP_CONTENT_PHASE阶段的处理函数为ngx_http_proxy_handler。

static ngx_int_t ngx_http_proxy_handler(ngx_http_request_t *r)

{

// 创建ngx_http_upstream_t结构,并赋值给r->upstream

if (ngx_http_upstream_create(r) != NGX_OK) {

return NGX_HTTP_INTERNAL_SERVER_ERROR;

}

.....

plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module);

.....

u = r->upstream;

.....

// 给upstream的conf成员赋值,记录相关的配置信息

u->conf = &plcf->upstream;

// 设置相关的回调信息

u->create_request = ngx_http_proxy_create_request;

u->reinit_request = ngx_http_proxy_reinit_request;

u->process_header = ngx_http_proxy_process_status_line;

u->abort_request = ngx_http_proxy_abort_request;

u->finalize_request = ngx_http_proxy_finalize_request;

......

u->buffering = plcf->upstream.buffering;

.....

// 调用ngx_http_upstream_init函数

rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);

.....

return NGX_DONE;

}

创建upstream的结构并进行设置,然后设置ngx_http_upstream_conf_t配置结构体给upstream->conf。ngx_http_upstream_init函数会根据ngx_http_upstream_conf_t配置的信息初始化upstream,同时开始连接上游服务器,由此开始整个upstream的处理流程。

void ngx_http_upstream_init(ngx_http_request_t *r)

{

ngx_connection_t *c;

// 客户端的连接

c = r->connection;

......

// 当启用upstream时,需要将客户端对应的读事件从定时器中删除,此时主要关注上游的连接相关的事件

if (c->read->timer_set) {

ngx_del_timer(c->read);

}

......

ngx_http_upstream_init_request(r);

}

static void ngx_http_upstream_init_request(ngx_http_request_t *r)

{

u = r->upstream;

u->store = u->conf->store;

......

// 设置Nginx与下游客户端之间TCP连接的检查方法,注意几个条件,ignore来自之前配置属性,是否忽略客户端的连接状态

if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {

r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;

r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;

}

......

// 调用http模块实现的create_request方法,即前面注册的ngx_http_proxy_create_request函数,用于构造发到上游服务器的请求

if (u->create_request(r) != NGX_OK) {

ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);

return;

}

......

// 向当前请求的main成员指向的原始请求中的cleanup链表末尾添加一个新成员

cln = ngx_http_cleanup_add(r, 0);

// 将handler的回调方法设置为ngx_http_upstream_cleanup

cln->handler = ngx_http_upstream_cleanup;

cln->data = r;

u->cleanup = &cln->handler;

......

// 调用ngx_http_upstream_connect向上游服务器发起连接

ngx_http_upstream_connect(r, u);

}

与上游建立连接

upstream机制与上游服务器之间通过tcp建立连接,为了保证三次握手的过程中不阻塞进程,nginx采用了无阻塞的套接字来连接上游服务器。 ngx_http_upstream_connect负责发起建连动作,如果没有立即返回成功,需要在epoll中监控该套接字,当出现可写事件时,则说明连接已经建立成功。源码展开如下:

static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)

{

// 建连的动作主要由下面函数进行.....

rc = ngx_event_connect_peer(&u->peer);

....

}

ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc)

{

// 创建tcp socket套接字

s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0);

......

// 获取空闲的ngx_connection_t结构来承载连接,从ngx_cycle_t的free_connections指向的空闲连接池中获取

c = ngx_get_connection(s, pc->log);

......

// 设置连接为非阻塞的模式

if (ngx_nonblocking(s) == -1) {

......

// 绑定地址和端口

if (pc->local) {

if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {

......

// 设置连接收发相关的回调函数

c->recv = ngx_recv;

c->send = ngx_send;

c->recv_chain = ngx_recv_chain;

c->send_chain = ngx_send_chain;

// 启用sendfile的支持

c->sendfile = 1;

......

rev = c->read;

wev = c->write;

......

pc->connection = c;

// 调用ngx_event_actions.add_conn将tcp套接字以期待可读、可写的方式添加到事件搜集器中,这里是把套接字加到epoll中

if (ngx_add_conn) {

if (ngx_add_conn(c) == NGX_ERROR) {

goto failed;

}

}

// 向上游服务器发起连接,由于非阻塞,调用会立即返回

rc = connect(s, pc->sockaddr, pc->socklen);

......

static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)

{

......

// 上面已经分析了,该函数主要进行上游服务器的连接

rc = ngx_event_connect_peer(&u->peer);

......

c = u->peer.connection;

c->data = r;

// 将上游connection上读写事件的回调,都设置为ngx_http_upstream_handler

c->write->handler = ngx_http_upstream_handler;

c->read->handler = ngx_http_upstream_handler;

// 设置upstream机制的write_event_handler和read_event_handler,具体使用见后续的ngx_upstream_handler函数

// ngx_http_upstream_send_request_handler用于向上游发送请求

u->write_event_handler = ngx_http_upstream_send_request_handler;

// ngx_http_upstream_process_header接收和解析上游服务器的响应

u->read_event_handler = ngx_http_upstream_process_header;

......

if (rc == NGX_AGAIN) {

// 当连接没有建立成功时,套接字已经在epoll中了,将写事件添加到定时器中,超时时间是 ngx_http_upstream_conf_t中的connect_timeout成员

ngx_add_timer(c->write, u->conf->connect_timeout);

return;

}

......

// 当成功建立连接时,向上游服务器发送请求,注意:此处的函数与上面设置的定时器回调的函数有所不同,下文会进行说明

ngx_http_upstream_send_request(r, u);

}

看一下connection的读写回调函数——ngx_http_upstream_handler

static void ngx_http_upstream_handler(ngx_event_t *ev)

{

......

// 由事件的data成员取得ngx_connection_t连接,该连接是nginx与上游服务器之间的连接

c = ev->data;

// 由连接的data取得ngx_http_request_t结构体

r = c->data;

// 由请求的upstream成员取的表示upstream机制的ngx_http_upstream_t结构体

u = r->upstream;

// 此处ngx_http_request_t结构中的connection成员代表的是客户端与nginx之间连接

c = r->connection;

......

if (ev->write) {

// nginx与上游服务器间的tcp连接的可写事件被触发时,该方法被调用

u->write_event_handler(r, u);

} else {

// nginx与上游服务器间的tcp连接的可读事件被触发时,该方法被调用

u->read_event_handler(r, u);

}

// 与nginx_http_request_handler相同,最后一步执行post请求

ngx_http_run_posted_requests(c);

}

发送请求刀上游服务器

ngx_http_upstream_connect中,将ngx_http_upstream_t中的write_event_handler设置为了ngx_http_upstream_send_request_handler。ngx_http_upstream_connect的最后直接调用了ngx_http_upstream_send_request发送请求,分析流程如下:

static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)

{

ngx_connection_t *c;

// 获取与上游服务器间表示连接的ngx_connection_t结构体

c = u->peer.connection;

// 当写事件的timeout被设置为1时,则代表向上游发送http请求已经超时

if (c->write->timedout) {

// 将超时错误传给next方法,next方法根据允许的重传策略决定:重新发起连接执行upstream请求,还是结束upstream请求

ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);

return;

}

......

// header_sent为1时,表示上游服务器的响应需要直接转发给客户端,而且此时响应包头已经转给客户端了

if (u->header_sent) {

// 由于此时已经收到了上游服务器的完整包头,此时不需要再向上游发送请求,因此将write回调设置为空函数

u->write_event_handler = ngx_http_upstream_dummy_handler;

// 将写事件添加到epoll中

(void) ngx_handle_write_event(c->write, 0);

return;

}

// 调用下面函数向上游发送http请求

ngx_http_upstream_send_request(r, u);

}

ngx_http_upstream_send_request_handler更多的是在检测请求的状态,而实际的发送函数是 ngx_http_upstream_send_request,分析流程如下:

static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)

{

......

rc = ngx_http_upstream_send_request_body(r, u, do_write);

// 标识已经向上游发送了请求,实际上是为了标识是否调用过ngx_output_chain,除了第一次,其他时候不需要再传送request_bufs,直接设置为NULL

u->request_sent = 1;

......

// 当写事件仍在定时器中时,先将写事件从定时器中移出,由ngx_output_chain的返回值决定是否需要向定时器中增加写事件

if (c->write->timer_set) {

ngx_del_timer(c->write);

}

// 当ngx_output_chain返回NGX_AGAIN时,说明请求还没有发完,此时需要设置写事件定时器

if (rc == NGX_AGAIN) {

ngx_add_timer(c->write, u->conf->send_timeout);

// 将写事件添加到epoll中

if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {

ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);

return;

}

// 结束ngx_http_upstream_send_request的执行,等待epoll事件触发

return;

}

/* rc == NGX_OK */

// 当ngx_output_chain返回NGX_OK时,表示向上游服务器发送完了所有的请求,将写事件的回调设置为空函数

......

u->write_event_handler = ngx_http_upstream_dummy_handler;

// 重新添加到epoll中

if (ngx_handle_write_event(c->write, 0) != NGX_OK) {

ngx_http_upstream_finalize_request(r, u,

NGX_HTTP_INTERNAL_SERVER_ERROR);

return;

}

// 发送完请求后,需要开始读上游返回的响应,设置读事件的超时时间

ngx_add_timer(c->read, u->conf->read_timeout);

// 当ready已经设置时,说明应答已经到位,调用process_header开始处理来自上游的响应

if (c->read->ready) {

ngx_http_upstream_process_header(r, u);

return;

}

}

static ngx_int_t ngx_http_upstream_send_request_body(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_uint_t do_write)

{

...

// 发送u->request_bufs链表上的请求内容,next指向body,该函数会把未一次发送完的链表缓冲区保存下来,再次调用时不需要request_bufs参数

if (!u->request_sent) {

u->request_sent = 1;

out = u->request_bufs;

if (r->request_body->bufs) {

for (cl = out; cl->next; cl = cl->next) { /* void */ }

cl->next = r->request_body->bufs;

r->request_body->bufs = NULL;

}

}

for ( ;; ) {

if (do_write) {

rc = ngx_output_chain(&u->output, out);

......

}

发送逻辑结束,下面是结束请求逻辑。

结束upstream请求

upstream请求的结束的流程,主要有三个函数:

ngx_http_upstream_finalize_request

ngx_http_upstream_cleanup

ngx_http_upstream_next 其中cleanup和next真正终止upstream时还是会调用到finalize_request函数。

ngx_http_upstream_cleanup函数在启动upstream时,会挂在到请求的cleanup链表中,当HTTP框架结束http请求时一定会调用到upstream_cleanup函数。

static void ngx_http_upstream_cleanup(void *data)

{

ngx_http_request_t *r = data;

ngx_http_upstream_finalize_request(r, r->upstream, NGX_DONE);

}

upstream_cleanup其实是直接调用了ngx_http_upstream_finalize_request,这是我们期望的关闭方式。 而ngx_http_upstream_next函数,是在处理请求的的流程中出现错误才会主动调用到,该函数通过重连服务器、选取新的服务器等策略来提高服务的可用性。

目前nginx的负载均衡的功能就是通过next函数来实现的,这里没有进行详细分析,只简单说明一下。

static void ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_uint_t ft_type)

{

......

if (status) {

u->state->status = status;

timeout = u->conf->next_upstream_timeout;

// 当tries为0时,才最终结束upstream的请求

if (u->peer.tries == 0

|| !(u->conf->next_upstream & ft_type)

|| (timeout && ngx_current_msec - u->peer.start_time >= timeout))

{

ngx_http_upstream_finalize_request(r, u, status);

return;

}

}

// 由于要发起新的连接,所以需要先关闭和上游服务器的已有连接

if (u->peer.connection) {

if (u->peer.connection->pool) {

ngx_destroy_pool(u->peer.connection->pool);

}

ngx_close_connection(u->peer.connection);

u->peer.connection = NULL;

}

// 重新发起连接

ngx_http_upstream_connect(r, u);

}

最后看一下ngx_http_upstream_finalize_request的具体实现

static void ngx_http_upstream_finalize_request(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_int_t rc)

{

// 将cleanup指向的清理资源回调方法设置为NULL

if (u->cleanup) {

*u->cleanup = NULL;

u->cleanup = NULL;

}

// 释放解析主机域名时分配的资源

if (u->resolved && u->resolved->ctx) {

ngx_resolve_name_done(u->resolved->ctx);

u->resolved->ctx = NULL;

}

......

// 调用http模块实现的finalize_request方法

u->finalize_request(r, rc);

// 释放与上游的连接

if (u->peer.connection) {

if (u->peer.connection->pool) {

ngx_destroy_pool(u->peer.connection->pool);

}

ngx_close_connection(u->peer.connection);

}

u->peer.connection = NULL;

// 删除用于缓存响应的临时文件

if (u->store && u->pipe && u->pipe->temp_file

&& u->pipe->temp_file->file.fd != NGX_INVALID_FILE)

{

if (ngx_delete_file(u->pipe->temp_file->file.name.data)

== NGX_FILE_ERROR)

......

}

......

// 最后还是调用HTTP框架提供的方法结束请求

ngx_http_finalize_request(r, rc);

}

END

0 人点赞