现在的位置: 首页 > 综合 > 正文

epoll

2018年05月08日 ⁄ 综合 ⁄ 共 4532字 ⁄ 字号 评论关闭
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <sys/signal.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

#include "thread.h"
#include "list.h"


/* Integer-backed boolean used for the on/off switches in this file. */
typedef int BOOL;

/*
 * State of one worker thread.  worker_loop() waits on `epfd`, asks
 * epoll_wait() for at most `waitevents` events per call, and passes
 * `nextepfd` on to HandleRecv().
 */
struct _WORKER {
	int epfd;       /* epoll instance this worker waits on */
	int nextepfd;   /* epoll instance of the next worker (init_worker() links them in a ring) */
	int waitevents; /* maxevents argument given to epoll_wait() */
};

typedef struct _WORKER WORKER;
typedef struct _WORKER *PWORKER;

/* Process-wide settings and state; main() assigns all of them. */
static int   g_thread_num;    /* thread count handed to init_worker() */
static int   g_max_backlogs;  /* backlog handed to make_socket() */
static int   g_max_events;    /* event budget handed to init_worker(); also sizes each worker's event array */
static int   g_listenfd;      /* listening socket returned by make_socket() */
/* Unsigned so that ports above 32767 are stored without wrapping negative. */
static unsigned short g_listnport;
static BOOL  g_keepalive;     /* forwarded to HandleRecv() as its keepalive switch */
/*
 * Stop flag.  It is written by the SIGUSR1 handler, so it uses the one
 * object type the C standard lets a signal handler assign to.
 */
static volatile sig_atomic_t g_stop;

/*
 * Register `fd` on the epoll instance `epfd`.
 *
 * The caller's `events` mask is always extended with EPOLLERR and EPOLLET,
 * and the descriptor itself is stored as the event's user data.  The result
 * of epoll_ctl() is not reported to the caller.
 */
static inline void epoll_add(int epfd, int fd, int events) {
	struct epoll_event registration = {
		.events = events | EPOLLERR | EPOLLET,
		.data = { .fd = fd },
	};

	epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &registration);
}
/*
 * Replace the event mask of an already registered `fd` on `epfd`.
 *
 * EPOLLERR and EPOLLET are OR-ed into `events`, and the descriptor is stored
 * again as the event's user data.  The result of epoll_ctl() is not reported
 * to the caller.
 */
static inline void epoll_mod(int epfd, int fd, int events) {
	struct epoll_event updated = {
		.events = events | EPOLLERR | EPOLLET,
		.data = { .fd = fd },
	};

	epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &updated);
}
/*
 * Remove `fd` from the epoll instance `epfd`.
 *
 * The event argument is ignored for EPOLL_CTL_DEL, but a non-NULL pointer is
 * still passed for the sake of kernels older than 2.6.9.  The structure is
 * zero-initialised so that no indeterminate bytes are handed to the kernel.
 * The result of epoll_ctl() is not reported to the caller.
 */
static inline void epoll_del(int epfd, int fd) {
	struct epoll_event ev = {0};

	ev.data.fd = fd;
	epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev);
}

/*
 * Configure a TCP socket for immediate transmission.
 *
 * When `set` is non-zero this enables TCP_NODELAY, clears TCP_CORK and
 * applies a zeroed struct linger (l_onoff = 0).  When `set` is zero the
 * socket is left untouched.  setsockopt() failures are ignored: the tuning
 * is best effort.
 */
static void set_tcpnodelay(int sockfd, BOOL set){
	const struct linger no_linger = {0, 0};
	int on = 1;
	int off = 0;

	if( !set ) {
		return;
	}

	/* IPPROTO_TCP is the documented level for TCP options; SOL_TCP is a
	 * Linux-specific alias with the same value. */
	setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
	setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &off, sizeof(off));

	setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &no_linger, sizeof(no_linger));
}

/*
 * Put `sockfd` into non-blocking mode when `set` is non-zero; do nothing
 * when `set` is zero.
 *
 * If the current flags cannot be read the descriptor is left alone, rather
 * than passing the -1 error value on to F_SETFL as a flag mask.
 */
