ngixn mirror及子请求源码分析 – 作者:stan1y

核心数据结构

两个核心数据结构:

1.每个请求都有的postponed链表。一般情况下每个链表节点保存了该请求的一个子请求。

struct ngx_http_postponed_request_s {
    ngx_http_request_t               *request; 
    ngx_chain_t                      *out;
    ngx_http_postponed_request_t     *next;
};

2.posted_requests链表,它挂载了当前需要遍历的请求(链表上的节点), 该链表保存在主请求(根节点)的posted_requests字段。

struct ngx_http_posted_request_s {
    ngx_http_request_t               *request;
    ngx_http_posted_request_t        *next;
};

mirror配置

server {

    listen 10.10.10.10:8080;
    location / {
            mirror /mirror;
            proxy_pass http://vedio_play_support;
            session_sticky_hide_cookie  upstream=vedio_play_support;
    }
    location = /mirror {
        internal;
        proxy_pass http://backend$request_uri;
    }
}

mirror_handler处理

handler处于precontent阶段

static ngx_int_t ngx_http_mirror_handler(ngx_http_request_t *r)
{
...
    mlcf = ngx_http_get_module_loc_conf(r, ngx_http_mirror_module);

    // 接收包体打开
    if (mlcf->request_body) {
        rc = ngx_http_read_client_request_body(r, ngx_http_mirror_body_handler);
    }

    return ngx_http_mirror_handler_internal(r);
...
}

// 该handler是接收完包体的回调函数
static void ngx_http_mirror_body_handler(ngx_http_request_t *r)
{
    ngx_http_mirror_ctx_t  *ctx;
    ctx = ngx_http_get_module_ctx(r, ngx_http_mirror_module);
    
    // 子请求在这个函数中被创建,与主请求关联
    ctx->status = ngx_http_mirror_handler_internal(r);

    r->preserve_body = 1;

    // 主请求继续run phases

    r->write_event_handler = ngx_http_core_run_phases;

    ngx_http_core_run_phases(r);
}

static ngx_int_t ngx_http_mirror_handler_internal(ngx_http_request_t *r)
{
...
    for (i = 0; i < mlcf->mirror->nelts; i++) {
        // 遍历mirror数组,生成subrequest
        if (ngx_http_subrequest(r, &name[i], &r->args, &sr, NULL, NGX_HTTP_SUBREQUEST_BACKGROUND) != NGX_OK)
        {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;

        }
        sr->header_only = 1;
        sr->method = r->method;
        sr->method_name = r->method_name;
    }

    return NGX_DECLINED;
...
}

子请求处理

ngx_int_t 
ngx_http_subrequest(ngx_http_request_t *r, ngx_str_t *uri, ngx_str_t *args, ngx_http_request_t **psr, ngx_http_post_subrequest_t *ps, ngx_uint_t flags)
{
...
    // 申请一个新的request
    sr = ngx_pcalloc(r->pool, sizeof(ngx_http_request_t));
    if (sr == NULL) {
        return NGX_ERROR;
    }

    // 如果不是background模式(mirror默认使用这个模式),会申请ngx_http_postponed_request_t结构体(包装子请求)
    //将sr赋值给pr->request,并把pr挂到r->postponed的链表里(sr设置为r的子节点)

    if (!sr->background) {
        if (c->data == r && r->postponed == NULL) {
            c->data = sr;
        }

        pr = ngx_palloc(r->pool, sizeof(ngx_http_postponed_request_t));
        if (pr == NULL) {
            return NGX_ERROR;     
        }

        pr->request = sr;
        pr->out = NULL;
        pr->next = NULL;

        if (r->postponed) {

            for (p = r->postponed; p->next; p = p->next) { /* void */ }
            p->next = pr;
        } else {
            r->postponed = pr;
        }
    }

    sr->internal = 1;
    sr->method = NGX_HTTP_GET;
    sr->read_event_handler = ngx_http_request_empty_handler;
    sr->write_event_handler = ngx_http_handler;
    r->main->count++;
    *psr = sr;
    
    // 将当前这个sr挂到main request的posted_requests中。

    return ngx_http_post_request(sr, NULL);
...
}

ngx_int_t ngx_http_post_request(ngx_http_request_t *r, ngx_http_posted_request_t *pr)
{
...
    // pr是null,申请一个pr

    if (pr == NULL) {
        pr = ngx_palloc(r->pool, sizeof(ngx_http_posted_request_t));
        if (pr == NULL) {
            return NGX_ERROR;
        }
    }

    // 将当前的子请求赋值给pr->request

    pr->request = r;
    pr->next = NULL;

    for (p = &r->main->posted_requests; *p; p = &(*p)->next) { /* void */ }

    // 找到main request的posted_requests,给放到末尾
    *p = pr;
    ...
}

