现在的位置: 首页 > 综合 > 正文

Haproxy frontend listen分析

2017年09月09日 ⁄ 综合 ⁄ 共 26307字 ⁄ 字号 评论关闭
<span style="font-family: Arial, Helvetica, sans-serif;">Haproxy的frontend到底是怎么进行listen、又是怎么accept客户端发来的请求的呢?这里以haproxy 1.4.19为例进行分析。要知道frontend侦听了哪些端口及其相关配置,就需要解析haproxy的配置文件;配置文件的整体解析流程这里不做详细分析,只分析其中的cfg_parse_listen函数:</span>
<pre name="code" class="plain">int cfg_parse_listen(const char *file, int linenum, char **args, int kwm)
{
	static struct proxy *curproxy = NULL;
	struct server *newsrv = NULL;
	const char *err;
	int rc;
	unsigned val;
	int err_code = 0;
	struct acl_cond *cond = NULL;

	if (!strcmp(args[0], "listen"))
		rc = PR_CAP_LISTEN;
 	else if (!strcmp(args[0], "frontend"))
		rc = PR_CAP_FE | PR_CAP_RS;
 	else if (!strcmp(args[0], "backend"))
		rc = PR_CAP_BE | PR_CAP_RS;
 	else if (!strcmp(args[0], "ruleset"))
		rc = PR_CAP_RS;
	else
		rc = PR_CAP_NONE;

	if (rc != PR_CAP_NONE) {  /* new proxy */
                /* listen、frontend、backend、ruleset关键字后面至少还要跟一个参数(proxy名称),名称只能由字母、数字以及". : _ -"这几个字符组成 */
                ……
		if ((curproxy = (struct proxy *)calloc(1, sizeof(struct proxy))) == NULL) {
			Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
			err_code |= ERR_ALERT | ERR_ABORT;
			goto out;
		}
                /* 插在链表最前面,proxy全局指针指向链表的第一个元素 */
		init_new_proxy(curproxy);
		curproxy->next = proxy;
		proxy = curproxy;
		curproxy->conf.file = file;
		curproxy->conf.line = linenum;
		curproxy->last_change = now.tv_sec;
		curproxy->id = strdup(args[1]);
		curproxy->cap = rc;

		/* parse the listener address if any,对于frontend可以有第三个参数,主要是侦听地址和端口,可以是一组ip地址,ip地址用","隔开,侦听端口也可以
                 * 是一个范围,用"-"来表示范围 ,具体为frontend test *:123-245,xx.xx.xx.xx:598或者frontend test *,xx.xx.xx.xx:598等
                 */
		if ((curproxy->cap & PR_CAP_FE) && *args[2]) {
			struct listener *new, *last = curproxy->listen;
                        /* 对frontend要解析listen地址和端口,端口可以是一个范围,格式为IP地址(可以是*、ipv4或ipv6地址):port1-port2。
                         * ip地址分为v4和v6,v4格式为xx.xx.xx.xx,v6格式为XXXX:XXXX:XXXX:XXXX;对于v4地址还会调用gethostbyname判断地址的合法性,
                         * 并根据端口个数创建对应数量的listener,此时curproxy->listen指向新建listener链表的第一个元素
                         */
			if (!str2listener(args[2], curproxy)) {
				err_code |= ERR_FATAL;
				goto out;
			}
			new = curproxy->listen;
			while (new != last) {
				new->conf.file = file;
				new->conf.line = linenum;
				new = new->next;
				global.maxsock++;
			}
		}

		/* set default values,对新申请的proxy初始化default段相关信息 */
		memcpy(&curproxy->defsrv, &defproxy.defsrv, sizeof(curproxy->defsrv));
		curproxy->defsrv.id = "default-server";

		curproxy->state = defproxy.state;
		curproxy->options = defproxy.options;
		curproxy->options2 = defproxy.options2;
		curproxy->no_options = defproxy.no_options;
		curproxy->no_options2 = defproxy.no_options2;
		curproxy->bind_proc = defproxy.bind_proc;
		curproxy->lbprm.algo = defproxy.lbprm.algo;
		curproxy->except_net = defproxy.except_net;
		curproxy->except_mask = defproxy.except_mask;
		curproxy->except_to = defproxy.except_to;
		curproxy->except_mask_to = defproxy.except_mask_to;

		if (defproxy.fwdfor_hdr_len) {
			curproxy->fwdfor_hdr_len  = defproxy.fwdfor_hdr_len;
			curproxy->fwdfor_hdr_name = strdup(defproxy.fwdfor_hdr_name);
		}

		if (defproxy.orgto_hdr_len) {
			curproxy->orgto_hdr_len  = defproxy.orgto_hdr_len;
			curproxy->orgto_hdr_name = strdup(defproxy.orgto_hdr_name);
		}

		if (defproxy.server_id_hdr_len) {
			curproxy->server_id_hdr_len  = defproxy.server_id_hdr_len;
			curproxy->server_id_hdr_name = strdup(defproxy.server_id_hdr_name);
		}

		if (curproxy->cap & PR_CAP_FE) {
			curproxy->maxconn = defproxy.maxconn;
			curproxy->backlog = defproxy.backlog;
			curproxy->fe_sps_lim = defproxy.fe_sps_lim;

			/* initialize error relocations */
			for (rc = 0; rc < HTTP_ERR_SIZE; rc++)
				chunk_dup(&curproxy->errmsg[rc], &defproxy.errmsg[rc]);

			curproxy->to_log = defproxy.to_log & ~LW_COOKIE & ~LW_REQHDR & ~ LW_RSPHDR;
		}

		if (curproxy->cap & PR_CAP_BE) {
			curproxy->fullconn = defproxy.fullconn;
			curproxy->conn_retries = defproxy.conn_retries;

			if (defproxy.check_req) {
				curproxy->check_req = calloc(1, defproxy.check_len);
				memcpy(curproxy->check_req, defproxy.check_req, defproxy.check_len);
			}
			curproxy->check_len = defproxy.check_len;

			if (defproxy.expect_str) {
				curproxy->expect_str = strdup(defproxy.expect_str);
				if (defproxy.expect_regex) {
					/* note: this regex is known to be valid */
					curproxy->expect_regex = calloc(1, sizeof(regex_t));
					regcomp(curproxy->expect_regex, defproxy.expect_str, REG_EXTENDED);
				}
			}

			if (defproxy.cookie_name)
				curproxy->cookie_name = strdup(defproxy.cookie_name);
			curproxy->cookie_len = defproxy.cookie_len;
			if (defproxy.cookie_domain)
				curproxy->cookie_domain = strdup(defproxy.cookie_domain);

			if (defproxy.cookie_maxidle)
				curproxy->cookie_maxidle = defproxy.cookie_maxidle;

			if (defproxy.cookie_maxlife)
				curproxy->cookie_maxlife = defproxy.cookie_maxlife;

			if (defproxy.rdp_cookie_name)
				 curproxy->rdp_cookie_name = strdup(defproxy.rdp_cookie_name);
			curproxy->rdp_cookie_len = defproxy.rdp_cookie_len;

			if (defproxy.url_param_name)
				curproxy->url_param_name = strdup(defproxy.url_param_name);
			curproxy->url_param_len = defproxy.url_param_len;

			if (defproxy.hh_name)
				curproxy->hh_name = strdup(defproxy.hh_name);
			curproxy->hh_len  = defproxy.hh_len;
			curproxy->hh_match_domain  = defproxy.hh_match_domain;

			if (defproxy.iface_name)
				curproxy->iface_name = strdup(defproxy.iface_name);
			curproxy->iface_len  = defproxy.iface_len;
		}

		if (curproxy->cap & PR_CAP_FE) {
			if (defproxy.capture_name)
				curproxy->capture_name = strdup(defproxy.capture_name);
			curproxy->capture_namelen = defproxy.capture_namelen;
			curproxy->capture_len = defproxy.capture_len;
		}

		if (curproxy->cap & PR_CAP_FE) {
			curproxy->timeout.client = defproxy.timeout.client;
			curproxy->timeout.tarpit = defproxy.timeout.tarpit;
			curproxy->timeout.httpreq = defproxy.timeout.httpreq;
			curproxy->timeout.httpka = defproxy.timeout.httpka;
			curproxy->uri_auth  = defproxy.uri_auth;
			curproxy->mon_net = defproxy.mon_net;
			curproxy->mon_mask = defproxy.mon_mask;
			if (defproxy.monitor_uri)
				curproxy->monitor_uri = strdup(defproxy.monitor_uri);
			curproxy->monitor_uri_len = defproxy.monitor_uri_len;
			if (defproxy.defbe.name)
				curproxy->defbe.name = strdup(defproxy.defbe.name);
		}

		if (curproxy->cap & PR_CAP_BE) {
			curproxy->timeout.connect = defproxy.timeout.connect;
			curproxy->timeout.server = defproxy.timeout.server;
			curproxy->timeout.check = defproxy.timeout.check;
			curproxy->timeout.queue = defproxy.timeout.queue;
			curproxy->timeout.tarpit = defproxy.timeout.tarpit;
			curproxy->timeout.httpreq = defproxy.timeout.httpreq;
			curproxy->timeout.httpka = defproxy.timeout.httpka;
			curproxy->source_addr = defproxy.source_addr;
		}

		curproxy->mode = defproxy.mode;
		curproxy->logfac1 = defproxy.logfac1;
		curproxy->logsrv1 = defproxy.logsrv1;
		curproxy->loglev1 = defproxy.loglev1;
		curproxy->minlvl1 = defproxy.minlvl1;
		curproxy->logfac2 = defproxy.logfac2;
		curproxy->logsrv2 = defproxy.logsrv2;
		curproxy->loglev2 = defproxy.loglev2;
		curproxy->minlvl2 = defproxy.minlvl2;
		curproxy->grace  = defproxy.grace;
		curproxy->conf.used_listener_id = EB_ROOT;
		curproxy->conf.used_server_id = EB_ROOT;

		goto out;
	}
	else if (!strcmp(args[0], "defaults")) {  /* use this one to assign default values,解析default段 */
		/* some variables may have already been initialized earlier */
		/* FIXME-20070101: we should do this too at the end of the
		 * config parsing to free all default values.
		 */
		free(defproxy.check_req);
		free(defproxy.cookie_name);
		free(defproxy.rdp_cookie_name);
		free(defproxy.cookie_domain);
		free(defproxy.url_param_name);
		free(defproxy.hh_name);
		free(defproxy.capture_name);
		free(defproxy.monitor_uri);
		free(defproxy.defbe.name);
		free(defproxy.iface_name);
		free(defproxy.fwdfor_hdr_name);
		defproxy.fwdfor_hdr_len = 0;
		free(defproxy.orgto_hdr_name);
		defproxy.orgto_hdr_len = 0;
		free(defproxy.server_id_hdr_name);
		defproxy.server_id_hdr_len = 0;
		free(defproxy.expect_str);
		if (defproxy.expect_regex) regfree(defproxy.expect_regex);

		for (rc = 0; rc < HTTP_ERR_SIZE; rc++)
			chunk_destroy(&defproxy.errmsg[rc]);

		/* we cannot free uri_auth because it might already be used */
		init_default_instance();
		curproxy = &defproxy;
		defproxy.cap = PR_CAP_LISTEN; /* all caps for now */
		goto out;
	}
	else if (curproxy == NULL) {
		Alert("parsing [%s:%d] : 'listen' or 'defaults' expected.\n", file, linenum);
		err_code |= ERR_ALERT | ERR_FATAL;
		goto out;
	}
    

	/* Now let's parse the proxy-specific keywords */
	if (!strcmp(args[0], "bind")) {  /* new listen addresses,解析bind字段,主要是对于有侦听的段,如frontend,stat段 */
		struct listener *new_listen, *last_listen;
		int cur_arg;
                ……

                /* 跟上面解析一样 */
		last_listen = curproxy->listen;
		if (!str2listener(args[1], curproxy)) {
			err_code |= ERR_ALERT | ERR_FATAL;
			goto out;
		}

		new_listen = curproxy->listen;
		while (new_listen != last_listen) {
			new_listen->conf.file = file;
			new_listen->conf.line = linenum;
			new_listen = new_listen->next;
			global.maxsock++;
		}

		cur_arg = 2;
                /* 第三个及其之后的参数可以配置"interface,mss,defer-accept,transparent,name,id"这些侦听句柄属性,其中defer-accept是指等有数据到达后
                 * 再唤醒侦听句柄进行accept,transparent是让haproxy进行透明代理
                 */
		while (*(args[cur_arg])) {
			if (!strcmp(args[cur_arg], "interface")) { /* specifically bind to this interface */
#ifdef SO_BINDTODEVICE
				struct listener *l;

				if (!*args[cur_arg + 1]) {
					Alert("parsing [%s:%d] : '%s' : missing interface name.\n",
					      file, linenum, args[0]);
					err_code |= ERR_ALERT | ERR_FATAL;
					goto out;
				}
				
				for (l = curproxy->listen; l != last_listen; l = l->next)
					l->interface = strdup(args[cur_arg + 1]);

				global.last_checks |= LSTCHK_NETADM;

				cur_arg += 2;
				continue;
#else
				Alert("parsing [%s:%d] : '%s' : '%s' option not implemented.\n",
				      file, linenum, args[0], args[cur_arg]);
				err_code |= ERR_ALERT | ERR_FATAL;
				goto out;
#endif
			}
			if (!strcmp(args[cur_arg], "mss")) { /* set MSS of listening socket */
#ifdef TCP_MAXSEG
				struct listener *l;
				int mss;

				if (!*args[cur_arg + 1]) {
					Alert("parsing [%s:%d] : '%s' : missing MSS value.\n",
					      file, linenum, args[0]);
					err_code |= ERR_ALERT | ERR_FATAL;
					goto out;
				}

				mss = str2uic(args[cur_arg + 1]);
				if (mss < 1 || mss > 65535) {
					Alert("parsing [%s:%d]: %s expects an MSS value between 1 and 65535.\n",
					      file, linenum, args[0]);
					err_code |= ERR_ALERT | ERR_FATAL;
					goto out;
				}

				for (l = curproxy->listen; l != last_listen; l = l->next)
					l->maxseg = mss;

				cur_arg += 2;
				continue;
#else
				Alert("parsing [%s:%d] : '%s' : '%s' option not implemented.\n",
				      file, linenum, args[0], args[cur_arg]);
				err_code |= ERR_ALERT | ERR_FATAL;
				goto out;
#endif
			}

			if (!strcmp(args[cur_arg], "defer-accept")) { /* wait for some data for 1 second max before doing accept */
#ifdef TCP_DEFER_ACCEPT
				struct listener *l;

				for (l = curproxy->listen; l != last_listen; l = l->next)
					l->options |= LI_O_DEF_ACCEPT;

				cur_arg ++;
				continue;
#else
				Alert("parsing [%s:%d] : '%s' : '%s' option not implemented.\n",
				      file, linenum, args[0], args[cur_arg]);
				err_code |= ERR_ALERT | ERR_FATAL;
				goto out;
#endif
			}

			if (!strcmp(args[cur_arg], "transparent")) { /* transparently bind to these addresses */
#ifdef CONFIG_HAP_LINUX_TPROXY
				struct listener *l;

				for (l = curproxy->listen; l != last_listen; l = l->next)
					l->options |= LI_O_FOREIGN;

				cur_arg ++;
				continue;
#else
				Alert("parsing [%s:%d] : '%s' : '%s' option not implemented.\n",
				      file, linenum, args[0], args[cur_arg]);
				err_code |= ERR_ALERT | ERR_FATAL;
				goto out;
#endif
			}

			if (!strcmp(args[cur_arg], "name")) {
				struct listener *l;

				for (l = curproxy->listen; l != last_listen; l = l->next)
					l->name = strdup(args[cur_arg + 1]);

				cur_arg += 2;
				continue;
			}

			if (!strcmp(args[cur_arg], "id")) {
				struct eb32_node *node;
				struct listener *l;

				if (curproxy->listen->next != last_listen) {
					Alert("parsing [%s:%d]: '%s' can be only used with a single socket.\n",
						file, linenum, args[cur_arg]);
					err_code |= ERR_ALERT | ERR_FATAL;
					goto out;
				}

				if (!*args[cur_arg + 1]) {
					Alert("parsing [%s:%d]: '%s' expects an integer argument.\n",
						file, linenum, args[cur_arg]);
					err_code |= ERR_ALERT | ERR_FATAL;
					goto out;
				}

				curproxy->listen->luid = atol(args[cur_arg + 1]);
				curproxy->listen->conf.id.key = curproxy->listen->luid;

				if (curproxy->listen->luid <= 0) {
					Alert("parsing [%s:%d]: custom id has to be > 0\n",
						file, linenum);
					err_code |= ERR_ALERT | ERR_FATAL;
					goto out;
				}

				node = eb32_lookup(&curproxy->conf.used_listener_id, curproxy->listen->luid);
				if (node) {
					l = container_of(node, struct listener, conf.id);
					Alert("parsing [%s:%d]: custom id %d for socket '%s' already used at %s:%d.\n",
					      file, linenum, l->luid, args[1], l->conf.file, l->conf.line);
					err_code |= ERR_ALERT | ERR_FATAL;
					goto out;
				}
				eb32_insert(&curproxy->conf.used_listener_id, &curproxy->listen->conf.id);

				cur_arg += 2;
				continue;
			}

			Alert("parsing [%s:%d] : '%s' only supports the 'transparent', 'defer-accept', 'name', 'id', 'mss' and 'interface' options.\n",
			      file, linenum, args[0]);
			err_code |= ERR_ALERT | ERR_FATAL;
			goto out;
		}
		goto out;
	}
	else if (!strcmp(args[0], "monitor-net")) {  /* set the range of IPs to ignore */
		if (!*args[1] || !str2net(args[1], &curproxy->mon_net, &curproxy->mon_mask)) {
			Alert("parsing [%s:%d] : '%s' expects address[/mask].\n",
			      file, linenum, args[0]);
			err_code |= ERR_ALERT | ERR_FATAL;
			goto out;
		}
		if (warnifnotcap(curproxy, PR_CAP_FE, file, linenum, args[0], NULL))
			err_code |= ERR_WARN;

		/* flush useless bits */
		curproxy->mon_net.s_addr &= curproxy->mon_mask.s_addr;
		goto out;
	}
	else if (!strcmp(args[0], "monitor-uri")) {  /* set the URI to intercept */
		if (warnifnotcap(curproxy, PR_CAP_FE, file, linenum, args[0], NULL))
			err_code |= ERR_WARN;

		if (!*args[1]) {
			Alert("parsing [%s:%d] : '%s' expects an URI.\n",
			      file, linenum, args[0]);
			err_code |= ERR_ALERT | ERR_FATAL;
			goto out;
		}

		free(curproxy->monitor_uri);
		curproxy->monitor_uri_len = strlen(args[1]);
		curproxy->monitor_uri = (char *)calloc(1, curproxy->monitor_uri_len + 1);
		memcpy(curproxy->monitor_uri, args[1], curproxy->monitor_uri_len);
		curproxy->monitor_uri[curproxy->monitor_uri_len] = '\0';

		goto out;
	}
	else if (!strcmp(args[0], "mode")) {  /* sets the proxy mode */
		if (!strcmp(args[1], "http")) curproxy->mode = PR_MODE_HTTP;
		else if (!strcmp(args[1], "tcp")) curproxy->mode = PR_MODE_TCP;
		else if (!strcmp(args[1], "health")) curproxy->mode = PR_MODE_HEALTH;
		else {
			Alert("parsing [%s:%d] : unknown proxy mode '%s'.\n", file, linenum, args[1]);
			err_code |= ERR_ALERT | ERR_FATAL;
			goto out;
		}
	}
	else if (!strcmp(args[0], "id")) { /* 设置当前proxy的自定义数字id(curproxy->uuid) */
		struct eb32_node *node;

		if (curproxy == &defproxy) {
			Alert("parsing [%s:%d]: '%s' not allowed in 'defaults' section.\n",
				 file, linenum, args[0]);
			err_code |= ERR_ALERT | ERR_FATAL;
			goto out;
		}

		if (!*args[1]) {
			Alert("parsing [%s:%d]: '%s' expects an integer argument.\n",
				file, linenum, args[0]);
			err_code |= ERR_ALERT | ERR_FATAL;
			goto out;
		}

		curproxy->uuid = atol(args[1]);
		curproxy->conf.id.key = curproxy->uuid;

		if (curproxy->uuid <= 0) {
			Alert("parsing [%s:%d]: custom id has to be > 0.\n",
				file, linenum);
			err_code |= ERR_ALERT | ERR_FATAL;
			goto out;
		}

		node = eb32_lookup(&used_proxy_id, curproxy->uuid);
		if (node) {
			struct proxy *target = container_of(node, struct proxy, conf.id);
			Alert("parsing [%s:%d]: %s %s reuses same custom id as %s %s (declared at %s:%d).\n",
			      file, linenum, proxy_type_str(curproxy), curproxy->id,
			      proxy_type_str(target), target->id, target->conf.file, target->conf.line);
			err_code |= ERR_ALERT | ERR_FATAL;
			goto out;
		}
		eb32_insert(&used_proxy_id, &curproxy->conf.id);
	}
	
	
        ……
}

