核心数据结构
两个核心数据结构:
1.每个请求都有的postponed链表。一般情况下每个链表节点保存了该请求的一个子请求。
struct ngx_http_postponed_request_s { ngx_http_request_t *request; ngx_chain_t *out; ngx_http_postponed_request_t *next; };
2.posted_requests链表,它挂载了当前需要遍历的请求(链表上的节点), 该链表保存在主请求(根节点)的posted_requests字段。
struct ngx_http_posted_request_s { ngx_http_request_t *request; ngx_http_posted_request_t *next; };
mirror配置
server { listen 10.10.10.10:8080; location / { mirror /mirror; proxy_pass http://vedio_play_support; session_sticky_hide_cookie upstream=vedio_play_support; } location = /mirror { internal; proxy_pass http://backend$request_uri; } }
mirror_handler处理
handler处于precontent阶段
static ngx_int_t ngx_http_mirror_handler(ngx_http_request_t *r) { ... mlcf = ngx_http_get_module_loc_conf(r, ngx_http_mirror_module); // 接收包体打开 if (mlcf->request_body) { rc = ngx_http_read_client_request_body(r, ngx_http_mirror_body_handler); } return ngx_http_mirror_handler_internal(r); ... } // 该handler是接收完包体的回调函数 static void ngx_http_mirror_body_handler(ngx_http_request_t *r) { ngx_http_mirror_ctx_t *ctx; ctx = ngx_http_get_module_ctx(r, ngx_http_mirror_module); // 子请求在这个函数中被创建,与主请求关联 ctx->status = ngx_http_mirror_handler_internal(r); r->preserve_body = 1; // 主请求继续run phases r->write_event_handler = ngx_http_core_run_phases; ngx_http_core_run_phases(r); }
static ngx_int_t ngx_http_mirror_handler_internal(ngx_http_request_t *r) { ... for (i = 0; i < mlcf->mirror->nelts; i++) { // 遍历mirror数组,生成subrequest if (ngx_http_subrequest(r, &name[i], &r->args, &sr, NULL, NGX_HTTP_SUBREQUEST_BACKGROUND) != NGX_OK) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } sr->header_only = 1; sr->method = r->method; sr->method_name = r->method_name; } return NGX_DECLINED; ... }
子请求处理
ngx_int_t ngx_http_subrequest(ngx_http_request_t *r, ngx_str_t *uri, ngx_str_t *args, ngx_http_request_t **psr, ngx_http_post_subrequest_t *ps, ngx_uint_t flags) { ... // 申请一个新的request sr = ngx_pcalloc(r->pool, sizeof(ngx_http_request_t)); if (sr == NULL) { return NGX_ERROR; } // 如果不是background模式(mirror默认使用这个模式),会申请ngx_http_postponed_request_t结构体(包装子请求) //将sr赋值给pr->request,并把pr挂到r->postponed的链表里(sr设置为r的子节点) if (!sr->background) { if (c->data == r && r->postponed == NULL) { c->data = sr; } pr = ngx_palloc(r->pool, sizeof(ngx_http_postponed_request_t)); if (pr == NULL) { return NGX_ERROR; } pr->request = sr; pr->out = NULL; pr->next = NULL; if (r->postponed) { for (p = r->postponed; p->next; p = p->next) { /* void */ } p->next = pr; } else { r->postponed = pr; } } sr->internal = 1; sr->method = NGX_HTTP_GET; sr->read_event_handler = ngx_http_request_empty_handler; sr->write_event_handler = ngx_http_handler; r->main->count++; *psr = sr; // 将当前这个sr挂到main request的posted_requests中。 return ngx_http_post_request(sr, NULL); ... }
ngx_int_t ngx_http_post_request(ngx_http_request_t *r, ngx_http_posted_request_t *pr) { ... // pr是null,申请一个pr if (pr == NULL) { pr = ngx_palloc(r->pool, sizeof(ngx_http_posted_request_t)); if (pr == NULL) { return NGX_ERROR; } } // 将当前的子请求赋值给pr->request pr->request = r; pr->next = NULL; for (p = &r->main->posted_requests; *p; p = &(*p)->next) { /* void */ } // 找到main request的posted_requests,给放到末尾 *p = pr; ... }
ngx_http_run_posted_requests
走到这时说明子请求创建完毕,通常子请求的创建都发生在某个请求的content handler或者某个filter内。
子请求并没有马上被执行,只是被挂载在了主请求的posted_requests链表中。
posted_requests链表是在ngx_http_run_posted_requests函数中遍历。
在某个请求的读(写)事件的handler中,执行完该请求相关的处理后被调用。
比如主请求在走完一遍PHASE的时候会调用ngx_http_run_posted_requests。这时子请求得以运行。
void ngx_http_run_posted_requests(ngx_connection_t *c) { ngx_http_request_t *r; ngx_http_posted_request_t *pr; for ( ;; ) { if (c->destroyed) { return; } r = c->data; // 从main开始遍历他的所有子请求 pr = r->main->posted_requests; if (pr == NULL) { return; } r->main->posted_requests = pr->next; r = pr->request; ngx_http_set_log_request(c->log, r); ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "http posted request: \"%V?%V\"", &r->uri, &r->args); // 调用write event handller, mirror的handler就是ngx_http_handler r->write_event_handler(r); } }
void ngx_http_handler(ngx_http_request_t *r) { ngx_http_core_main_conf_t *cmcf; r->connection->log->action = NULL; if (!r->internal) { switch (r->headers_in.connection_type) { case 0: r->keepalive = (r->http_version > NGX_HTTP_VERSION_10); break; case NGX_HTTP_CONNECTION_CLOSE: r->keepalive = 0; break; case NGX_HTTP_CONNECTION_KEEP_ALIVE: r->keepalive = 1; break; } r->lingering_close = (r->headers_in.content_length_n > 0 || r->headers_in.chunked); r->phase_handler = 0; } else { // 对于mirror来说,因为是interal,从server_rewrite开始 cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module); r->phase_handler = cmcf->phase_engine.server_rewrite_index; } r->valid_location = 1; #if (NGX_HTTP_GZIP) r->gzip_tested = 0; r->gzip_ok = 0; r->gzip_vary = 0; #endif // 从头开始执行phases r->write_event_handler = ngx_http_core_run_phases; ngx_http_core_run_phases(r); }
ngx_http_process_request_headers处理
static void ngx_http_process_request_headers(ngx_event_t *rev) { ... /* a whole header has been parsed successfully */ rc = ngx_http_process_request_header(r); if (rc != NGX_OK) { break; } ngx_http_process_request(r); // 运行子请求 ngx_http_run_posted_requests(c); ... }
void ngx_http_process_request(ngx_http_request_t *r) { ... c->read->handler = ngx_http_request_handler; c->write->handler = ngx_http_request_handler; r->read_event_handler = ngx_http_block_reading; ngx_http_handler(r); ... }
void ngx_http_handler(ngx_http_request_t *r) { ... if (!r->internal) { switch (r->headers_in.connection_type) { case 0: r->keepalive = (r->http_version > NGX_HTTP_VERSION_10); break; case NGX_HTTP_CONNECTION_CLOSE: r->keepalive = 0; break; case NGX_HTTP_CONNECTION_KEEP_ALIVE: r->keepalive = 1; break; } r->lingering_close = (r->headers_in.content_length_n > 0 || r->headers_in.chunked); r->phase_handler = 0; } else { cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module); r->phase_handler = cmcf->phase_engine.server_rewrite_index; } r->write_event_handler = ngx_http_core_run_phases; ngx_http_core_run_phases(r); ... }
对于mirror来说,是从server rewrite开始一直到upstream,结束之后,对这个request进行finalize
void ngx_http_finalize_request(ngx_http_request_t *r, ngx_int_t rc) { ... // 子请求,且有回调函数,执行 if (r != r->main && r->post_subrequest) { rc = r->post_subrequest->handler(r, r->post_subrequest->data, rc); } // 子请求 if (r != r->main) { // background直接关闭request if (r->background) { r->done = 1; ngx_http_finalize_connection(r); return; } /* 该子请求还有未处理完的数据或者子请求 */ if (r->buffered || r->postponed) { // 添加一个该子请求的写事件,并设置合适的write event hander, //以便下次写事件来的时候继续处理,这里实际上下次执行时会调用ngx_http_output_filter函数, //最终还是会进入ngx_http_postpone_filter进行处理 if (ngx_http_set_write_handler(r) != NGX_OK) { ngx_http_terminate_request(r, 0); } return; } pr = r->parent; //该子请求处理完毕,如它有发送数据的权利,将权利移交给父请求,c->data指向正在处理的请求,和当前request相同说明是子请求移交过来的 if (r == c->data) { r->main->count--; r->done = 1; // 如果该子请求不是提前完成,则从父请求的postponed链表中删除 if (pr->postponed && pr->postponed->request == r) { pr->postponed = pr->postponed->next; } // 将c->data指向parent c->data = pr; } else { // 到这里表明该子请求提前执行完成,且它没有产生任何数据,则它下次再次获得 //将会执行ngx_http_request_finalzier函数,它实际上是执行 ngx_http_finalzie_request(r,0),什么都不做直到轮到它发送数据时, //ngx_http_finalzie_request函数会将它从父请求的postponed链表中删除 r->write_event_handler = ngx_http_request_finalizer; if (r->waited) { r->done = 1; } } // 将父请求加入posted_request队尾 if (ngx_http_post_request(pr, NULL) != NGX_OK) { r->main->count++; ngx_http_terminate_request(r, 0); return; } } // 这里是处理主请求结束的逻辑,如果主请求有未发送的数据或者未处理的子请求 //则给主请求添加写事件,并设置合适的write event hander, //以便下次写事件来的时候继续处理 if (r->buffered || c->buffered || r->postponed || r->blocked) { if (ngx_http_set_write_handler(r) != NGX_OK) { ngx_http_terminate_request(r, 0); } return; } ... }
body response响应处理逻辑
nginx作为反向代理,upstream处理完请求后,开始发送response
static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u) { ... ngx_http_upstream_send_response(r, u); ... }
static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u) { ... rc = ngx_http_send_header(r); p->output_filter = ngx_http_upstream_output_filter; ... }
static ngx_int_t ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain) { ... rc = ngx_http_output_filter(r, chain); ... }
ngx_int_t ngx_http_send_header(ngx_http_request_t *r) { return ngx_http_top_header_filter(r); }
ngx_int_t ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in) { rc = ngx_http_top_body_filter(r, in); }
output filter是发送output的filter,里面会调用top_body_filter。
这个链表里面有ngx_http_postpone_filter,这个是组织回包格式的filter。
ngx_int_t ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in) { ngx_int_t rc; ngx_connection_t *c; c = r->connection; ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "http output filter \"%V?%V\"", &r->uri, &r->args); rc = ngx_http_top_body_filter(r, in); if (rc == NGX_ERROR) { /* NGX_ERROR may be returned by any filter */ c->error = 1; } return rc; }
static ngx_int_t ngx_http_postpone_filter(ngx_http_request_t *r, ngx_chain_t *in) { ... ngx_connection_t *c; ngx_http_postponed_request_t *pr; //取得当前的链接 c = r->connection; //如果r不等于c->data,前面的分析知道c->data保存的是最新的一个sub request(同级的话,是第一个),因此不等于则说明是需要保存数据的父request。 if (r != c->data) { if (in) { //保存数据 ngx_http_postpone_filter_add(r, in); //这里注意不发送任何数据,直接返回OK,会在finalize_request中处理。 return NGX_OK; } return NGX_OK; } //如果r->postponed为空,则说明是最后一个sub request,也就是最新的那个,因此需要将它先发送出去。 if (r->postponed == NULL) { //如果in存在,则发送出去 if (in || c->buffered) { return ngx_http_next_filter(r->main, in); } return NGX_OK; } ... }
子请求
子请求不存在响应头部的概念。
子请求读事件设置为空,子请求不受前段控制。
来源:freebuf.com 2020-12-02 00:11:37 by: stan1y
请登录后发表评论
注册