nginx upstream使用及源碼解析
來源:程序員人生 發布時間:2014-10-10 08:00:00 閱讀次數:3058次
nginx upstream機制使得nginx可以成為一個反向代理服務器,nginx一方面從下游客戶端接收http請求,處理請求,并根據請求發送tcp報文到上游服務器,根據上游服務器的返回報文,來向下游客戶端發送請求響應報文。
upstream機制也提供了負載分擔的功能,可以將請求負載分擔到集群服務器的某個服務器上面。
2.1upstream的流程介紹
1分析客戶端請求報文,構建發往上游服務器的請求報文。
2調用ngx_http_upstream_init開始與上游服務器建立tcp連接。
3發送在第一步中組建的請求報文。
4接收來自上游服務器的響應頭并進行解析,往下游轉發。
5接收來自上游服務器的相應體,進行轉發。
在這5個階段中,upstream機制允許開發人員自己設定相應的處理方式,來達到自己的目的,這也是開發人員使用upstream的方式。
2.2upstream的使用
開發人員使用upstream機制時,主要就是設置上面五個階段的處理回調函數。
以http反向代理為例:
ngx_http_proxy_handler(ngx_http_request_t *r)
{
:
:
:
//設置http proxy使用到的upstream機制的各種方法
//設置創建請求報文的回調函數
u->create_request = ngx_http_proxy_create_request;
//設置當鏈接失敗時,需要執行的動作
u->reinit_request = ngx_http_proxy_reinit_request;
//設置處理上游服務器的響應頭回調函數
u->process_header = ngx_http_proxy_process_status_line;
//當前無意義
u->abort_request = ngx_http_proxy_abort_request;
//請求結束后會調用該方法
u->finalize_request = ngx_http_proxy_finalize_request;
//設置upstream的buffer標志位,為0時,以下游網速優先,
//不會使用文件緩存響應包體,為1時,有多個buffer,并且
//可以使用文件來緩存響應包體
u->buffering = plcf->upstream.buffering;
//當buffering為1時會使用到該pipe結構,即下游網速優先,需要使用更多的buffer和臨時文件緩存響應
u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t));
if (u->pipe == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
u->pipe->input_filter = ngx_event_pipe_copy_input_filter;
u->accel = 1;
//開始讀取請求包體,讀取結束后,開始調用ngx_http_upstream_init,
//開始upstream的流程
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
:
:
return NGX_DONE;
}
2.3upstream源碼解析(1.0.15版本)
2.3.1構建發往上游服務器的請求,建立與上游服務器的連接
主要函數是ngx_http_upstream_init_request,該函數會調用用戶注冊的請求構建函數去構建發往上游服務器的請求,同時將建立與上游服務器的連接。首先介紹兩個輔助函數:
ngx_http_upstream_rd_check_broken_connection:該函數用來檢查nginx與客戶端之間的鏈路是否可用,
ngx_http_upstream_connect:該函數用來與上游服務器之間建立連接。
static void
ngx_http_upstream_check_broken_connection(ngx_http_request_t *r,
ngx_event_t *ev)
{
int n;
char buf[1];
ngx_err_t err;
ngx_int_t event;
ngx_connection_t *c;
ngx_http_upstream_t *u;
c = r->connection;
u = r->upstream;
//若連接已終止的話,該recv返回值會為0,MSG_PEEK表示會去
//讀取數據,但不會減少接收緩存中的數據,在這里讀取1
//個字節,來判斷讀方向能否正常工作
n = recv(c->fd, buf, 1, MSG_PEEK);
err = ngx_socket_errno;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, err,
"http upstream recv(): %d", n);
//ev->write表明是寫方向觸發的事件,讀方向能讀到數據,
//或者返回碼為NGX_eagain,表明應該沒有問題
if (ev->write && (n >= 0 || err == NGX_EAGAIN)) {
return;
}
if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) {
event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT;
if (ngx_del_event(ev, event, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
//能用該socket讀出數據,說明連接沒有問題
if (n > 0) {
return;
}
//返回值為-1,但錯誤為NGX_EAGAIN,表明recv超時時間到了
if (n == -1) {
if (err == NGX_EAGAIN) {
return;
}
//其他情況表明發生錯誤
ev->error = 1;
} else { //n=0,一般表示連接已經結束
err = 0;
}
//設置事件的標記位,標記已經結束了
ev->eof = 1;
c->error = 1;
if (!u->cacheable && u->peer.connection) {
ngx_log_error(NGX_LOG_INFO, ev->log, err,
"client prematurely closed connection, "
"so upstream connection is closed too");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_CLIENT_CLOSED_REQUEST);
return;
}
ngx_log_error(NGX_LOG_INFO, ev->log, err,
"client prematurely closed connection");
if (u->peer.connection == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_CLIENT_CLOSED_REQUEST);
}
}
2 ngx_http_upstream_connect
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_int_t rc;
ngx_time_t *tp;
ngx_connection_t *c;
r->connection->log->action = "connecting to upstream";
r->connection->single_connection = 0;
//記錄下當前的響應秒數和毫秒數
if (u->state && u->state->response_sec) {
tp = ngx_timeofday();
u->state->response_sec = tp->sec - u->state->response_sec;
u->state->response_msec = tp->msec - u->state->response_msec;
}
u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
//記錄下當前的響應秒數和毫秒數
tp = ngx_timeofday();
u->state->response_sec = tp->sec;
u->state->response_msec = tp->msec;
//開始連接上游服務器
rc = ngx_event_connect_peer(&u->peer);
//printf("@@@@####rc is %d
", (int)rc);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream connect: %i", rc);
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->state->peer = u->peer.name;
//在busy或者declined的情況下,會調用ngx_http_upstream_next,該函數會
//多次嘗試調用connect試圖與上游服務器連接,多次連接失敗后,
//才會調用ngx_http_upstream_finalize_request
if (rc == NGX_BUSY) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
return;
}
if (rc == NGX_DECLINED) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/* rc == NGX_OK || rc == NGX_AGAIN */
c = u->peer.connection;
c->data = r;
//將客戶端與上游服務器的連接的讀寫事件的處理回調設置為
//ngx_http_upstream_handler
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;
//ngx_http_upstream_handler最后會調用u->write_event_handler或者read_event_handler
u->write_event_handler = ngx_http_upstream_send_request_handler;
u->read_event_handler = ngx_http_upstream_process_header;
c->sendfile &= r->connection->sendfile;
u->output.sendfile = c->sendfile;
c->pool = r->pool;
c->log = r->connection->log;
c->read->log = c->log;
c->write->log = c->log;
/* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
u->writer.out = NULL;
u->writer.last = &u->writer.out;
u->writer.connection = c;
u->writer.limit = 0;
if (u->request_sent) {
if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
if (r->request_body
&& r->request_body->buf
&& r->request_body->temp_file
&& r == r->main)
{
/*
* the r->request_body->buf can be reused for one request only,
* the subrequests should allocate their own temporay bufs
*/
u->output.free = ngx_alloc_chain_link(r->pool);
if (u->output.free == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->output.free->buf = r->request_body->buf;
u->output.free->next = NULL;
u->output.allocated = 1;
r->request_body->buf->pos = r->request_body->buf->start;
r->request_body->buf->last = r->request_body->buf->start;
r->request_body->buf->tag = u->output.tag;
}
u->request_sent = 0;
//與上游連接尚未建立起來,加入定時器,返回
//當與上游服務器連接建立成功會調用相關的處理函數
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}
#if (NGX_HTTP_SSL)
if (u->ssl && c->ssl == NULL) {
ngx_http_upstream_ssl_init_connection(r, u, c);
return;
}
#endif
//已經建立連接,向上游服務器發送請求內容
ngx_http_upstream_send_request(r, u);
}
3 ngx_http_upstream_init_request
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
ngx_str_t *host;
ngx_uint_t i;
ngx_resolver_ctx_t *ctx, temp;
ngx_http_cleanup_t *cln;
ngx_http_upstream_t *u;
ngx_http_core_loc_conf_t *clcf;
ngx_http_upstream_srv_conf_t *uscf, **uscfp;
ngx_http_upstream_main_conf_t *umcf;
if (r->aio) {
return;
}
u = r->upstream;
u->store = (u->conf->store || u->conf->store_lengths);
//ignore_client_abort為0標志著需要關注nginx和客戶端的連接是否穩定
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
}
//從代碼來看,request_bufs貌似是在create_request中設置的
if (r->request_body) {
u->request_bufs = r->request_body->bufs;
}
//調用用戶設置的create_request函數
if (u->create_request(r) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
//u->conf->local中保存的是與上游服務建立連接的本地地址
u->peer.local = u->conf->local;
//得到http core模塊在該loc下的配置
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
//設置upstream向下游客戶端轉發數據的各種參數,主要和
//buf相關
u->output.alignment = clcf->directio_alignment;
u->output.pool = r->pool;
u->output.bufs.num = 1;
u->output.bufs.size = clcf->client_body_buffer_size;
//往下游客戶端寫數據的接口
u->output.output_filter = ngx_chain_writer;
u->output.filter_ctx = &u->writer;
u->writer.pool = r->pool;
if (r->upstream_states == NULL) {
r->upstream_states = ngx_array_create(r->pool, 1,
sizeof(ngx_http_upstream_state_t));
if (r->upstream_states == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
} else {
u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
}
//將ngx_http_upstream_cleanup函數加入到request的cleanup鏈表中,
//當request被刪除時,會調用該函數
cln = ngx_http_cleanup_add(r, 0);
if (cln == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
//ngx_http_upstream_cleanup主要釋放resolve數據結構,執行ngx_http_upstream_finalize
cln->handler = ngx_http_upstream_cleanup;
cln->data = r;
u->cleanup = &cln->handler;
//u->resolved中保存了用于與上游服務器建立連接的信息,
//可以由開發人員在代碼中設置,不設置的話,從配置文件中
//去獲取
if (u->resolved == NULL) {
uscf = u->conf->upstream;
} else {
//upstream中直接指定了相關的服務器地址,建立連接就ok了
if (u->resolved->sockaddr) {
if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_http_upstream_connect(r, u);
return;
}
//在這里host應該為一個upstream組的名字
host = &u->resolved->host;
umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
uscfp = umcf->upstreams.elts;
//遍歷系統中的upstream數組,找到匹配的upstream
for (i = 0; i < umcf->upstreams.nelts; i++) {
uscf = uscfp[i];
if (uscf->host.len == host->len
&& ((uscf->port == 0 && u->resolved->no_port)
|| uscf->port == u->resolved->port)
&& ngx_memcmp(uscf->host.data, host->data, host->len) == 0)
{
goto found;
}
}
if (u->resolved->port == 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no port in upstream "%V"", host);
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
temp.name = *host;
//下面這部分需要進行域名解析
ctx = ngx_resolve_start(clcf->resolver, &temp);
if (ctx == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
if (ctx == NGX_NO_RESOLVER) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no resolver defined to resolve %V", host);
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
return;
}
//ngx_http_upstream_resolve_handler是域名解析后的回調函數
ctx->name = *host;
ctx->type = NGX_RESOLVE_A;
ctx->handler = ngx_http_upstream_resolve_handler;
ctx->data = r;
ctx->timeout = clcf->resolver_timeout;
u->resolved->ctx = ctx;
if (ngx_resolve_name(ctx) != NGX_OK) {
u->resolved->ctx = NULL;
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
found:
//peer.init()方法中會根據upstream的算法去選擇一個服務器,來進行發送
//for example:us->peer.init = ngx_http_upstream_init_ip_hash_peer;
if (uscf->peer.init(r, uscf) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
//與上游服務器建立連接
ngx_http_upstream_connect(r, u);
}
2.3.2往上游發送請求
當建立了與上游服務器的連接后,就會向上游服務器發送請求,主要函數是ngx_http_upstream_send_request。
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_int_t rc;
ngx_connection_t *c;
//peer.connection中是nginx與上游服務器建立的connection
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream send request");
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
c->log->action = "sending request to upstream";
//通過ngx_output_chain向上游服務器發送請求報文,request_sent
//用來表示是否已經發送請求頭了,發送了的話,繼續發送
//剩余未發的就OK了,剩余未發送的數據保存在了u->output里面
rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
//設置request_sent標志,表明已經發送過請求
u->request_sent = 1;
if (rc == NGX_ERROR) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
//若寫事件已經被加入到了定時器中,刪除它,為后面的
//添加做準備
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
//NGX_AGAIN表明數據尚未發送完畢,需要將其加入到定時器中
//當發送事件觸發時,會繼續調用該函數。
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->send_timeout);
//主要是設置發送緩存的事件喚醒下限
if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
/* rc == NGX_OK */
if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
if (ngx_tcp_push(c->fd) == NGX_ERROR) {
ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
ngx_tcp_push_n " failed");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
}
//數據發送成功,添加一個讀事件定時器
ngx_add_timer(c->read, u->conf->read_timeout);
#if 1
//寫事件已經發出,判斷讀事件是否ready
if (c->read->ready) {
/* post aio operation */
/*
* TODO comment
* although we can post aio operation just in the end
* of ngx_http_upstream_connect() CHECK IT !!!
* it's better to do here because we postpone header buffer allocation
*/
//讀事件已經ready了,處理返回的報文頭
ngx_http_upstream_process_header(r, u);
return;
}
#endif
//將寫事件處理函數置為dummy的話,表明在讀完相應之前,不允許
//接著寫了
u->write_event_handler = ngx_http_upstream_dummy_handler;
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
2.3.3處理上游服務器返回的回應頭部
往上游服務器發送完請求后,就要等待著處理服務器的回應了,首先會去處理服務器發回的響應頭。處理函數是ngx_http_upstream_process_header.
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ssize_t n;
ngx_int_t rc;
ngx_connection_t *c;
c = u->peer.connection;
c->log->action = "reading response header from upstream";
//讀事件超時
if (c->read->timedout) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
//測試與upstream服務器的連通性
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
//尚未實質分配數據緩沖區
if (u->buffer.start == NULL) {
//分配數據緩沖區
u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
if (u->buffer.start == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
//對緩沖區描述符進行初始化
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
u->buffer.end = u->buffer.start + u->conf->buffer_size;
u->buffer.temporary = 1;
u->buffer.tag = u->output.tag;
//為收到的請求頭們創建ngx_list結構,用來存貯解析到的
//請求頭的名值對
if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
sizeof(ngx_table_elt_t))
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
//準備接受數據吧!!!狂奔的小怪獸
for ( ;; ) {
n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
//數據尚未接收完畢
if (n == NGX_AGAIN) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
//返回值為0,標志upstream服務器關閉了連接
if (n == 0) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream prematurely closed connection");
}
if (n == NGX_ERROR || n == 0) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
u->buffer.last += n;
//處理接收到響應頭數據
rc = u->process_header(r);
//響應頭尚未接收完畢
if (rc == NGX_AGAIN) {
//buffer已經滿了,無法容納更多的響應頭部
if (u->buffer.last == u->buffer.end) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream sent too big header");
ngx_http_upstream_next(r, u,
NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
continue;
}
break;
}
//解析到了無效錯誤頭,真真苦逼啊
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
/* rc == NGX_OK */
//頭部處理完畢,頭部返回碼大于300
if (u->headers_in.status_n > NGX_HTTP_SPECIAL_RESPONSE) {
if (r->subrequest_in_memory) {
u->buffer.last = u->buffer.pos;
}
if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
return;
}
//處理錯誤碼大于300的錯誤情況,比如404錯誤,
//頁面沒找到
if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
return;
}
}
//對u->headers_in中的頭部進行處理過濾,把u->headers_in中的
//各個頭部信息挪到r->headers_out里面,以便于發送
if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
return;
}
//不是子請求,需要轉發響應體
if (!r->subrequest_in_memory) {
//調用該函數,先轉發響應頭,再轉發響應體
ngx_http_upstream_send_response(r, u);
return;
}
/* subrequest content in memory */
//以下為子請求的處理流程,當子請求的input_filter未設置時,
//其默認的input_filter方法為ngx_http_upstream_non_buffered_filter,
//即不轉發收到的響應
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
//buffer.last和buffer.pos之間是多余的包體
n = u->buffer.last - u->buffer.pos;
//下面對這段頭部以外的包體進行處理
if (n) {
u->buffer.last -= n;
u->state->response_length += n;
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
//表明包體已經全部處理完畢,可以結束請求類
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
}
//設置接收事件的處理函數
u->read_event_handler = ngx_http_upstream_process_body_in_memory;
//在該函數中調用u->input_filter對后續包體進行處理,
//該函數是針對子請求來說的,不轉發包體,在內存中
//對包體進行處理
ngx_http_upstream_process_body_in_memory(r, u);
2.3.4處理響應包體
處理完返回的響應頭就要處理響應包體了,處理響應包體比較復雜,在子請求的情況下,不用轉發響應包體,處理一下就可以了,在upstream模式下,需要轉發接收到的請求,這時有下游網速優先和上游網速優先兩種,下游網速優先,假設下游網速比上游快,因此分配了一塊固定大小的buffer緩沖區去接收數據,同時進行轉發,上游網速優先,假設上游網速比下游快,因此需要使用多塊buffer緩沖區去緩存數據,同時必要時,使用臨時文件來緩存接收到的數據。
ngx_http_upstream_process_body_in_memory:該函數用來處理子請求的情形,不轉發響應包體。
ngx_http_upstream_send_response:該函數用來處理轉發響應包體的情形,該函數會轉發響應頭和響應體,轉發響應體時同時考慮了上游網速優先和下游網速優先兩種情況。
1ngx_http_upstream_process_body_in_memory
static void
ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
size_t size;
ssize_t n;
ngx_buf_t *b;
ngx_event_t *rev;
ngx_connection_t *c;
//c是nginx與upstream上游服務器之間建立的連接
c = u->peer.connection;
rev = c->read;
//讀事件超時,結束upstream
if (rev->timedout) {
return;
}
//u->buffer用來保存讀取的數據
b = &u->buffer;
for ( ;; ) {
size = b->end - b->last;
if (size == 0) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
//讀取相關數據到u->buffer中
n = c->recv(c, b->last, size);
//沒有數據可讀了,等待下一次處理
if (n == NGX_AGAIN) {
break;
}
//對端已經結束了該連接或者發生了錯誤
if (n == 0 || n == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, n);
return;
}
//response_length記錄了已接收相應的長度
u->state->response_length += n;
//對接收到的數據進行處理,一般子請求會重置該方法,
//未設置的話,則會默認為ngx_http_upstream_non_buffered_filter,該
//方法僅僅是設置下該buffer以便繼續接收數據
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
//接收方向未ready退出
if (!rev->ready) {
break;
}
}
//設置讀事件
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
if (rev->active) {
ngx_add_timer(rev, u->conf->read_timeout);
} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}
2 ngx_http_upstream_send_response
該函數會往客戶端發送響應頭及轉發響應體,根據不同的設置來調用不同的包體轉發。
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
int tcp_nodelay;
ssize_t n;
ngx_int_t rc;
ngx_event_pipe_t *p;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
//發送回應頭部,回應頭部存放在request的headers_in里面,
//在這里,有可能頭部沒有發送完畢,沒關系,未發送
//完的數據在request的out鏈表里面放著呢,接著處理下面的
//響應包體即可
rc = ngx_http_send_header(r);
if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
//c是客戶端與nginx之間建立的連接
c = r->connection;
if (r->header_only) {
if (u->cacheable || u->store) {
if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) {
ngx_connection_error(c, ngx_socket_errno,
ngx_shutdown_socket_n " failed");
}
r->read_event_handler = ngx_http_request_empty_handler;
r->write_event_handler = ngx_http_request_empty_handler;
c->error = 1;
} else {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
}
//將header_sent置位,表示響應頭部已經發送了
u->header_sent = 1;
//請求中帶有包體,且包體被保存在了臨時文件里面,
//現在這些臨時文件沒有用了,可以清理掉了,OK,
//畢竟,服務器的回應都來了,應該沒問題了
if (r->request_body && r->request_body->temp_file) {
ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
}
//獲得http core在該loc下的配置
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
//u->buffering為0表示下游網速優先,不需要開辟更多的緩存區
//來存放相關回應報文
if (!u->buffering) {
//未設置input_filter的話,設置默認的處理函數,input_filter是對
//在buffer中接收到的數據進行相應處理,感覺主要有兩個功能
//一是把相關buffer掛到out鏈表,一是對內容進行過濾
if (u->input_filter == NULL) {
//啥都不做
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
//該函數試圖在buffer中緩存所有的數據,會操作設置ngx_buf中的
//各個字段
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
//設置upstream讀事件的處理回調函數
u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
//設置request寫事件的處理回調函數
r->write_event_handler =
ngx_http_upstream_process_non_buffered_downstream;
r->limit_rate = 0;
//調用input_filter之前進行初始化
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
tcp_nodelay = 1;
if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
(const void *) &tcp_nodelay, sizeof(int)) == -1)
{
ngx_connection_error(c, ngx_socket_errno,
"setsockopt(TCP_NODELAY) failed");
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
c->tcp_nodelay = NGX_TCP_NODELAY_SET;
}
//buffer.last與buffer.pos之間是剩余未被處理的數據
n = u->buffer.last - u->buffer.pos;
//n>0,說明buffer中有未被轉發的響應包體
if (n) {
//在這里設置該last是為了在input_filter中處理時,對其
//進行重置
u->buffer.last = u->buffer.pos;
//將響應包體的長度加上n
u->state->response_length += n;
//在input_filter中處理此次接收到的數據
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
//在該函數中,開始向下游客戶端發送響應包體,
//發送完數據還會從上游接收包體
ngx_http_upstream_process_non_buffered_downstream(r);
} else {
//該buffer中目前僅有頭部,沒有回應包體,那下次
//從頭部接收就可以了
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
//有數據可以進行處理,處理上游數據,在該函數中
//收完上游包體也會往下游發送相應。
if (u->peer.connection->read->ready) {
ngx_http_upstream_process_non_buffered_upstream(r, u);
}
}
return;
}
/* TODO: preallocate event_pipe bufs, look "Content-Length" */
//下面這部分是buffer為1的情況,該情況允許nginx使用更多的buffer
//去緩存包體數據,或者使用文件來進行緩存
p = u->pipe;
//對pipe結構進行初始化,該結構專用于上游網速優先的情況
//設置向下游發送響應的調用函數
p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
p->output_ctx = r;
p->tag = u->output.tag;
//設置可以使用的緩沖區的個數
p->bufs = u->conf->bufs;
//設置busy緩沖區中待發送的響應長度觸發值
p->busy_size = u->conf->busy_buffers_size;
p->upstream = u->peer.connection;
p->downstream = c;
p->pool = r->pool;
p->log = c->log;
p->cache
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
------分隔線----------------------------
------分隔線----------------------------