</pre><p></p><p>上面主要分析了解析配置文件中关于listen相关的代码,现在已经有一些proxy,listen相关信息了,但是还有一些信息没初始化,这些信息有的在check_config_validity函数中进行赋值,下面分析一下check_config_validity函数:</p><p><pre name="code" class="cpp">int check_config_validity()
{
    ……
    
                listener = NULL;
                /* 反转listen链表 */
		while (curproxy->listen) {
			struct listener *next;

			next = curproxy->listen->next;
			curproxy->listen->next = listener;
			listener = curproxy->listen;

			if (!next)
				break;

			curproxy->listen = next;
		}

		/* adjust this proxy's listeners */
		next_id = 1;
		listener = curproxy->listen;
		while (listener) {
			if (!listener->luid) {
				/* listener ID not set, use automatic numbering with first
				 * spare entry starting with next_luid.
				 */
				next_id = get_next_id(&curproxy->conf.used_listener_id, next_id);
				listener->conf.id.key = listener->luid = next_id;
				eb32_insert(&curproxy->conf.used_listener_id, &listener->conf.id);
			}
			next_id++;

			/* enable separate counters */
			if (curproxy->options2 & PR_O2_SOCKSTAT) {
				listener->counters = (struct licounters *)calloc(1, sizeof(struct licounters));
				if (!listener->name) {
					sprintf(trash, "sock-%d", listener->luid);
					listener->name = strdup(trash);
				}
			}

			if (curproxy->options & PR_O_TCP_NOLING)
				listener->options |= LI_O_NOLINGER;
			listener->maxconn = curproxy->maxconn;
			listener->backlog = curproxy->backlog;
			listener->timeout = &curproxy->timeout.client;
			listener->accept = event_accept;  /* proxy的accept回调函数是event_accept,只要对应的listen句柄有新连接进来就会调用该函数 */
			listener->private = curproxy;
			listener->handler = process_session; /* 数据处理回调函数 */
			listener->analysers |= curproxy->fe_req_ana;

			/* smart accept mode is automatic in HTTP mode */
			if ((curproxy->options2 & PR_O2_SMARTACC) ||
			    (curproxy->mode == PR_MODE_HTTP &&
			     !(curproxy->no_options2 & PR_O2_SMARTACC)))
				listener->options |= LI_O_NOQUICKACK;

			/* We want the use_backend and default_backend rules to apply */
			listener = listener->next;
		}
}