static void set_nonblock(int sockfd, BOOL set){
	int flags;

	if( !set ) {
		return;
	}

	flags = fcntl(sockfd, F_GETFL);
	if( flags < 0 ) {
		return;
	}
	fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Enable SO_REUSEADDR on `sockfd` when `set` is non-zero; do nothing when
 * `set` is zero.  The result of setsockopt() is not checked.
 */
static void set_reuseable(int sockfd, BOOL set) {
	int enable = 1;

	if( !set ) {
		return;
	}
	setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
}

/* Unregister `clifd` from `epfd` and close it. */
static void handle_recv_drop(int epfd, int clifd) {
	epoll_del(epfd, clifd);
	close(clifd);
}

/*
 * Echo everything currently readable on `clifd` back to the peer.
 *
 * epfd      - epoll instance `clifd` is currently registered on
 * clifd     - client socket to service
 * nextepfd  - epoll instance that takes over the client in keep-alive mode
 * keepalive - non-zero: keep the connection open and hand it to `nextepfd`
 *             once the pending input has been echoed;
 *             zero: close the connection after the first echoed chunk
 *
 * The socket is read until recv() reports that it would block, which is
 * what edge-triggered registration requires.  The function returns as soon
 * as the descriptor has been closed, so a closed descriptor number (which
 * another thread may already have been given by accept()) is never read
 * again.  The echo itself is best effort: short or failed sends are not
 * retried.
 */
void HandleRecv(int epfd, int clifd, int nextepfd, int keepalive) {
	char buf[4096];
	ssize_t rc;
	int echoed = 0;

	for(;;) {
		rc = recv(clifd, buf, sizeof(buf), MSG_DONTWAIT);
		if( rc < 0 ) {
			if( errno == EINTR ) {
				continue;
			}
			if( errno == EAGAIN || errno == EWOULDBLOCK ) {
				/* Input drained; the connection stays open. */
				break;
			}
			/* Hard error: the connection is unusable. */
			handle_recv_drop(epfd, clifd);
			return;
		}
		if( rc == 0 ) {
			/* Peer closed the connection. */
			handle_recv_drop(epfd, clifd);
			return;
		}

		send(clifd, buf, (size_t)rc, MSG_DONTWAIT|MSG_NOSIGNAL);
		if( !keepalive ) {
			handle_recv_drop(epfd, clifd);
			return;
		}
		echoed = 1;
	}

	if( echoed ) {
		/* Hand the client over exactly once, after the drain, so that two
		 * workers never read the same socket at the same time. */
		epoll_del(epfd, clifd);
		epoll_add(nextepfd, clifd, EPOLLIN);
	}
}

/*
 * Thread body of one worker.
 *
 * para - pointer to this thread's WORKER record
 *
 * Waits on the worker's epoll instance until g_stop is set.  Readiness of
 * the listening socket is answered by accepting every pending connection;
 * readiness of a client socket is answered by HandleRecv().  Clients that
 * report only an error or hang-up condition are unregistered and closed.
 *
 * The function does not return normally: it ends the calling thread with
 * pthread_exit(NULL).
 */
int worker_loop(void *para){
	PWORKER wk = (PWORKER)para;
	struct epoll_event *pev;
	struct sockaddr_in addr;
	socklen_t socklen;
	int i;
	int n;
	int fd;
	int clifd;

	pev = malloc(sizeof(*pev) * (size_t)g_max_events);
	if( pev == NULL ) {
		perror("malloc");
		pthread_exit(NULL);
	}

	while(!g_stop) {
		n = epoll_wait(wk->epfd, pev, wk->waitevents, 1);
		if( n < 0 && errno != EINTR ) {
			/* A permanent failure would otherwise turn into a busy loop. */
			perror("epoll_wait");
			break;
		}
		if( n <= 0 ) {
			continue;
		}

		for( i = 0; i < n; ++i ) {
			fd = pev[i].data.fd;

			if( fd == g_listenfd ) {
				while(!g_stop){
					/* accept() overwrites the length, so reset it each time. */
					socklen = sizeof(addr);
					clifd = accept(g_listenfd, (struct sockaddr*)&addr, &socklen);
					if( clifd < 0 ) {
						if( errno == EINTR ) {
							continue;
						}
						break;
					}

					set_nonblock(clifd, 1);
					set_tcpnodelay(clifd, 1);

					epoll_add(wk->epfd, clifd, EPOLLIN);
					HandleRecv(wk->epfd, clifd, wk->nextepfd, g_keepalive);
				}
			} else if( pev[i].events & EPOLLIN ) {
				HandleRecv(wk->epfd, fd, wk->nextepfd, g_keepalive);
			} else if( pev[i].events & (EPOLLERR | EPOLLHUP) ) {
				/* Nothing to read and the connection is broken: release it
				 * instead of leaking the descriptor. */
				epoll_del(wk->epfd, fd);
				close(fd);
			}
		}
	}

	free(pev);
	pthread_exit(NULL);

	return 0; /* not reached */
}

PWORKER init_worker(int threadnum, int maxevents, int listenfd){
	int i;
	char thread_name[THREAD_NAME_LEN];
	PWORKER wk;

	wk = (PWORKER)malloc(sizeof(WORKER)*threadnum);

	wk[0].epfd = epoll_create(maxevents);                
	wk[0].waitevents = maxevents-(maxevents/threadnum*(threadnum-1));
	epoll_add(wk[0].epfd, listenfd, EPOLLIN);
	snprintf(thread_name, sizeof(thread_name), "Worker%d", 0);
	register_thread(thread_name, worker_loop, &wk[0], 0);

	for( i = 1; i < threadnum; ++i ) {

		wk[i].epfd = epoll_create(maxevents);
		wk[i].waitevents = maxevents/threadnum;
		wk[i-1].nextepfd = wk[i].epfd;
		epoll_add(wk[i].epfd, listenfd, EPOLLIN);
		snprintf(thread_name, sizeof(thread_name), "worker%d", i);
		register_thread(thread_name, worker_loop, &wk[i], 0);
	}

	wk[i-1].nextepfd = wk[0].epfd;

	return wk;
}

/*
 * Release the worker array returned by init_worker().
 *
 * Only the array memory is freed here; the epoll descriptors stored in the
 * records are not closed by this function.
 */
void fini_worker(PWORKER wk)
{
	free(wk);
}

/*
 * Create a non-blocking TCP socket listening on every local IPv4 address.
 *
 * port     - TCP port in host byte order
 * backlogs - backlog passed to listen()
 *
 * Returns the listening descriptor, or -1 after printing the failing call
 * with perror().  On failure the socket is closed before returning, so no
 * descriptor is leaked.
 */
int make_socket(uint16_t port, int backlogs) {
	struct sockaddr_in addr;
	int s;
	int optval = 1;

	s = socket(AF_INET, SOCK_STREAM, 0);
	if(s < 0) {
		perror("socket");
		return -1;
	}

	set_reuseable(s, 1);
	set_nonblock(s, 1);
	set_tcpnodelay(s, 1);

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(INADDR_ANY);
	addr.sin_port = htons(port);

	if(bind(s, (struct sockaddr*)&addr, sizeof(addr))<0){
		perror("bind");
		goto fail;
	}

	if( listen(s, backlogs) < 0 ) {
		perror("listen");
		goto fail;
	}

	/* Best effort; the result is deliberately not checked. */
	setsockopt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, &optval, sizeof(optval));

	return s;

fail:
	close(s);
	return -1;
}

