#include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <sys/epoll.h> #include <errno.h> #include "thread.h" #include "list.h" #include <netinet/in.h> #include <netinet/tcp.h> #include <sys/socket.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/signal.h> #include <sys/ioctl.h> #include <fcntl.h> typedef int BOOL; typedef struct _WORKER { int epfd; int nextepfd; int waitevents; }WORKER, *PWORKER; static int g_thread_num; static int g_max_backlogs; static int g_max_events; static int g_listenfd; static short g_listnport; static BOOL g_keepalive; static BOOL g_stop; static inline void epoll_add(int epfd, int fd, int events) { struct epoll_event ev; ev.events = events | EPOLLERR | EPOLLET ; ev.data.fd = fd; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); } static inline void epoll_mod(int epfd, int fd, int events) { struct epoll_event ev; ev.events = events | EPOLLERR | EPOLLET; ev.data.fd = fd; epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); } static inline void epoll_del(int epfd, int fd) { struct epoll_event ev; ev.data.fd = fd; epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev); } static void set_tcpnodelay(int sockfd, BOOL set){ int v; struct linger linger = {0, 0}; int optval = 1; if( set ) { v=1; setsockopt(sockfd, SOL_TCP, TCP_NODELAY, &v, sizeof(v)); v=0; setsockopt(sockfd, SOL_TCP, TCP_CORK, &v, sizeof(v)); setsockopt(sockfd, SOL_SOCKET, SO_LINGER, (int *)&linger, sizeof(linger)); } } static void set_nonblock(int sockfd, BOOL set){ if(set) { fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL)|O_NONBLOCK); } } static void set_reuseable(int sockfd, BOOL set) { int optval = 1; if(set) { setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); } } void HandleRecv(int epfd, int clifd, int nextepfd, int keepalive) { int rc; char buf[4096] = { 0 }; while(1) { rc = recv(clifd, buf, sizeof(buf), MSG_DONTWAIT); if( rc < 0 ) { if( errno == EINTR ) { continue; }else { break; } }else if( rc == 0 ) { send(clifd, buf, rc, MSG_DONTWAIT|MSG_NOSIGNAL); epoll_del(epfd, clifd); close(clifd); }else if( rc > 0 ) { send(clifd, buf, rc, MSG_DONTWAIT|MSG_NOSIGNAL); if(keepalive) { epoll_del(epfd, clifd); epoll_add(nextepfd, clifd, EPOLLIN); }else { epoll_del(epfd, clifd); close(clifd); } } } } int worker_loop(void *para){ PWORKER wk = (PWORKER)para; struct epoll_event *pev = NULL; struct sockaddr_in addr; int socklen = sizeof(addr); int i; int n; int clifd; int v = 0; pev = (struct epoll_event*)malloc(sizeof(struct epoll_event) * g_max_events); while(!g_stop) { n = epoll_wait(wk->epfd, pev, wk->waitevents, 1); if( n <= 0 ) { continue; } for( i = 0; i < n; ++i ) { if(g_listenfd == pev[i].data.fd) { while(!g_stop){ clifd = accept(g_listenfd, (struct sockaddr*)&addr, &socklen); if(clifd < 0 ) break; set_nonblock(clifd, 1); set_tcpnodelay(clifd, 1); epoll_add(wk->epfd, clifd, EPOLLIN); HandleRecv(wk->epfd, clifd, wk->nextepfd, g_keepalive); } } else { if( pev[i].events & EPOLLIN ){ HandleRecv(wk->epfd, pev[i].data.fd, wk->nextepfd, g_keepalive); } } } } free(pev); pthread_exit(NULL); } PWORKER init_worker(int threadnum, int maxevents, int listenfd){ int i; char thread_name[THREAD_NAME_LEN]; PWORKER wk; wk = (PWORKER)malloc(sizeof(WORKER)*threadnum); wk[0].epfd = epoll_create(maxevents); wk[0].waitevents = maxevents-(maxevents/threadnum*(threadnum-1)); epoll_add(wk[0].epfd, listenfd, EPOLLIN); snprintf(thread_name, sizeof(thread_name), "Worker%d", 0); register_thread(thread_name, worker_loop, &wk[0], 0); for( i = 1; i < threadnum; ++i ) { wk[i].epfd = epoll_create(maxevents); wk[i].waitevents = maxevents/threadnum; wk[i-1].nextepfd = wk[i].epfd; epoll_add(wk[i].epfd, listenfd, EPOLLIN); snprintf(thread_name, sizeof(thread_name), "worker%d", i); register_thread(thread_name, worker_loop, &wk[i], 0); } wk[i-1].nextepfd = wk[0].epfd; return wk; } void fini_worker(PWORKER wk){ free(wk); } int make_socket(uint16_t port, int backlogs) { struct sockaddr_in addr; int s; int optval = 1; s = socket(AF_INET, SOCK_STREAM, 0); if(s < 0) { perror("socket"); return -1; } set_reuseable(s, 1); set_nonblock(s, 1); set_tcpnodelay(s, 1); memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = 0; addr.sin_port = htons(port); if(bind(s, (struct sockaddr*)&addr, sizeof(addr))<0){ perror("bind"); return -1; } if( listen(s, backlogs) < 0 ) { perror("listen"); return -1; } setsockopt(s, SOL_TCP, TCP_DEFER_ACCEPT, &optval, sizeof(optval)); return s; } static void sigusr1_handler(int signo) { g_stop = 1; } void set_stop_signal(){ struct sigaction sa; sigset_t sset; memset(&sa, 0, sizeof(sa)); sa.sa_handler = sigusr1_handler; sigaction(SIGUSR1, &sa, NULL); } int main(int argc, char** argv) { PWORKER wk; int status = -1; if( argc < 3 ) { printf("%s <port> <keepalive>\n", argv[0]); return -1; } set_stop_signal(); g_stop = 0; g_keepalive = atoi(argv[2]); g_listnport = atoi(argv[1]); g_max_backlogs = 10000; g_thread_num = 16; g_max_events = 65535; g_listenfd = make_socket(g_listnport, g_max_backlogs); wk = init_worker(g_thread_num, g_max_events, g_listenfd); start_threads(); while(!g_stop) { sleep(1); } sleep(1); stop_threads(); fini_worker(wk); close(g_listenfd); return 0; }