相关的回调函数及一些最大连接、超时时间,listen的backlog队列设置完成后,现在基本就万事俱备了,下面可以开始listen,然后accept连接了,listen主要是在start_proxies函数中进行,该函数被main函数调用:

/*
 * Bind every listener of every proxy that has not been started yet.
 *
 * As stated by the original notes, this is meant to run very early, typically
 * before privileges are dropped: the sockets get registered but are not added
 * to any fd_set so that they are not lost across the fork(), and the proxies
 * start in IDLE state until maintain_proxies completes their loading.
 *
 * <verbose> : when non-zero, messages for non-fatal bind errors are printed
 *             too; fatal/abort errors are always reported.
 *
 * Returns the bitwise OR of every code returned by tcp_bind_listener()
 * (ERR_NONE when no listener reported anything).
 */
int start_proxies(int verbose)
{
	char msg[100];
	int result = ERR_NONE;
	struct proxy *px;

	for (px = proxy; px; px = px->next) {
		struct listener *li;
		int failed = 0;

		/* only proxies still in the "new" state have to be started */
		if (px->state != PR_STNEW)
			continue;

		for (li = px->listen; li; li = li->next) {
			int rc;

			/* listeners in any other state were already started */
			if (li->state != LI_ASSIGNED)
				continue;

			/* Per the article's annotation: this creates the socket,
			 * starts listening and hooks the fd into the I/O event
			 * machinery, so that event_accept is called for new
			 * connections. NOTE(review): tcp_bind_listener is not
			 * shown here - confirm against its definition.
			 */
			rc = tcp_bind_listener(li, msg, sizeof(msg));
			result |= rc;

			/* errors are reported if <verbose> is set or if they are fatal */
			if (verbose || (rc & (ERR_FATAL | ERR_ABORT))) {
				if (rc & ERR_ALERT)
					Alert("Starting %s %s: %s\n",
					      proxy_type_str(px), px->id, msg);
				else if (rc & ERR_WARN)
					Warning("Starting %s %s: %s\n",
						proxy_type_str(px), px->id, msg);
			}

			/* a fatal error stops this proxy's remaining listeners */
			if (rc & (ERR_ABORT | ERR_FATAL)) {
				failed = 1;
				break;
			}

			/* any other error code only marks the proxy as not started */
			if (rc & ERR_CODE)
				failed = 1;
		}

		if (!failed) {
			px->state = PR_STIDLE;
			send_log(px, LOG_NOTICE, "Proxy %s started.\n", px->id);
		}

		/* an abort request stops the whole startup sequence */
		if (result & ERR_ABORT)
			break;
	}

	return result;
}