ngx_http_run_posted_requests

走到这时说明子请求创建完毕,通常子请求的创建都发生在某个请求的content handler或者某个filter内。

子请求并没有马上被执行,只是被挂载在了主请求的posted_requests链表中。

posted_requests链表是在ngx_http_run_posted_requests函数中遍历。

在某个请求的读(写)事件的handler中,执行完该请求相关的处理后被调用。

比如主请求在走完一遍PHASE的时候会调用ngx_http_run_posted_requests。这时子请求得以运行。

void
ngx_http_run_posted_requests(ngx_connection_t *c)
{
    ngx_http_request_t         *r;
    
    ngx_http_posted_request_t  *pr;

    for ( ;; ) {

        if (c->destroyed) {
            return;
        }

        r = c->data;

        // 从main开始遍历他的所有子请求

        pr = r->main->posted_requests;

        if (pr == NULL) {
            return;
        }

        r->main->posted_requests = pr->next;

        r = pr->request;

        ngx_http_set_log_request(c->log, r);

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "http posted request: \"%V?%V\"", &r->uri, &r->args);

        // 调用write event handller, mirror的handler就是ngx_http_handler

        r->write_event_handler(r);
    }
}

void
ngx_http_handler(ngx_http_request_t *r)
{

    ngx_http_core_main_conf_t  *cmcf;

    r->connection->log->action = NULL;

    if (!r->internal) {

        switch (r->headers_in.connection_type) {
        case 0:

            r->keepalive = (r->http_version > NGX_HTTP_VERSION_10);
            break;

        case NGX_HTTP_CONNECTION_CLOSE:
            r->keepalive = 0;
            break;

        case NGX_HTTP_CONNECTION_KEEP_ALIVE:
            r->keepalive = 1;
            break;
        }

        r->lingering_close = (r->headers_in.content_length_n > 0 || r->headers_in.chunked);

        r->phase_handler = 0;

        } else {
            // 对于mirror来说,因为是interal,从server_rewrite开始
            cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
            r->phase_handler = cmcf->phase_engine.server_rewrite_index;
        }

        r->valid_location = 1;

        #if (NGX_HTTP_GZIP)

        r->gzip_tested = 0;
        r->gzip_ok = 0;
        r->gzip_vary = 0;

        #endif

        // 从头开始执行phases
        r->write_event_handler = ngx_http_core_run_phases;
        ngx_http_core_run_phases(r);
}

ngx_http_process_request_headers处理

static void ngx_http_process_request_headers(ngx_event_t *rev)
{
...
    /* a whole header has been parsed successfully */

    rc = ngx_http_process_request_header(r);

    if (rc != NGX_OK) {
        break;
    }
    ngx_http_process_request(r);

    // 运行子请求
    ngx_http_run_posted_requests(c);
...
}

void ngx_http_process_request(ngx_http_request_t *r)
{
...

c->read->handler = ngx_http_request_handler;

c->write->handler = ngx_http_request_handler;

r->read_event_handler = ngx_http_block_reading;

ngx_http_handler(r);

...

}

void ngx_http_handler(ngx_http_request_t *r)
{
...

    if (!r->internal) {

        switch (r->headers_in.connection_type) {
        case 0:
            r->keepalive = (r->http_version > NGX_HTTP_VERSION_10);
            break;
        case NGX_HTTP_CONNECTION_CLOSE:
            r->keepalive = 0;
            break;
        case NGX_HTTP_CONNECTION_KEEP_ALIVE:
            r->keepalive = 1;
            break;
        }

        r->lingering_close = (r->headers_in.content_length_n > 0 || r->headers_in.chunked);

        r->phase_handler = 0;
    } else {
        cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
        r->phase_handler = cmcf->phase_engine.server_rewrite_index;
    }

    r->write_event_handler = ngx_http_core_run_phases;

    ngx_http_core_run_phases(r);
...
}

对于mirror来说,是从server rewrite开始一直到upstream,结束之后,对这个request进行finalize