/*
 * Signal handler installed by set_stop_signal(): raises the g_stop flag.
 * The signal number is not used.
 */
static void sigusr1_handler(int signo)
{
	(void)signo;
	g_stop = 1;
}

/*
 * Install sigusr1_handler() as the SIGUSR1 handler.
 *
 * No flags are set and no additional signals are blocked while the handler
 * runs.  A failure to install the handler is reported with perror().
 */
void set_stop_signal(void){
	struct sigaction sa;

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = sigusr1_handler;
	/* A signal set must be initialised with sigemptyset(); zeroing the
	 * bytes is not guaranteed to produce an empty set. */
	sigemptyset(&sa.sa_mask);
	if( sigaction(SIGUSR1, &sa, NULL) < 0 ) {
		perror("sigaction");
	}
}
/*
 * Entry point: `<program> <port> <keepalive>`.
 *
 * Parses the arguments, creates the listening socket and the workers, starts
 * the threads and then idles until SIGUSR1 sets g_stop.  After that it gives
 * the workers a second to leave their loops, stops the threads and releases
 * the resources.
 *
 * Returns 0 after a clean shutdown and -1 on a usage error, an invalid port
 * or a start-up failure.
 */
int main(int argc, char** argv) {
	PWORKER wk;
	char *end;
	long port;

	if( argc < 3 ) {
		printf("%s <port> <keepalive>\n", argv[0]);
		return -1;
	}

	errno = 0;
	port = strtol(argv[1], &end, 10);
	if( errno != 0 || end == argv[1] || *end != '\0' || port < 0 || port > 65535 ) {
		fprintf(stderr, "invalid port: %s\n", argv[1]);
		return -1;
	}

	/* Clear the flag before the handler can run, so that an early signal
	 * is not overwritten. */
	g_stop = 0;
	set_stop_signal();

	g_keepalive = atoi(argv[2]);
	g_listnport = port;
	g_max_backlogs = 10000;
	g_thread_num = 16;
	g_max_events = 65535;

	g_listenfd = make_socket((uint16_t)port, g_max_backlogs);
	if( g_listenfd < 0 ) {
		/* make_socket() has already printed the reason. */
		return -1;
	}

	wk = init_worker(g_thread_num, g_max_events, g_listenfd);
	if( wk == NULL ) {
		fprintf(stderr, "init_worker failed\n");
		close(g_listenfd);
		return -1;
	}

	start_threads();

	while(!g_stop) {
		sleep(1);
	}

	sleep(1);
	stop_threads();
	fini_worker(wk);
	close(g_listenfd);

	return 0;
}

抱歉!评论已关闭.