此时,proxy已开始进行listen了。当一个客户端发起http请求时,先进行tcp的三次握手,三次握手完成后,内核会唤醒haproxy进程;注意,如果系统支持并且配置了defer-accept,则只有在接收到数据后才会唤醒haproxy进程。haproxy的事件处理也是在main函数中进行的:main函数调用run_poll_loop函数,run_poll_loop再调用cur_poller.poll(&cur_poller, next)。当有新客户端连接进来时,对应的listen句柄会收到读事件,而listen句柄读事件的回调函数是event_accept,于是IO事件驱动机制就会调用event_accept函数。下面分析一下event_accept函数到底做了什么工作:

int event_accept(int fd)
{
	struct listener *l = fdtab[fd].owner;
	struct proxy *p = (struct proxy *)l->private; /* attached frontend */
	struct session *s;
	struct http_txn *txn;
	struct task *t;
	int cfd;
	int max_accept = global.tune.maxaccept;

	if (p->fe_sps_lim) {
		int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0);
		if (max_accept > max)
			max_accept = max;
	}

	while (p->feconn < p->maxconn && actconn < global.maxconn && max_accept--) { /* 判断是否可以接收连接:该proxy的frontend连接数不能超过该proxy的最大连接数,全局活跃连接数不能超过全局最大连接数,并且一次调用最多只accept max_accept个连接 */
		struct sockaddr_storage addr;
		socklen_t laddr = sizeof(addr);

		if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) { /* 接收连接 */
			……}

		if (l->nbconn >= l->maxconn) {
			/* too many connections, we shoot this one and return.
			 * FIXME: it would be better to simply switch the listener's
			 * state to LI_FULL and disable the FD. We could re-enable
			 * it upon fd_delete(), but this requires all protocols to
			 * be switched.
			 */
			goto out_close;
		}

		if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while,创建会话 */
			Alert("out of memory in event_accept().\n");
			disable_listener(l);
			p->state = PR_STIDLE;
			goto out_close;
		}

		LIST_INIT(&s->back_refs);

		s->flags = 0;
		s->term_trace = 0;

		/* if this session comes from a known monitoring system, we want to ignore
		 * it as soon as possible, which means closing it immediately for TCP.
		 */
		if (addr.ss_family == AF_INET &&
		    p->mon_mask.s_addr &&
		    (((struct sockaddr_in *)&addr)->sin_addr.s_addr & p->mon_mask.s_addr) == p->mon_net.s_addr) {
			if (p->mode == PR_MODE_TCP) {
				close(cfd);
				pool_free2(pool2_session, s);
				continue;
			}
			s->flags |= SN_MONITOR;
		}

		LIST_ADDQ(&sessions, &s->list);

		if ((t = task_new()) == NULL) { /* disable this proxy for a while,创建一个task,该task主要用于处理后续连接后端服务器、解析、数据等操作 */
			Alert("out of memory in event_accept().\n");
			disable_listener(l);
			p->state = PR_STIDLE;
			goto out_free_session;
		}

		s->cli_addr = addr;
		if (cfd >= global.maxsock) {
			Alert("accept(): not enough free sockets. Raise -n argument. Giving up.\n");
			goto out_free_task;
		}
                /* 设置一下句柄属性,包括非阻塞、保活、接收/发送缓冲区等 */
		if ((fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) ||
		    (setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY,
				(char *) &one, sizeof(one)) == -1)) {
			Alert("accept(): cannot set the socket in non blocking mode. Giving up\n");
			goto out_free_task;
		}

		if (p->options & PR_O_TCP_CLI_KA)
			setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));

		if (p->options & PR_O_TCP_NOLING)
			setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));

		if (global.tune.client_sndbuf)
			setsockopt(cfd, SOL_SOCKET, SO_SNDBUF, &global.tune.client_sndbuf, sizeof(global.tune.client_sndbuf));

		if (global.tune.client_rcvbuf)
			setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &global.tune.client_rcvbuf, sizeof(global.tune.client_rcvbuf));

		t->process = l->handler; /* 此task的回调函数也是process_session */
		t->context = s;
		t->nice = l->nice;

		s->task = t;
		s->listener = l;

		/* Note: initially, the session's backend points to the frontend.
		 * This changes later when switching rules are executed or
		 * when the default backend is assigned.
		 */
		s->be = s->fe = p;

		s->req = s->rep = NULL; /* will be allocated later */
                /* 设置stream interface信息,0对应的是frontend端,1对应的是backend端;update、chk_rcv、chk_snd等函数指针会在process_session等函数中调用,用于收发事件的设置等 */
		s->si[0].state = s->si[0].prev_state = SI_ST_EST;
		s->si[0].err_type = SI_ET_NONE;
		s->si[0].err_loc = NULL;
		s->si[0].owner = t;
		s->si[0].update = stream_sock_data_finish;
		s->si[0].shutr = stream_sock_shutr;
		s->si[0].shutw = stream_sock_shutw;
		s->si[0].chk_rcv = stream_sock_chk_rcv;
		s->si[0].chk_snd = stream_sock_chk_snd;
		s->si[0].connect = NULL;
		s->si[0].iohandler = NULL;
		s->si[0].fd = cfd;
		s->si[0].flags = SI_FL_NONE | SI_FL_CAP_SPLTCP; /* TCP splicing capable */
		if (s->fe->options2 & PR_O2_INDEPSTR)
			s->si[0].flags |= SI_FL_INDEP_STR;
		s->si[0].exp = TICK_ETERNITY;

		s->si[1].state = s->si[1].prev_state = SI_ST_INI;
		s->si[1].err_type = SI_ET_NONE;
		s->si[1].err_loc = NULL;
		s->si[1].owner = t;
		s->si[1].update = stream_sock_data_finish;
		s->si[1].shutr = stream_sock_shutr;
		s->si[1].shutw = stream_sock_shutw;
		s->si[1].chk_rcv = stream_sock_chk_rcv;
		s->si[1].chk_snd = stream_sock_chk_snd;
		s->si[1].connect = tcpv4_connect_server;
		s->si[1].iohandler = NULL;
		s->si[1].exp = TICK_ETERNITY;
		s->si[1].fd = -1; /* just to help with debugging */
		s->si[1].flags = SI_FL_NONE;
		if (s->be->options2 & PR_O2_INDEPSTR)
			s->si[1].flags |= SI_FL_INDEP_STR;

		s->srv = s->prev_srv = s->srv_conn = NULL;
		s->pend_pos = NULL;
		s->conn_retries = s->be->conn_retries;

		/* init store persistence */
		s->store_count = 0;

		/* FIXME: the logs are horribly complicated now, because they are
		 * defined in <p>, <p>, and later <be> and <be>.
		 */

		if (s->flags & SN_MONITOR)
			s->logs.logwait = 0;
		else
			s->logs.logwait = p->to_log;

		if (s->logs.logwait & LW_REQ)
			s->do_log = http_sess_log;
		else
			s->do_log = tcp_sess_log;

		/* default error reporting function, may be changed by analysers */
		s->srv_error = default_srv_error;

		s->logs.accept_date = date; /* user-visible date for logging */
		s->logs.tv_accept = now;  /* corrected date for internal use */
		tv_zero(&s->logs.tv_request);
		s->logs.t_queue = -1;
		s->logs.t_connect = -1;
		s->logs.t_data = -1;
		s->logs.t_close = 0;
		s->logs.bytes_in = s->logs.bytes_out = 0;
		s->logs.prx_queue_size = 0;  /* we get the number of pending conns before us */
		s->logs.srv_queue_size = 0; /* we will get this number soon */

		s->data_source = DATA_SRC_NONE;

		s->uniq_id = totalconn;
		proxy_inc_fe_ctr(l, p);	/* note: cum_beconn will be increased once assigned */

		txn = &s->txn;
		/* Those variables will be checked and freed if non-NULL in
		 * session.c:session_free(). It is important that they are
		 * properly initialized.
		 */
		txn->sessid = NULL;
		txn->srv_cookie = NULL;
		txn->cli_cookie = NULL;
		txn->uri = NULL;
		txn->req.cap = NULL;
		txn->rsp.cap = NULL;
		txn->hdr_idx.v = NULL;
		txn->hdr_idx.size = txn->hdr_idx.used = 0;

                ……

		if ((s->req = pool_alloc2(pool2_buffer)) == NULL)  /* 申请req内存,该req主要是frontend接收数据,backend发给后端服务器的 */
			goto out_fail_req; /* no memory */

		s->req->size = global.tune.bufsize;
		buffer_init(s->req);
		s->req->prod = &s->si[0]; /* req的生产者对应的是frontend端的句柄 */
		s->req->cons = &s->si[1]; /* req的消费者对应backend端的句柄 */
		s->si[0].ib = s->si[1].ob = s->req;

		s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */

		if (p->mode == PR_MODE_HTTP)
			s->req->flags |= BF_READ_DONTWAIT; /* one read is usually enough */

		/* activate default analysers enabled for this listener */
		/* 对于http模式,一般会有AN_REQ_WAIT_HTTP | AN_REQ_HTTP_PROCESS_FE | AN_REQ_SWITCHING_RULES,tcp模式有AN_REQ_SWITCHING_RULES */

		s->req->analysers = l->analysers; 
		/* note: this should not happen anymore since there's always at least the switching rules */
		if (!s->req->analysers) {
			buffer_auto_connect(s->req);  /* don't wait to establish connection */
			buffer_auto_close(s->req);    /* let the producer forward close requests */
		}

		s->req->rto = s->fe->timeout.client;
		s->req->wto = s->be->timeout.server;
		s->req->cto = s->be->timeout.connect;

		if ((s->rep = pool_alloc2(pool2_buffer)) == NULL) /* 申请rep(response)的内存空间,用于存放真实服务器发回的数据 */
			goto out_fail_rep; /* no memory */

		s->rep->size = global.tune.bufsize;
		buffer_init(s->rep);
		s->rep->prod = &s->si[1];
		s->rep->cons = &s->si[0];
		s->si[0].ob = s->si[1].ib = s->rep;
		s->rep->analysers = 0;

		if (s->fe->options2 & PR_O2_NODELAY) {
			s->req->flags |= BF_NEVER_WAIT;
			s->rep->flags |= BF_NEVER_WAIT;
		}

		s->rep->rto = s->be->timeout.server;
		s->rep->wto = s->fe->timeout.client;
		s->rep->cto = TICK_ETERNITY;

		s->req->rex = TICK_ETERNITY;
		s->req->wex = TICK_ETERNITY;
		s->req->analyse_exp = TICK_ETERNITY;
		s->rep->rex = TICK_ETERNITY;
		s->rep->wex = TICK_ETERNITY;
		s->rep->analyse_exp = TICK_ETERNITY;
		t->expire = TICK_ETERNITY;
                /* 把accept出来的句柄加入IO事件驱动模型中去,此时对应读事件会调用stream_sock_read函数,写事件调用stream_sock_write函数 */
		fd_insert(cfd);
		fdtab[cfd].owner = &s->si[0];
		fdtab[cfd].state = FD_STREADY;
		fdtab[cfd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY;
		if (p->options & PR_O_TCP_NOLING)
			fdtab[cfd].flags |= FD_FL_TCP_NOLING;

		fdtab[cfd].cb[DIR_RD].f = l->proto->read; /* 指向proto_tcpv4,对应的读函数是stream_sock_read */
		fdtab[cfd].cb[DIR_RD].b = s->req;
		fdtab[cfd].cb[DIR_WR].f = l->proto->write; /* 指向proto_tcpv4,对应的写函数是stream_sock_write */
		fdtab[cfd].cb[DIR_WR].b = s->rep;
		fdinfo[cfd].peeraddr = (struct sockaddr *)&s->cli_addr;
		fdinfo[cfd].peerlen = sizeof(s->cli_addr);

		if ((p->mode == PR_MODE_HTTP && (s->flags & SN_MONITOR)) ||
		    (p->mode == PR_MODE_HEALTH && (p->options & PR_O_HTTP_CHK))) {
			/* Either we got a request from a monitoring system on an HTTP instance,
			 * or we're in health check mode with the 'httpchk' option enabled. In
			 * both cases, we return a fake "HTTP/1.0 200 OK" response and we exit.
			 */
			struct chunk msg;
			chunk_initstr(&msg, "HTTP/1.0 200 OK\r\n\r\n");
			stream_int_retnclose(&s->si[0], &msg); /* forge a 200 response */
			s->req->analysers = 0;
			t->expire = s->rep->wex;
		}
		else if (p->mode == PR_MODE_HEALTH) {  /* health check mode, no client reading */
			struct chunk msg;
			chunk_initstr(&msg, "OK\n");
			stream_int_retnclose(&s->si[0], &msg); /* forge an "OK" response */
			s->req->analysers = 0;
			t->expire = s->rep->wex;
		}
		else {
			EV_FD_SET(cfd, DIR_RD); /* 正常模式只会设置读事件 */
		}

		/* it is important not to call the wakeup function directly but to
		 * pass through task_wakeup(), because this one knows how to apply
		 * priorities to tasks.
		 */
		task_wakeup(t, TASK_WOKEN_INIT);

		l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
		if (l->nbconn >= l->maxconn) {
			EV_FD_CLR(l->fd, DIR_RD);
			l->state = LI_FULL;
		}

		p->feconn++;  /* beconn will be increased later */
		if (p->feconn > p->counters.feconn_max)
			p->counters.feconn_max = p->feconn;

		if (l->counters) {
			if (l->nbconn > l->counters->conn_max)
				l->counters->conn_max = l->nbconn;
		}

		actconn++;
		totalconn++;

		// fprintf(stderr, "accepting from %p => %d conn, %d total, task=%p\n", p, actconn, totalconn, t);
	} /* end of while (p->feconn < p->maxconn) */
	return 0;

……
}

event_accept主要是接收新的连接,创建一个task和session,并对task、session进行初始化,另外还会把accept出来的句柄加入IO事件模型中去,到时候有数据发送过来就会调用回调函数stream_sock_read来接收数据。至此,haproxy已经可以接收客户端发送过来的请求了。那么haproxy什么时候connect真实服务器呢?怎么把客户端发过来的数据发给真实服务器呢?这边先讲下大概流程:

event_accept中创建的task会被执行,此时会调用process_session函数,在process_session函数中会先处理数据。对于http模式,如果req未接收完整或者还未接收到数据,会调用buffer_dont_connect函数,此时cons的state仍为SI_ST_INI,继续执行process_session下面的代码时不会分配后端服务器,也不会connect后端服务器;而对于req已处理完或者是tcp模式的情况,会判断cons->state是否为SI_ST_INI状态,继续往下判断,此时会设置s->req->cons->state = SI_ST_REQ,继续往下执行就会执行到sess_prepare_conn_req函数,根据配置的负载均衡算法分配后端服务器,并把状态设置为SI_ST_ASS,然后继续while循环,执行sess_update_stream_int函数,connect后端服务器……

这边就不继续往下分析了,后续再进行详细分析,未完待续。

抱歉!评论已关闭.