How exactly does an HAProxy frontend listen, and how does it accept incoming client connections? Let's walk through this with haproxy 1.4.19. To know which ports a frontend listens on and with which settings, HAProxy must first parse its configuration file. The parsing machinery itself won't be analyzed in full detail here; let's start with the cfg_parse_listen() function.
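Before diving into the parser, a hypothetical minimal configuration of the kind it handles may help; the frontend line below uses the address-list / port-range syntax discussed in the code (the names and addresses are made up for illustration):

<pre name="code" class="plain"># addresses separated by ',', port ranges written with '-'
frontend test *:123-245,192.168.0.10:598
    mode http
    default_backend servers

backend servers
    mode http
    server srv1 192.168.0.20:80</pre>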
<pre name="code" class="cpp">int cfg_parse_listen(const char *file, int linenum, char **args, int kwm)
{
    static struct proxy *curproxy = NULL;
    struct server *newsrv = NULL;
    const char *err;
    int rc;
    unsigned val;
    int err_code = 0;
    struct acl_cond *cond = NULL;

    if (!strcmp(args[0], "listen"))
        rc = PR_CAP_LISTEN;
    else if (!strcmp(args[0], "frontend"))
        rc = PR_CAP_FE | PR_CAP_RS;
    else if (!strcmp(args[0], "backend"))
        rc = PR_CAP_BE | PR_CAP_RS;
    else if (!strcmp(args[0], "ruleset"))
        rc = PR_CAP_RS;
    else
        rc = PR_CAP_NONE;

    if (rc != PR_CAP_NONE) {  /* new proxy */
        /* "listen", "frontend", "backend" and "ruleset" must be followed by
         * at least one more argument on the same line: the proxy name. The
         * name must not be purely numeric and may only contain letters,
         * digits and '_', '-', '.', ':'. */
        ……
        if ((curproxy = (struct proxy *)calloc(1, sizeof(struct proxy))) == NULL) {
            Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
            err_code |= ERR_ALERT | ERR_ABORT;
            goto out;
        }

        /* insert at the head of the list; the global 'proxy' pointer
         * always points to the first element */
        init_new_proxy(curproxy);
        curproxy->next = proxy;
        proxy = curproxy;
        curproxy->conf.file = file;
        curproxy->conf.line = linenum;
        curproxy->last_change = now.tv_sec;
        curproxy->id = strdup(args[1]);
        curproxy->cap = rc;

        /* parse the listener address if any. A frontend may take a third
         * argument holding the listening address(es) and port(s): several
         * addresses separated by ',', and ports given either as a single
         * value or as a '-' separated range, e.g.
         * "frontend test *:123-245,xx.xx.xx.xx:598" or
         * "frontend test *,xx.xx.xx.xx:598". */
        if ((curproxy->cap & PR_CAP_FE) && *args[2]) {
            struct listener *new, *last = curproxy->listen;

            /* str2listener() parses the address and port(s). The address
             * part may be '*', an IPv4 address (xx.xx.xx.xx) or an IPv6
             * address (XXXX:XXXX:XXXX:XXXX); for IPv4/host names it also
             * calls gethostbyname() to validate the address. One listener
             * is created per port in the range, and proxy->listen then
             * points to the first element of that listener list. */
            if (!str2listener(args[2], curproxy)) {
                err_code |= ERR_FATAL;
                goto out;
            }

            new = curproxy->listen;
            while (new != last) {
                new->conf.file = file;
                new->conf.line = linenum;
                new = new->next;
                global.maxsock++;
            }
        }

        /* set default values: initialize the new proxy from the defaults section */
        memcpy(&curproxy->defsrv, &defproxy.defsrv, sizeof(curproxy->defsrv));
        curproxy->defsrv.id = "default-server";

        curproxy->state = defproxy.state;
        curproxy->options = defproxy.options;
        curproxy->options2 = defproxy.options2;
        curproxy->no_options = defproxy.no_options;
        curproxy->no_options2 = defproxy.no_options2;
        curproxy->bind_proc = defproxy.bind_proc;
        curproxy->lbprm.algo = defproxy.lbprm.algo;
        curproxy->except_net = defproxy.except_net;
        curproxy->except_mask = defproxy.except_mask;
        curproxy->except_to = defproxy.except_to;
        curproxy->except_mask_to = defproxy.except_mask_to;

        if (defproxy.fwdfor_hdr_len) {
            curproxy->fwdfor_hdr_len = defproxy.fwdfor_hdr_len;
            curproxy->fwdfor_hdr_name = strdup(defproxy.fwdfor_hdr_name);
        }

        if (defproxy.orgto_hdr_len) {
            curproxy->orgto_hdr_len = defproxy.orgto_hdr_len;
            curproxy->orgto_hdr_name = strdup(defproxy.orgto_hdr_name);
        }

        if (defproxy.server_id_hdr_len) {
            curproxy->server_id_hdr_len = defproxy.server_id_hdr_len;
            curproxy->server_id_hdr_name = strdup(defproxy.server_id_hdr_name);
        }

        if (curproxy->cap & PR_CAP_FE) {
            curproxy->maxconn = defproxy.maxconn;
            curproxy->backlog = defproxy.backlog;
            curproxy->fe_sps_lim = defproxy.fe_sps_lim;

            /* initialize error relocations */
            for (rc = 0; rc < HTTP_ERR_SIZE; rc++)
                chunk_dup(&curproxy->errmsg[rc], &defproxy.errmsg[rc]);

            curproxy->to_log = defproxy.to_log & ~LW_COOKIE & ~LW_REQHDR & ~LW_RSPHDR;
        }

        if (curproxy->cap & PR_CAP_BE) {
            curproxy->fullconn = defproxy.fullconn;
            curproxy->conn_retries = defproxy.conn_retries;

            if (defproxy.check_req) {
                curproxy->check_req = calloc(1, defproxy.check_len);
                memcpy(curproxy->check_req, defproxy.check_req, defproxy.check_len);
            }
            curproxy->check_len = defproxy.check_len;

            if (defproxy.expect_str) {
                curproxy->expect_str = strdup(defproxy.expect_str);
                if (defproxy.expect_regex) {
                    /* note: this regex is known to be valid */
                    curproxy->expect_regex = calloc(1, sizeof(regex_t));
                    regcomp(curproxy->expect_regex, defproxy.expect_str, REG_EXTENDED);
                }
            }

            if (defproxy.cookie_name)
                curproxy->cookie_name = strdup(defproxy.cookie_name);
            curproxy->cookie_len = defproxy.cookie_len;
            if (defproxy.cookie_domain)
                curproxy->cookie_domain = strdup(defproxy.cookie_domain);

            if (defproxy.cookie_maxidle)
                curproxy->cookie_maxidle = defproxy.cookie_maxidle;

            if (defproxy.cookie_maxlife)
                curproxy->cookie_maxlife = defproxy.cookie_maxlife;

            if (defproxy.rdp_cookie_name)
                curproxy->rdp_cookie_name = strdup(defproxy.rdp_cookie_name);
            curproxy->rdp_cookie_len = defproxy.rdp_cookie_len;

            if (defproxy.url_param_name)
                curproxy->url_param_name = strdup(defproxy.url_param_name);
            curproxy->url_param_len = defproxy.url_param_len;

            if (defproxy.hh_name)
                curproxy->hh_name = strdup(defproxy.hh_name);
            curproxy->hh_len = defproxy.hh_len;
            curproxy->hh_match_domain = defproxy.hh_match_domain;

            if (defproxy.iface_name)
                curproxy->iface_name = strdup(defproxy.iface_name);
            curproxy->iface_len = defproxy.iface_len;
        }

        if (curproxy->cap & PR_CAP_FE) {
            if (defproxy.capture_name)
                curproxy->capture_name = strdup(defproxy.capture_name);
            curproxy->capture_namelen = defproxy.capture_namelen;
            curproxy->capture_len = defproxy.capture_len;
        }

        if (curproxy->cap & PR_CAP_FE) {
            curproxy->timeout.client = defproxy.timeout.client;
            curproxy->timeout.tarpit = defproxy.timeout.tarpit;
            curproxy->timeout.httpreq = defproxy.timeout.httpreq;
            curproxy->timeout.httpka = defproxy.timeout.httpka;
            curproxy->uri_auth = defproxy.uri_auth;
            curproxy->mon_net = defproxy.mon_net;
            curproxy->mon_mask = defproxy.mon_mask;
            if (defproxy.monitor_uri)
                curproxy->monitor_uri = strdup(defproxy.monitor_uri);
            curproxy->monitor_uri_len = defproxy.monitor_uri_len;
            if (defproxy.defbe.name)
                curproxy->defbe.name = strdup(defproxy.defbe.name);
        }

        if (curproxy->cap & PR_CAP_BE) {
            curproxy->timeout.connect = defproxy.timeout.connect;
            curproxy->timeout.server = defproxy.timeout.server;
            curproxy->timeout.check = defproxy.timeout.check;
            curproxy->timeout.queue = defproxy.timeout.queue;
            curproxy->timeout.tarpit = defproxy.timeout.tarpit;
            curproxy->timeout.httpreq = defproxy.timeout.httpreq;
            curproxy->timeout.httpka = defproxy.timeout.httpka;
            curproxy->source_addr = defproxy.source_addr;
        }

        curproxy->mode = defproxy.mode;
        curproxy->logfac1 = defproxy.logfac1;
        curproxy->logsrv1 = defproxy.logsrv1;
        curproxy->loglev1 = defproxy.loglev1;
        curproxy->minlvl1 = defproxy.minlvl1;
        curproxy->logfac2 = defproxy.logfac2;
        curproxy->logsrv2 = defproxy.logsrv2;
        curproxy->loglev2 = defproxy.loglev2;
        curproxy->minlvl2 = defproxy.minlvl2;
        curproxy->grace = defproxy.grace;
        curproxy->conf.used_listener_id = EB_ROOT;
        curproxy->conf.used_server_id = EB_ROOT;

        goto out;
    }
    else if (!strcmp(args[0], "defaults")) {
        /* use this one to assign default values: parse the defaults section */
        /* some variables may have already been initialized earlier */
        /* FIXME-20070101: we should do this too at the end of the
         * config parsing to free all default values.
         */
        free(defproxy.check_req);
        free(defproxy.cookie_name);
        free(defproxy.rdp_cookie_name);
        free(defproxy.cookie_domain);
        free(defproxy.url_param_name);
        free(defproxy.hh_name);
        free(defproxy.capture_name);
        free(defproxy.monitor_uri);
        free(defproxy.defbe.name);
        free(defproxy.iface_name);
        free(defproxy.fwdfor_hdr_name);
        defproxy.fwdfor_hdr_len = 0;
        free(defproxy.orgto_hdr_name);
        defproxy.orgto_hdr_len = 0;
        free(defproxy.server_id_hdr_name);
        defproxy.server_id_hdr_len = 0;
        free(defproxy.expect_str);
        if (defproxy.expect_regex)
            regfree(defproxy.expect_regex);

        for (rc = 0; rc < HTTP_ERR_SIZE; rc++)
            chunk_destroy(&defproxy.errmsg[rc]);

        /* we cannot free uri_auth because it might already be used */
        init_default_instance();
        curproxy = &defproxy;
        defproxy.cap = PR_CAP_LISTEN; /* all caps for now */
        goto out;
    }
    else if (curproxy == NULL) {
        Alert("parsing [%s:%d] : 'listen' or 'defaults' expected.\n", file, linenum);
        err_code |= ERR_ALERT | ERR_FATAL;
        goto out;
    }

    /* Now let's parse the proxy-specific keywords */
    if (!strcmp(args[0], "bind")) {
        /* new listen addresses: the 'bind' keyword, used by sections that
         * listen, such as frontend and stats */
        struct listener *new_listen, *last_listen;
        int cur_arg;

        ……
        /* same parsing as above */
        last_listen = curproxy->listen;
        if (!str2listener(args[1], curproxy)) {
            err_code |= ERR_ALERT | ERR_FATAL;
            goto out;
        }

        new_listen = curproxy->listen;
        while (new_listen != last_listen) {
            new_listen->conf.file = file;
            new_listen->conf.line = linenum;
            new_listen = new_listen->next;
            global.maxsock++;
        }

        cur_arg = 2;
        /* from the third argument on, socket options may follow:
         * "interface", "mss", "defer-accept", "transparent", "name", "id".
         * "defer-accept" wakes the listening socket up for accept() only
         * once data has actually arrived; "transparent" makes haproxy act
         * as a transparent proxy. */
        while (*(args[cur_arg])) {
            if (!strcmp(args[cur_arg], "interface")) { /* specifically bind to this interface */
#ifdef SO_BINDTODEVICE
                struct listener *l;

                if (!*args[cur_arg + 1]) {
                    Alert("parsing [%s:%d] : '%s' : missing interface name.\n",
                          file, linenum, args[0]);
                    err_code |= ERR_ALERT | ERR_FATAL;
                    goto out;
                }

                for (l = curproxy->listen; l != last_listen; l = l->next)
                    l->interface = strdup(args[cur_arg + 1]);

                global.last_checks |= LSTCHK_NETADM;

                cur_arg += 2;
                continue;
#else
                Alert("parsing [%s:%d] : '%s' : '%s' option not implemented.\n",
                      file, linenum, args[0], args[cur_arg]);
                err_code |= ERR_ALERT | ERR_FATAL;
                goto out;
#endif
            }
            if (!strcmp(args[cur_arg], "mss")) { /* set MSS of listening socket */
#ifdef TCP_MAXSEG
                struct listener *l;
                int mss;

                if (!*args[cur_arg + 1]) {
                    Alert("parsing [%s:%d] : '%s' : missing MSS value.\n",
                          file, linenum, args[0]);
                    err_code |= ERR_ALERT | ERR_FATAL;
                    goto out;
                }

                mss = str2uic(args[cur_arg + 1]);
                if (mss < 1 || mss > 65535) {
                    Alert("parsing [%s:%d]: %s expects an MSS value between 1 and 65535.\n",
                          file, linenum, args[0]);
                    err_code |= ERR_ALERT | ERR_FATAL;
                    goto out;
                }

                for (l = curproxy->listen; l != last_listen; l = l->next)
                    l->maxseg = mss;

                cur_arg += 2;
                continue;
#else
                Alert("parsing [%s:%d] : '%s' : '%s' option not implemented.\n",
                      file, linenum, args[0], args[cur_arg]);
                err_code |= ERR_ALERT | ERR_FATAL;
                goto out;
#endif
            }
            if (!strcmp(args[cur_arg], "defer-accept")) { /* wait for some data for 1 second max before doing accept */
#ifdef TCP_DEFER_ACCEPT
                struct listener *l;

                for (l = curproxy->listen; l != last_listen; l = l->next)
                    l->options |= LI_O_DEF_ACCEPT;

                cur_arg ++;
                continue;
#else
                Alert("parsing [%s:%d] : '%s' : '%s' option not implemented.\n",
                      file, linenum, args[0], args[cur_arg]);
                err_code |= ERR_ALERT | ERR_FATAL;
                goto out;
#endif
            }
            if (!strcmp(args[cur_arg], "transparent")) { /* transparently bind to these addresses */
#ifdef CONFIG_HAP_LINUX_TPROXY
                struct listener *l;

                for (l = curproxy->listen; l != last_listen; l = l->next)
                    l->options |= LI_O_FOREIGN;

                cur_arg ++;
                continue;
#else
                Alert("parsing [%s:%d] : '%s' : '%s' option not implemented.\n",
                      file, linenum, args[0], args[cur_arg]);
                err_code |= ERR_ALERT | ERR_FATAL;
                goto out;
#endif
            }
            if (!strcmp(args[cur_arg], "name")) {
                struct listener *l;

                for (l = curproxy->listen; l != last_listen; l = l->next)
                    l->name = strdup(args[cur_arg + 1]);

                cur_arg += 2;
                continue;
            }
            if (!strcmp(args[cur_arg], "id")) {
                struct eb32_node *node;
                struct listener *l;

                if (curproxy->listen->next != last_listen) {
                    Alert("parsing [%s:%d]: '%s' can be only used with a single socket.\n",
                          file, linenum, args[cur_arg]);
                    err_code |= ERR_ALERT | ERR_FATAL;
                    goto out;
                }

                if (!*args[cur_arg + 1]) {
                    Alert("parsing [%s:%d]: '%s' expects an integer argument.\n",
                          file, linenum, args[cur_arg]);
                    err_code |= ERR_ALERT | ERR_FATAL;
                    goto out;
                }

                curproxy->listen->luid = atol(args[cur_arg + 1]);
                curproxy->listen->conf.id.key = curproxy->listen->luid;

                if (curproxy->listen->luid <= 0) {
                    Alert("parsing [%s:%d]: custom id has to be > 0\n",
                          file, linenum);
                    err_code |= ERR_ALERT | ERR_FATAL;
                    goto out;
                }

                node = eb32_lookup(&curproxy->conf.used_listener_id, curproxy->listen->luid);
                if (node) {
                    l = container_of(node, struct listener, conf.id);
                    Alert("parsing [%s:%d]: custom id %d for socket '%s' already used at %s:%d.\n",
                          file, linenum, l->luid, args[1], l->conf.file, l->conf.line);
                    err_code |= ERR_ALERT | ERR_FATAL;
                    goto out;
                }
                eb32_insert(&curproxy->conf.used_listener_id, &curproxy->listen->conf.id);

                cur_arg += 2;
                continue;
            }
            Alert("parsing [%s:%d] : '%s' only supports the 'transparent', 'defer-accept', 'name', 'id', 'mss' and 'interface' options.\n",
                  file, linenum, args[0]);
            err_code |= ERR_ALERT | ERR_FATAL;
            goto out;
        }
        goto out;
    }
    else if (!strcmp(args[0], "monitor-net")) { /* set the range of IPs to ignore */
        if (!*args[1] || !str2net(args[1], &curproxy->mon_net, &curproxy->mon_mask)) {
            Alert("parsing [%s:%d] : '%s' expects address[/mask].\n",
                  file, linenum, args[0]);
            err_code |= ERR_ALERT | ERR_FATAL;
            goto out;
        }
        if (warnifnotcap(curproxy, PR_CAP_FE, file, linenum, args[0], NULL))
            err_code |= ERR_WARN;

        /* flush useless bits */
        curproxy->mon_net.s_addr &= curproxy->mon_mask.s_addr;
        goto out;
    }
    else if (!strcmp(args[0], "monitor-uri")) { /* set the URI to intercept */
        if (warnifnotcap(curproxy, PR_CAP_FE, file, linenum, args[0], NULL))
            err_code |= ERR_WARN;

        if (!*args[1]) {
            Alert("parsing [%s:%d] : '%s' expects an URI.\n",
                  file, linenum, args[0]);
            err_code |= ERR_ALERT | ERR_FATAL;
            goto out;
        }

        free(curproxy->monitor_uri);
        curproxy->monitor_uri_len = strlen(args[1]);
        curproxy->monitor_uri = (char *)calloc(1, curproxy->monitor_uri_len + 1);
        memcpy(curproxy->monitor_uri, args[1], curproxy->monitor_uri_len);
        curproxy->monitor_uri[curproxy->monitor_uri_len] = '\0';
        goto out;
    }
    else if (!strcmp(args[0], "mode")) { /* sets the proxy mode */
        if (!strcmp(args[1], "http")) curproxy->mode = PR_MODE_HTTP;
        else if (!strcmp(args[1], "tcp")) curproxy->mode = PR_MODE_TCP;
        else if (!strcmp(args[1], "health")) curproxy->mode = PR_MODE_HEALTH;
        else {
            Alert("parsing [%s:%d] : unknown proxy mode '%s'.\n", file, linenum, args[1]);
            err_code |= ERR_ALERT | ERR_FATAL;
            goto out;
        }
    }
    else if (!strcmp(args[0], "id")) { /* set this proxy's custom id */
        struct eb32_node *node;

        if (curproxy == &defproxy) {
            Alert("parsing [%s:%d]: '%s' not allowed in 'defaults' section.\n",
                  file, linenum, args[0]);
            err_code |= ERR_ALERT | ERR_FATAL;
            goto out;
        }

        if (!*args[1]) {
            Alert("parsing [%s:%d]: '%s' expects an integer argument.\n",
                  file, linenum, args[0]);
            err_code |= ERR_ALERT | ERR_FATAL;
            goto out;
        }

        curproxy->uuid = atol(args[1]);
        curproxy->conf.id.key = curproxy->uuid;

        if (curproxy->uuid <= 0) {
            Alert("parsing [%s:%d]: custom id has to be > 0.\n",
                  file, linenum);
            err_code |= ERR_ALERT | ERR_FATAL;
            goto out;
        }

        node = eb32_lookup(&used_proxy_id, curproxy->uuid);
        if (node) {
            struct proxy *target = container_of(node, struct proxy, conf.id);
            Alert("parsing [%s:%d]: %s %s reuses same custom id as %s %s (declared at %s:%d).\n",
                  file, linenum, proxy_type_str(curproxy), curproxy->id,
                  proxy_type_str(target), target->id, target->conf.file, target->conf.line);
            err_code |= ERR_ALERT | ERR_FATAL;
            goto out;
        }
        eb32_insert(&used_proxy_id, &curproxy->conf.id);
    }
    ……
}</pre>
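To make the listener-list construction above more concrete, here is a minimal sketch, using simplified hypothetical types rather than HAProxy's real structures, of how a port range expands into one listener per port, each pushed onto the head of the proxy's listen list the way str2listener() does:

<pre name="code" class="cpp">#include <stdlib.h>

/* Simplified stand-in for struct listener (hypothetical). */
struct mini_listener {
    int port;
    struct mini_listener *next;
};

/* For each port in [port_low, port_high], allocate a listener and push
 * it onto the head of the list, like str2listener() growing proxy->listen.
 * Returns 0 on allocation failure, 1 on success. */
static int expand_port_range(struct mini_listener **head, int port_low, int port_high)
{
    int port;

    for (port = port_low; port <= port_high; port++) {
        struct mini_listener *l = calloc(1, sizeof(*l));

        if (!l)
            return 0;        /* out of memory */
        l->port = port;
        l->next = *head;     /* head insertion, like curproxy->listen above */
        *head = l;
    }
    return 1;
}</pre>

Note that head insertion leaves the listeners in reverse order relative to the configuration, which is exactly why check_config_validity(), shown next, starts by reversing the listen list.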
<p>The code above covers the listen-related parts of configuration parsing, so the proxy and listener structures now exist; however, several fields are still uninitialized, and some of them are assigned in the check_config_validity() function, which we examine next:</p>
<pre name="code" class="cpp">int check_config_validity()
{
    ……
    listener = NULL;
    /* reverse the listen list (parsing inserted each listener at the head,
     * so this restores configuration order) */
    while (curproxy->listen) {
        struct listener *next;

        next = curproxy->listen->next;
        curproxy->listen->next = listener;
        listener = curproxy->listen;

        if (!next)
            break;

        curproxy->listen = next;
    }

    /* adjust this proxy's listeners */
    next_id = 1;
    listener = curproxy->listen;
    while (listener) {
        if (!listener->luid) {
            /* listener ID not set, use automatic numbering with first
             * spare entry starting with next_luid.
             */
            next_id = get_next_id(&curproxy->conf.used_listener_id, next_id);
            listener->conf.id.key = listener->luid = next_id;
            eb32_insert(&curproxy->conf.used_listener_id, &listener->conf.id);
        }
        next_id++;

        /* enable separate counters */
        if (curproxy->options2 & PR_O2_SOCKSTAT) {
            listener->counters = (struct licounters *)calloc(1, sizeof(struct licounters));
            if (!listener->name) {
                sprintf(trash, "sock-%d", listener->luid);
                listener->name = strdup(trash);
            }
        }

        if (curproxy->options & PR_O_TCP_NOLING)
            listener->options |= LI_O_NOLINGER;
        listener->maxconn = curproxy->maxconn;
        listener->backlog = curproxy->backlog;
        listener->timeout = &curproxy->timeout.client;
        listener->accept = event_accept;     /* the proxy's accept callback: invoked whenever a new connection arrives on this listening socket */
        listener->private = curproxy;
        listener->handler = process_session; /* data processing callback */
        listener->analysers |= curproxy->fe_req_ana;

        /* smart accept mode is automatic in HTTP mode */
        if ((curproxy->options2 & PR_O2_SMARTACC) ||
            (curproxy->mode == PR_MODE_HTTP &&
             !(curproxy->no_options2 & PR_O2_SMARTACC)))
            listener->options |= LI_O_NOQUICKACK;

        /* We want the use_backend and default_backend rules to apply */
        listener = listener->next;
    }
}</pre>
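The automatic numbering above relies on get_next_id() to find the first unused ID at or after a starting point, walking the eb32 tree curproxy->conf.used_listener_id. Below is a rough sketch of the idea, with a plain sorted array of used IDs standing in for the tree (a hypothetical helper, not the real implementation):

<pre name="code" class="cpp">/* Return the first ID >= start that is absent from <used>, an array of
 * <n> distinct IDs sorted in ascending order. get_next_id() does the
 * same over an eb32 tree, which keeps lookups logarithmic. */
static int next_free_id(const int *used, int n, int start)
{
    int i;

    for (i = 0; i < n; i++) {
        if (used[i] < start)
            continue;       /* below the range of interest */
        if (used[i] == start)
            start++;        /* taken: try the next candidate */
        else
            break;          /* gap found before used[i] */
    }
    return start;
}</pre>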
With the callbacks, the connection limits, the timeouts and the listen backlog now in place, everything is ready: the sockets can be bound and connections accepted. Listening happens in start_proxies(), which is called from main():
<pre name="code" class="cpp">/*
 * This function creates all proxy sockets. It should be done very early,
 * typically before privileges are dropped. The sockets will be registered
 * but not added to any fd_set, in order not to loose them across the fork().
 * The proxies also start in IDLE state, meaning that it will be
 * maintain_proxies that will finally complete their loading.
 *
 * Its return value is composed from ERR_NONE, ERR_RETRYABLE and ERR_FATAL.
 * Retryable errors will only be printed if <verbose> is not zero.
 */
int start_proxies(int verbose)
{
    struct proxy *curproxy;
    struct listener *listener;
    int lerr, err = ERR_NONE;
    int pxerr;
    char msg[100];

    for (curproxy = proxy; curproxy != NULL; curproxy = curproxy->next) {
        if (curproxy->state != PR_STNEW)
            continue; /* already initialized */

        pxerr = 0;
        for (listener = curproxy->listen; listener != NULL; listener = listener->next) {
            if (listener->state != LI_ASSIGNED)
                continue; /* already started */

            /* creates the socket, listens on it and registers it with the
             * I/O event engine, so that event_accept() gets called as soon
             * as a new connection comes in */
            lerr = tcp_bind_listener(listener, msg, sizeof(msg));

            /* errors are reported if <verbose> is set or if they are fatal */
            if (verbose || (lerr & (ERR_FATAL | ERR_ABORT))) {
                if (lerr & ERR_ALERT)
                    Alert("Starting %s %s: %s\n",
                          proxy_type_str(curproxy), curproxy->id, msg);
                else if (lerr & ERR_WARN)
                    Warning("Starting %s %s: %s\n",
                            proxy_type_str(curproxy), curproxy->id, msg);
            }

            err |= lerr;
            if (lerr & (ERR_ABORT | ERR_FATAL)) {
                pxerr |= 1;
                break;
            }
            else if (lerr & ERR_CODE) {
                pxerr |= 1;
                continue;
            }
        }

        if (!pxerr) {
            curproxy->state = PR_STIDLE;
            send_log(curproxy, LOG_NOTICE, "Proxy %s started.\n", curproxy->id);
        }

        if (err & ERR_ABORT)
            break;
    }

    return err;
}</pre>
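The two steps around here, binding/listening and then waiting for read events on the listening fd, can be condensed into a short self-contained sketch. This is not HAProxy's code: tcp_bind_listener() boils down to the classic socket()/bind()/listen() sequence (plus non-blocking mode, MSS, transparent binding, defer-accept, error reporting), and run_poll_loop() uses pluggable pollers rather than plain poll(2), but the shape is the same, with event_accept() playing the role of the accept callback:

<pre name="code" class="cpp">#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

typedef int (*accept_cb)(int listen_fd);

/* Create, bind and listen on a TCP socket; returns the fd or -1 on error. */
static int bind_and_listen(const struct sockaddr_in *addr, int backlog)
{
    int one = 1;
    int fd = socket(AF_INET, SOCK_STREAM, 0);

    if (fd < 0)
        return -1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    if (bind(fd, (const struct sockaddr *)addr, sizeof(*addr)) < 0 ||
        listen(fd, backlog) < 0) {
        close(fd);
        return -1;
    }
    return fd; /* ready to be watched by the event loop */
}

/* Watch the listening fd and fire the accept callback on read events.
 * With defer-accept, the kernel delays this wakeup until data arrives. */
static void mini_poll_loop(int listen_fd, accept_cb on_accept)
{
    struct pollfd pfd = { .fd = listen_fd, .events = POLLIN };

    for (;;) {
        if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN))
            on_accept(listen_fd); /* HAProxy: event_accept(fd) */
    }
}</pre>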
At this point the proxies are listening. When a client sends an HTTP request, the TCP three-way handshake happens first; once it completes, the kernel wakes up the haproxy process (note that on listeners where defer-accept is both supported and configured, the wakeup only happens once data has actually arrived). Event handling also lives in main(): main() calls run_poll_loop(), which in turn calls cur_poller.poll(&cur_poller, next). So when a new client connects, the listening fd receives a read event, and since that fd's read callback is event_accept(), the I/O event engine invokes event_accept(). Let's look at what event_accept() actually does:
<pre name="code" class="cpp">int event_accept(int fd)
{
    struct listener *l = fdtab[fd].owner;
    struct proxy *p = (struct proxy *)l->private; /* attached frontend */
    struct session *s;
    struct http_txn *txn;
    struct task *t;
    int cfd;
    int max_accept = global.tune.maxaccept;

    if (p->fe_sps_lim) {
        int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0);
        if (max_accept > max)
            max_accept = max;
    }

    /* decide whether we may still accept: the frontend must stay below its
     * own maxconn, the global number of active connections must stay below
     * global.maxconn, and we may not exceed the per-wakeup accept budget */
    while (p->feconn < p->maxconn && actconn < global.maxconn && max_accept--) {
        struct sockaddr_storage addr;
        socklen_t laddr = sizeof(addr);

        if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) { /* accept the connection */
            ……
        }

        if (l->nbconn >= l->maxconn) {
            /* too many connections, we shoot this one and return.
             * FIXME: it would be better to simply switch the listener's
             * state to LI_FULL and disable the FD. We could re-enable
             * it upon fd_delete(), but this requires all protocols to
             * be switched.
             */
            goto out_close;
        }

        /* allocate the session; on failure, disable this proxy for a while */
        if ((s = pool_alloc2(pool2_session)) == NULL) {
            Alert("out of memory in event_accept().\n");
            disable_listener(l);
            p->state = PR_STIDLE;
            goto out_close;
        }

        LIST_INIT(&s->back_refs);

        s->flags = 0;
        s->term_trace = 0;

        /* if this session comes from a known monitoring system, we want to ignore
         * it as soon as possible, which means closing it immediately for TCP.
         */
        if (addr.ss_family == AF_INET &&
            p->mon_mask.s_addr &&
            (((struct sockaddr_in *)&addr)->sin_addr.s_addr & p->mon_mask.s_addr) == p->mon_net.s_addr) {
            if (p->mode == PR_MODE_TCP) {
                close(cfd);
                pool_free2(pool2_session, s);
                continue;
            }
            s->flags |= SN_MONITOR;
        }

        LIST_ADDQ(&sessions, &s->list);

        /* allocate a task; it will drive everything that follows: connecting
         * to the backend server, parsing, forwarding the data, and so on.
         * On failure, disable this proxy for a while. */
        if ((t = task_new()) == NULL) {
            Alert("out of memory in event_accept().\n");
            disable_listener(l);
            p->state = PR_STIDLE;
            goto out_free_session;
        }

        s->cli_addr = addr;
        if (cfd >= global.maxsock) {
            Alert("accept(): not enough free sockets. Raise -n argument. Giving up.\n");
            goto out_free_task;
        }

        /* set socket options: non-blocking mode, keep-alive, linger,
         * send/receive buffer sizes, ... */
        if ((fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) ||
            (setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY,
                        (char *) &one, sizeof(one)) == -1)) {
            Alert("accept(): cannot set the socket in non blocking mode. Giving up\n");
            goto out_free_task;
        }

        if (p->options & PR_O_TCP_CLI_KA)
            setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));

        if (p->options & PR_O_TCP_NOLING)
            setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));

        if (global.tune.client_sndbuf)
            setsockopt(cfd, SOL_SOCKET, SO_SNDBUF, &global.tune.client_sndbuf, sizeof(global.tune.client_sndbuf));

        if (global.tune.client_rcvbuf)
            setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &global.tune.client_rcvbuf, sizeof(global.tune.client_rcvbuf));

        t->process = l->handler; /* the task's handler is the listener's handler, i.e. process_session() */
        t->context = s;
        t->nice = l->nice;

        s->task = t;
        s->listener = l;

        /* Note: initially, the session's backend points to the frontend.
         * This changes later when switching rules are executed or
         * when the default backend is assigned.
         */
        s->be = s->fe = p;

        s->req = s->rep = NULL; /* will be allocated later */

        /* initialize the two stream interfaces: si[0] is the client
         * (frontend) side, si[1] the server (backend) side. The
         * update/chk_rcv/chk_snd callbacks are invoked from
         * process_session() and friends when receive/send events are
         * handled. */
        s->si[0].state = s->si[0].prev_state = SI_ST_EST;
        s->si[0].err_type = SI_ET_NONE;
        s->si[0].err_loc = NULL;
        s->si[0].owner = t;
        s->si[0].update = stream_sock_data_finish;
        s->si[0].shutr = stream_sock_shutr;
        s->si[0].shutw = stream_sock_shutw;
        s->si[0].chk_rcv = stream_sock_chk_rcv;
        s->si[0].chk_snd = stream_sock_chk_snd;
        s->si[0].connect = NULL;
        s->si[0].iohandler = NULL;
        s->si[0].fd = cfd;
        s->si[0].flags = SI_FL_NONE | SI_FL_CAP_SPLTCP; /* TCP splicing capable */
        if (s->fe->options2 & PR_O2_INDEPSTR)
            s->si[0].flags |= SI_FL_INDEP_STR;
        s->si[0].exp = TICK_ETERNITY;

        s->si[1].state = s->si[1].prev_state = SI_ST_INI;
        s->si[1].err_type = SI_ET_NONE;
        s->si[1].err_loc = NULL;
        s->si[1].owner = t;
        s->si[1].update = stream_sock_data_finish;
        s->si[1].shutr = stream_sock_shutr;
        s->si[1].shutw = stream_sock_shutw;
        s->si[1].chk_rcv = stream_sock_chk_rcv;
        s->si[1].chk_snd = stream_sock_chk_snd;
        s->si[1].connect = tcpv4_connect_server;
        s->si[1].iohandler = NULL;
        s->si[1].exp = TICK_ETERNITY;
        s->si[1].fd = -1; /* just to help with debugging */
        s->si[1].flags = SI_FL_NONE;
        if (s->be->options2 & PR_O2_INDEPSTR)
            s->si[1].flags |= SI_FL_INDEP_STR;

        s->srv = s->prev_srv = s->srv_conn = NULL;
        s->pend_pos = NULL;
        s->conn_retries = s->be->conn_retries;

        /* init store persistence */
        s->store_count = 0;

        /* FIXME: the logs are horribly complicated now, because they are
         * defined in <p>, <p>, and later <be> and <be>.
         */
        if (s->flags & SN_MONITOR)
            s->logs.logwait = 0;
        else
            s->logs.logwait = p->to_log;

        if (s->logs.logwait & LW_REQ)
            s->do_log = http_sess_log;
        else
            s->do_log = tcp_sess_log;

        /* default error reporting function, may be changed by analysers */
        s->srv_error = default_srv_error;

        s->logs.accept_date = date; /* user-visible date for logging */
        s->logs.tv_accept = now;    /* corrected date for internal use */
        tv_zero(&s->logs.tv_request);
        s->logs.t_queue = -1;
        s->logs.t_connect = -1;
        s->logs.t_data = -1;
        s->logs.t_close = 0;
        s->logs.bytes_in = s->logs.bytes_out = 0;
        s->logs.prx_queue_size = 0; /* we get the number of pending conns before us */
        s->logs.srv_queue_size = 0; /* we will get this number soon */

        s->data_source = DATA_SRC_NONE;

        s->uniq_id = totalconn;
        proxy_inc_fe_ctr(l, p); /* note: cum_beconn will be increased once assigned */

        txn = &s->txn;
        /* Those variables will be checked and freed if non-NULL in
         * session.c:session_free(). It is important that they are
         * properly initialized.
         */
        txn->sessid = NULL;
        txn->srv_cookie = NULL;
        txn->cli_cookie = NULL;
        txn->uri = NULL;
        txn->req.cap = NULL;
        txn->rsp.cap = NULL;
        txn->hdr_idx.v = NULL;
        txn->hdr_idx.size = txn->hdr_idx.used = 0;
        ……

        /* allocate the request buffer: it receives the data read from the
         * client on the frontend side, which the backend side forwards to
         * the server */
        if ((s->req = pool_alloc2(pool2_buffer)) == NULL)
            goto out_fail_req; /* no memory */

        s->req->size = global.tune.bufsize;
        buffer_init(s->req);
        s->req->prod = &s->si[0]; /* the producer of req is the frontend-side socket */
        s->req->cons = &s->si[1]; /* its consumer is the backend side */
        s->si[0].ib = s->si[1].ob = s->req;

        s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */

        if (p->mode == PR_MODE_HTTP)
            s->req->flags |= BF_READ_DONTWAIT; /* one read is usually enough */

        /* activate default analysers enabled for this listener: in HTTP
         * mode this is usually AN_REQ_WAIT_HTTP | AN_REQ_HTTP_PROCESS_FE |
         * AN_REQ_SWITCHING_RULES; in TCP mode, AN_REQ_SWITCHING_RULES */
        s->req->analysers = l->analysers;

        /* note: this should not happen anymore since there's always at least the switching rules */
        if (!s->req->analysers) {
            buffer_auto_connect(s->req); /* don't wait to establish connection */
            buffer_auto_close(s->req);   /* let the producer forward close requests */
        }

        s->req->rto = s->fe->timeout.client;
        s->req->wto = s->be->timeout.server;
        s->req->cto = s->be->timeout.connect;

        /* allocate the response buffer: it holds the data sent back by the
         * real server */
        if ((s->rep = pool_alloc2(pool2_buffer)) == NULL)
            goto out_fail_rep; /* no memory */

        s->rep->size = global.tune.bufsize;
        buffer_init(s->rep);
        s->rep->prod = &s->si[1];
        s->rep->cons = &s->si[0];
        s->si[0].ob = s->si[1].ib = s->rep;
        s->rep->analysers = 0;

        if (s->fe->options2 & PR_O2_NODELAY) {
            s->req->flags |= BF_NEVER_WAIT;
            s->rep->flags |= BF_NEVER_WAIT;
        }

        s->rep->rto = s->be->timeout.server;
        s->rep->wto = s->fe->timeout.client;
        s->rep->cto = TICK_ETERNITY;

        s->req->rex = TICK_ETERNITY;
        s->req->wex = TICK_ETERNITY;
        s->req->analyse_exp = TICK_ETERNITY;
        s->rep->rex = TICK_ETERNITY;
        s->rep->wex = TICK_ETERNITY;
        s->rep->analyse_exp = TICK_ETERNITY;
        t->expire = TICK_ETERNITY;

        /* register the accepted fd with the I/O event engine: read events
         * will invoke stream_sock_read(), write events stream_sock_write() */
        fd_insert(cfd);
        fdtab[cfd].owner = &s->si[0];
        fdtab[cfd].state = FD_STREADY;
        fdtab[cfd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY;
        if (p->options & PR_O_TCP_NOLING)
            fdtab[cfd].flags |= FD_FL_TCP_NOLING;

        fdtab[cfd].cb[DIR_RD].f = l->proto->read;  /* points into proto_tcpv4: the read function is stream_sock_read() */
        fdtab[cfd].cb[DIR_RD].b = s->req;
        fdtab[cfd].cb[DIR_WR].f = l->proto->write; /* points into proto_tcpv4: the write function is stream_sock_write() */
        fdtab[cfd].cb[DIR_WR].b = s->rep;
        fdinfo[cfd].peeraddr = (struct sockaddr *)&s->cli_addr;
        fdinfo[cfd].peerlen = sizeof(s->cli_addr);

        if ((p->mode == PR_MODE_HTTP && (s->flags & SN_MONITOR)) ||
            (p->mode == PR_MODE_HEALTH && (p->options & PR_O_HTTP_CHK))) {
            /* Either we got a request from a monitoring system on an HTTP instance,
             * or we're in health check mode with the 'httpchk' option enabled. In
             * both cases, we return a fake "HTTP/1.0 200 OK" response and we exit.
             */
            struct chunk msg;
            chunk_initstr(&msg, "HTTP/1.0 200 OK\r\n\r\n");
            stream_int_retnclose(&s->si[0], &msg); /* forge a 200 response */
            s->req->analysers = 0;
            t->expire = s->rep->wex;
        }
        else if (p->mode == PR_MODE_HEALTH) { /* health check mode, no client reading */
            struct chunk msg;
            chunk_initstr(&msg, "OK\n");
            stream_int_retnclose(&s->si[0], &msg); /* forge an "OK" response */
            s->req->analysers = 0;
            t->expire = s->rep->wex;
        }
        else {
            EV_FD_SET(cfd, DIR_RD); /* in normal mode only the read event is enabled */
        }

        /* it is important not to call the wakeup function directly but to
         * pass through task_wakeup(), because this one knows how to apply
         * priorities to tasks.
         */
        task_wakeup(t, TASK_WOKEN_INIT);

        l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
        if (l->nbconn >= l->maxconn) {
            EV_FD_CLR(l->fd, DIR_RD);
            l->state = LI_FULL;
        }

        p->feconn++; /* beconn will be increased later */
        if (p->feconn > p->counters.feconn_max)
            p->counters.feconn_max = p->feconn;

        if (l->counters) {
            if (l->nbconn > l->counters->conn_max)
                l->counters->conn_max = l->nbconn;
        }

        actconn++;
        totalconn++;

        // fprintf(stderr, "accepting from %p => %d conn, %d total, task=%p\n", p, actconn, totalconn, t);
    } /* end of while (p->feconn < p->maxconn) */
    return 0;
    ……
}</pre>
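The req/rep wiring set up above is the heart of the session: each buffer has a producer and a consumer, and the two stream interfaces are cross-connected. A condensed sketch with hypothetical mini types (not HAProxy's real structs) makes the symmetry easy to see:

<pre name="code" class="cpp">/* Hypothetical, reduced stand-ins for the stream interface and buffer. */
struct mini_si { int fd; };

struct mini_buffer {
    struct mini_si *prod; /* side that fills the buffer */
    struct mini_si *cons; /* side that drains it */
};

/* Data read from the client side (si[0]) lands in req and is consumed
 * toward the server side (si[1]); the reply flows the opposite way
 * through rep. */
static void wire_session(struct mini_si *client, struct mini_si *server,
                         struct mini_buffer *req, struct mini_buffer *rep)
{
    req->prod = client; /* s->req->prod = &s->si[0] */
    req->cons = server; /* s->req->cons = &s->si[1] */
    rep->prod = server; /* s->rep->prod = &s->si[1] */
    rep->cons = client; /* s->rep->cons = &s->si[0] */
}</pre>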
So event_accept() mainly accepts the new connection, creates a task and a session, initializes both, and registers the accepted fd with the I/O event engine, so that stream_sock_read() will be invoked to read data as soon as the client sends some. At this point haproxy can receive the client's request. But when does haproxy connect to the real server, and how does the client's data get forwarded to it? Here is the rough flow:
The task created in event_accept() gets scheduled and runs process_session(). process_session() first works on the data: in HTTP mode, if the request has not been fully received (or nothing has arrived yet), buffer_dont_connect() is called and the consumer side stays in the SI_ST_INI state; the rest of process_session() still runs, but no backend server is assigned and no connection is attempted. Once the request has been fully processed (or in TCP mode), the code checks whether cons->state is still SI_ST_INI, sets s->req->cons->state = SI_ST_REQ, and execution reaches sess_prepare_conn_req(), which picks a backend server according to the configured load-balancing algorithm and moves the state to SI_ST_ASS; the while loop then continues, sess_update_stream_int() runs and connects to the backend server……
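Condensed, the backend side of the session walks a small state machine. The sketch below is hypothetical and reduced to the happy path described above; the real logic in process_session(), sess_prepare_conn_req() and sess_update_stream_int() covers many more states and error paths:

<pre name="code" class="cpp">/* Reduced state set, mirroring the SI_ST_* names discussed above. */
enum si_state_sketch { ST_INI, ST_REQ, ST_ASS, ST_CON };

static enum si_state_sketch step(enum si_state_sketch st, int request_ready)
{
    switch (st) {
    case ST_INI:
        /* request not complete yet: buffer_dont_connect() keeps us here */
        return request_ready ? ST_REQ : ST_INI;
    case ST_REQ:
        /* sess_prepare_conn_req(): pick a server via the LB algorithm */
        return ST_ASS;
    case ST_ASS:
        /* sess_update_stream_int(): connect() to the assigned server */
        return ST_CON;
    default:
        return st;
    }
}</pre>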
I will stop the analysis here; a detailed walkthrough of what follows will come in a later post. To be continued.