void ngx_http_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
...
    // 子请求,且有回调函数,执行

    if (r != r->main && r->post_subrequest) {
        rc = r->post_subrequest->handler(r, r->post_subrequest->data, rc);
    }

    // 子请求

    if (r != r->main) {

        // background直接关闭request
        if (r->background) {
            r->done = 1;
            ngx_http_finalize_connection(r);
            return;
        }
        /* 该子请求还有未处理完的数据或者子请求 */
        if (r->buffered || r->postponed) {
            // 添加一个该子请求的写事件,并设置合适的write event hander,
            //以便下次写事件来的时候继续处理,这里实际上下次执行时会调用ngx_http_output_filter函数,
            //最终还是会进入ngx_http_postpone_filter进行处理
            if (ngx_http_set_write_handler(r) != NGX_OK) {
                ngx_http_terminate_request(r, 0);
            }
            return;
        }
        pr = r->parent;
        //该子请求处理完毕,如它有发送数据的权利,将权利移交给父请求,c->data指向正在处理的请求,和当前request相同说明是子请求移交过来的
        if (r == c->data) {
            r->main->count--;
            r->done = 1;
            // 如果该子请求不是提前完成,则从父请求的postponed链表中删除 
            if (pr->postponed && pr->postponed->request == r) {
                pr->postponed = pr->postponed->next;
            }
            // 将c->data指向parent
            c->data = pr;
        } else {
            // 到这里表明该子请求提前执行完成,且它没有产生任何数据,则它下次再次获得
            //将会执行ngx_http_request_finalzier函数,它实际上是执行 ngx_http_finalzie_request(r,0),什么都不做直到轮到它发送数据时,
            //ngx_http_finalzie_request函数会将它从父请求的postponed链表中删除
            r->write_event_handler = ngx_http_request_finalizer;

            if (r->waited) {
                r->done = 1;
            }
        }

        // 将父请求加入posted_request队尾 
        if (ngx_http_post_request(pr, NULL) != NGX_OK) {
            r->main->count++;
            ngx_http_terminate_request(r, 0);
            return;
        }
    }

    // 这里是处理主请求结束的逻辑,如果主请求有未发送的数据或者未处理的子请求
    //则给主请求添加写事件,并设置合适的write event hander,
    //以便下次写事件来的时候继续处理
    if (r->buffered || c->buffered || r->postponed || r->blocked) {
        if (ngx_http_set_write_handler(r) != NGX_OK) {
            ngx_http_terminate_request(r, 0);
        }
        return;
    }
...
}

body response响应处理逻辑

nginx作为反向代理,upstream处理完请求后,开始发送response

static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
...

    ngx_http_upstream_send_response(r, u);
...
}

static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
...
    rc = ngx_http_send_header(r);
    p->output_filter = ngx_http_upstream_output_filter;
...

}

static ngx_int_t ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)
{
...
    rc = ngx_http_output_filter(r, chain);
...
}
ngx_int_t ngx_http_send_header(ngx_http_request_t *r)
{
    return ngx_http_top_header_filter(r);
}

ngx_int_t ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    rc = ngx_http_top_body_filter(r, in);
}

output filter是发送output的filter,里面会调用top_body_filter。

这个链表里面有ngx_http_postpone_filter,这个是组织回包格式的filter。

ngx_int_t ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;
    c = r->connection;

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "http output filter \"%V?%V\"", &r->uri, &r->args);

    rc = ngx_http_top_body_filter(r, in);

    if (rc == NGX_ERROR) {
        /* NGX_ERROR may be returned by any filter */
        c->error = 1;
    }
    return rc;
}

static ngx_int_t ngx_http_postpone_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
...
    ngx_connection_t              *c;
    ngx_http_postponed_request_t  *pr;

    //取得当前的链接
    c = r->connection;

    //如果r不等于c->data,前面的分析知道c->data保存的是最新的一个sub request(同级的话,是第一个),因此不等于则说明是需要保存数据的父request。

    if (r != c->data) {
        if (in) {
            //保存数据
            ngx_http_postpone_filter_add(r, in);
            //这里注意不发送任何数据,直接返回OK,会在finalize_request中处理。
            return NGX_OK;
        }
        return NGX_OK;
    }

    //如果r->postponed为空,则说明是最后一个sub request,也就是最新的那个,因此需要将它先发送出去。
    if (r->postponed == NULL) {

        //如果in存在,则发送出去

        if (in || c->buffered) {
            return ngx_http_next_filter(r->main, in);
        }
        return NGX_OK;
    }
...
}

子请求

子请求不存在响应头部的概念。

子请求读事件设置为空,子请求不受前段控制。

来源:freebuf.com 2020-12-02 00:11:37 by: stan1y

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发

请登录后发表评论