An overview of how epoll works
The analysis above identified the two bottlenecks in poll's performance; the question now is how to fix them. First, every call to poll copies all 1000 fds into the kernel, which is wasteful: why doesn't the kernel simply keep the fds it has already been given? That is exactly what epoll does, and its API says as much. Instead of passing the fds in only at epoll_wait time, you hand them all to the kernel via epoll_ctl and then "wait" on them together, which eliminates the redundant copying. Second, epoll_wait does not put current on each fd's device wait queue in turn; instead, a callback function runs when a device wait queue is woken (which of course requires a "wakeup callback" mechanism), placing the fd that produced the event onto a list, and the fds on that list are what get returned.
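Before going into the kernel, it may help to see this API shape from user space. Below is a minimal sketch of an echo-style event loop (listen_fd is assumed to be a listening socket set up elsewhere, and error handling is abbreviated): each fd is registered once with epoll_ctl, and epoll_wait hands back only the ready ones.

#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#define MAX_EVENTS 64

int event_loop(int listen_fd)
{
        struct epoll_event ev, events[MAX_EVENTS];
        int epfd = epoll_create(1024);          /* "size" is only a hint */

        ev.events = EPOLLIN;
        ev.data.fd = listen_fd;
        /* register once: the kernel remembers the fd from now on */
        epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

        for (;;) {
                /* no fd array goes in; only ready fds come back out */
                int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
                for (int i = 0; i < n; i++) {
                        int fd = events[i].data.fd;
                        if (fd == listen_fd) {
                                int conn = accept(listen_fd, NULL, NULL);
                                ev.events = EPOLLIN;
                                ev.data.fd = conn;
                                epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &ev);
                        } else {
                                char buf[4096];
                                ssize_t r = read(fd, buf, sizeof(buf));
                                if (r <= 0) {   /* closed or error: unregister */
                                        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                                        close(fd);
                                } else {
                                        write(fd, buf, r);  /* echo back */
                                }
                        }
                }
        }
}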
Dissecting epoll
epoll is a module, so let's start with the module entry point, eventpoll_init:
[fs/eventpoll.c-->eventpoll_init()]
static int __init eventpoll_init(void)
{
        int error;

        init_MUTEX(&epsem);

        /* Initialize the structure used to perform safe poll wait head wake ups */
        ep_poll_safewake_init(&psw);

        /* Allocates slab cache used to allocate "struct epitem" items */
        epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
                        0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC,
                        NULL, NULL);

        /* Allocates slab cache used to allocate "struct eppoll_entry" */
        pwq_cache = kmem_cache_create("eventpoll_pwq",
                        sizeof(struct eppoll_entry), 0,
                        EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL);

        /*
         * Register the virtual file system that will be the source of inodes
         * for the eventpoll files
         */
        error = register_filesystem(&eventpoll_fs_type);
        if (error)
                goto epanic;

        /* Mount the above commented virtual file system */
        eventpoll_mnt = kern_mount(&eventpoll_fs_type);
        error = PTR_ERR(eventpoll_mnt);
        if (IS_ERR(eventpoll_mnt))
                goto epanic;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: successfully initialized.\n",
                current));
        return 0;

epanic:
        panic("eventpoll_init() failed\n");
}
Interestingly, at initialization this module registers a new file system called "eventpollfs" (described by the eventpoll_fs_type structure) and then mounts it. It also creates two kernel caches (in kernel programming, when you need to allocate small chunks of memory frequently, you should create a kmem_cache to act as a "memory pool"), used to hold struct epitem and struct eppoll_entry objects respectively. If you ever develop a new file system, this code is a good reference.
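As an aside, the "memory pool" idea behind kmem_cache is simple enough to sketch in user space: recycle fixed-size objects through a freelist instead of hitting the general-purpose allocator every time. (pool_create, pool_alloc and pool_free below are invented names for illustration, not kernel APIs.)

#include <stdlib.h>

/* A fixed-size-object pool: freed objects are kept on a freelist
 * and handed back on the next allocation, so hot paths avoid
 * repeated malloc/free work, the same idea as a kmem_cache. */
struct pool {
        size_t obj_size;
        void *freelist;     /* singly linked list threaded through free objects */
};

static struct pool *pool_create(size_t obj_size)
{
        struct pool *p = malloc(sizeof(*p));
        if (p) {
                p->obj_size = obj_size < sizeof(void *) ? sizeof(void *) : obj_size;
                p->freelist = NULL;
        }
        return p;
}

static void *pool_alloc(struct pool *p)
{
        if (p->freelist) {                  /* reuse a cached object */
                void *obj = p->freelist;
                p->freelist = *(void **)obj;
                return obj;
        }
        return malloc(p->obj_size);         /* fall back to the system allocator */
}

static void pool_free(struct pool *p, void *obj)
{
        *(void **)obj = p->freelist;        /* push back onto the freelist */
        p->freelist = obj;
}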
Now think about why epoll_create returns a new fd: it is because it creates a new file inside this "eventpollfs" file system! See below:
[fs/eventpoll.c-->sys_epoll_create()]
asmlinkage long sys_epoll_create(int size)
{
        int error, fd;
        struct inode *inode;
        struct file *file;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n",
                current, size));

        /* Sanity check on the size parameter */
        error = -EINVAL;
        if (size <= 0)
                goto eexit_1;

        /*
         * Creates all the items needed to setup an eventpoll file. That is,
         * a file structure, and inode and a free file descriptor.
         */
        error = ep_getfd(&fd, &inode, &file);
        if (error)
                goto eexit_1;

        /* Setup the file internal data structure ( "struct eventpoll" ) */
        error = ep_file_init(file);
        if (error)
                goto eexit_2;
        //...
}
The function is simple. ep_getfd looks like a mere "get", but on a call to epoll_create it actually creates a new inode, a new file and a new fd. ep_file_init then creates a struct eventpoll and stores it in file->private_data; remember this private_data, it will be needed again later.
At this point some readers may ask: why didn't epoll's developers keep a giant in-kernel map of the user's epoll handles and just return a pointer from epoll_create? That seems more intuitive. But look around: how many Linux system calls return pointers? Almost none! (Note that malloc is not a system call; the brk that malloc invokes is.) As the most distinguished heir of Unix, Linux follows one of Unix's great strengths: everything is a file. Input/output is a file, a socket is a file, and "everything is a file" means programs using this operating system can be very simple, because everything reduces to file operations (Unix doesn't fully achieve this; Plan 9 does). Using a file system also has a practical advantage: epoll_create returns an fd rather than a damned pointer. If a pointer is wrong there is virtually no way to tell, whereas an fd can be checked for validity through current->files->fd_array[].
/*
 * This structure is stored inside the "private_data" member of the file
 * structure and rapresent the main data sructure for the eventpoll
 * interface.
 */
struct eventpoll {
        /* Protect the this structure access */
        rwlock_t lock;

        /*
         * This semaphore is used to ensure that files are not removed
         * while epoll is using them. This is read-held during the event
         * collection loop and it is write-held during the file cleanup
         * path, the epoll file exit code and the ctl operations.
         */
        struct rw_semaphore sem;

        /* Wait queue used by sys_epoll_wait() */
        wait_queue_head_t wq;

        /* Wait queue used by file->poll() */
        wait_queue_head_t poll_wait;

        /* List of ready file descriptors */
        struct list_head rdllist;

        /* RB-Tree root used to store monitored fd structs */
        struct rb_root rbr;
};
With epoll_create covered, on to epoll_ctl. We omit the validation code:
[fs/eventpoll.c-->sys_epoll_ctl()]
asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
        int error;
        struct file *file, *tfile;
        struct eventpoll *ep;
        struct epitem *epi;
        struct epoll_event epds;
        //....
        epi = ep_find(ep, tfile, fd);

        error = -EINVAL;
        switch (op) {
        case EPOLL_CTL_ADD:
                if (!epi) {
                        epds.events |= POLLERR | POLLHUP;
                        error = ep_insert(ep, &epds, tfile, fd);
                } else
                        error = -EEXIST;
                break;
        case EPOLL_CTL_DEL:
                if (epi)
                        error = ep_remove(ep, epi);
                else
                        error = -ENOENT;
                break;
        case EPOLL_CTL_MOD:
                if (epi) {
                        epds.events |= POLLERR | POLLHUP;
                        error = ep_modify(ep, epi, &epds);
                } else
                        error = -ENOENT;
                break;
        }
        //...
}
So it simply does an ep_find in some big structure (never mind which one for now). If a struct epitem is found and the user's operation is ADD, it returns -EEXIST; if the operation is DEL, it calls ep_remove. If no struct epitem is found and the operation is ADD, ep_insert creates and inserts one. Very straightforward. So what is this "big structure"? From the way ep_find is called, the ep parameter must point to it, and from ep = file->private_data we realize it is the struct eventpoll created back in epoll_create. Looking at the implementation of ep_find, it operates on the rbr member of struct eventpoll (a struct rb_root): the root of a red-black tree! And what hangs on that tree are struct epitem objects. Now it is clear: a newly created epoll file carries a struct eventpoll, that structure carries a red-black tree, and that tree is where the fds passed in by each epoll_ctl call are stored!
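What such a lookup looks like is easy to picture. Here is a sketch modeled on ep_find as it appears in kernels of this generation (reconstructed, so details may differ from your source tree; locking and the usage-count bump are omitted, and the ffd_cmp helper is our own stand-in for the kernel's EP_CMP_FFD macro). The tree is keyed on the {struct file *, fd} pair:

/* Order by file pointer first, then by fd */
static int ffd_cmp(struct file *f1, int fd1, struct file *f2, int fd2)
{
        if (f1 != f2)
                return f1 > f2 ? +1 : -1;
        return fd1 - fd2;
}

/* Sketch of ep_find: an ordinary binary-search walk down the rb-tree */
static struct epitem *ep_find_sketch(struct eventpoll *ep,
                                     struct file *file, int fd)
{
        struct rb_node *rbp = ep->rbr.rb_node;

        while (rbp) {
                struct epitem *epi = rb_entry(rbp, struct epitem, rbn);
                int kcmp = ffd_cmp(file, fd, epi->ffd.file, epi->ffd.fd);

                if (kcmp > 0)
                        rbp = rbp->rb_right;
                else if (kcmp < 0)
                        rbp = rbp->rb_left;
                else
                        return epi;     /* found the monitored {file, fd} */
        }
        return NULL;
}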
Now that the data structures are all clear, let's look at the most central part:
[fs/eventpoll.c-->sys_epoll_wait()]
asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events,
                               int maxevents, int timeout)
{
        int error;
        struct file *file;
        struct eventpoll *ep;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d)\n",
                current, epfd, events, maxevents, timeout));

        /* The maximum number of event must be greater than zero */
        if (maxevents <= 0)
                return -EINVAL;

        /* Verify that the area passed by the user is writeable */
        if ((error = verify_area(VERIFY_WRITE, events,
                        maxevents * sizeof(struct epoll_event))))
                goto eexit_1;

        /* Get the "struct file *" for the eventpoll file */
        error = -EBADF;
        file = fget(epfd);
        if (!file)
                goto eexit_1;

        /*
         * We have to check that the file structure underneath the fd
         * the user passed to us _is_ an eventpoll file.
         */
        error = -EINVAL;
        if (!IS_FILE_EPOLL(file))
                goto eexit_2;

        /*
         * At this point it is safe to assume that the "private_data" contains
         * our own data structure.
         */
        ep = file->private_data;

        /* Time to fish for events ... */
        error = ep_poll(ep, events, maxevents, timeout);

eexit_2:
        fput(file);
eexit_1:
        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d) = %d\n",
                current, epfd, events, maxevents, timeout, error));

        return error;
}
The same trick again: fetch the struct eventpoll from file->private_data, then call ep_poll:
[fs/eventpoll.c-->sys_epoll_wait()->ep_poll()]
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
        int res, eavail;
        unsigned long flags;
        long jtimeout;
        wait_queue_t wait;

        /*
         * Calculate the timeout by checking for the "infinite" value ( -1 )
         * and the overflow condition. The passed timeout is in milliseconds,
         * that why (t * HZ) / 1000.
         */
        jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
                MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
        write_lock_irqsave(&ep->lock, flags);

        res = 0;
        if (list_empty(&ep->rdllist)) {
                /*
                 * We don't have any available event to return to the caller.
                 * We need to sleep here, and we will be wake up by
                 * ep_poll_callback() when events will become available.
                 */
                init_waitqueue_entry(&wait, current);
                add_wait_queue(&ep->wq, &wait);

                for (;;) {
                        /*
                         * We don't want to sleep if the ep_poll_callback() sends us
                         * a wakeup in between. That's why we set the task state
                         * to TASK_INTERRUPTIBLE before doing the checks.
                         */
                        set_current_state(TASK_INTERRUPTIBLE);
                        if (!list_empty(&ep->rdllist) || !jtimeout)
                                break;
                        if (signal_pending(current)) {
                                res = -EINTR;
                                break;
                        }

                        write_unlock_irqrestore(&ep->lock, flags);
                        jtimeout = schedule_timeout(jtimeout);
                        write_lock_irqsave(&ep->lock, flags);
                }
                remove_wait_queue(&ep->wq, &wait);

                set_current_state(TASK_RUNNING);
        }
        //....
}
Another big loop, but this one beats poll's, because on a close look it does nothing at all besides sleeping and checking whether ep->rdllist is empty!
Doing nothing is of course efficient, but then who is it that makes ep->rdllist non-empty?
The answer: the callback function installed at ep_insert time:
[fs/eventpoll.c-->sys_epoll_ctl()-->ep_insert()]
923 static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
924                      struct file *tfile, int fd)
925 {
926         int error, revents, pwake = 0;
927         unsigned long flags;
928         struct epitem *epi;
929         struct ep_pqueue epq;
930
931         error = -ENOMEM;
932         if (!(epi = EPI_MEM_ALLOC()))
933                 goto eexit_1;
934
935         /* Item initialization follow here ... */
936         EP_RB_INITNODE(&epi->rbn);
937         INIT_LIST_HEAD(&epi->rdllink);
938         INIT_LIST_HEAD(&epi->fllink);
939         INIT_LIST_HEAD(&epi->txlink);
940         INIT_LIST_HEAD(&epi->pwqlist);
941         epi->ep = ep;
942         EP_SET_FFD(&epi->ffd, tfile, fd);
943         epi->event = *event;
944         atomic_set(&epi->usecnt, 1);
945         epi->nwait = 0;
946
947         /* Initialize the poll table using the queue callback */
948         epq.epi = epi;
949         init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
950
951         /*
952          * Attach the item to the poll hooks and get current event bits.
953          * We can safely use the file* here because its usage count has
954          * been increased by the caller of this function.
955          */
956         revents = tfile->f_op->poll(tfile, &epq.pt);
Note line 949; it is effectively

epq.pt.qproc = ep_ptable_queue_proc;

Right after that, tfile->f_op->poll(tfile, &epq.pt) calls the poll method of the monitored file (the "target file", in epoll's terms). That poll method in turn calls poll_wait (remember poll_wait? every device driver that supports poll has to call it), and poll_wait finally calls ep_ptable_queue_proc. This call chain is rather hard to trace, because none of it is a direct, language-level call.
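Stripped of kernel detail, the indirection can be modeled in a few lines of ordinary C. Everything below is a standalone imitation, not kernel code; it only mimics how init_poll_funcptr plants a function pointer that poll_wait later invokes:

#include <stdio.h>

struct file;                                    /* stand-in for struct file */
typedef struct poll_table_struct {
        /* the pluggable "queue me on this wait queue" callback */
        void (*qproc)(struct file *, void *whead, struct poll_table_struct *);
} poll_table;

/* What every driver's poll method calls (cf. include/linux/poll.h) */
static void poll_wait(struct file *filp, void *wait_address, poll_table *p)
{
        if (p && wait_address)
                p->qproc(filp, wait_address, p);    /* the indirect call */
}

/* epoll's callback: in the kernel this allocates an eppoll_entry and
 * hooks ep_poll_callback onto the device wait queue */
static void ep_ptable_queue_proc(struct file *f, void *whead, poll_table *pt)
{
        printf("queued on wait queue %p via poll_table %p\n", whead, (void *)pt);
}

/* A device's poll method: it knows only poll_wait, nothing about epoll */
static unsigned int device_poll(struct file *filp, poll_table *pt)
{
        int device_wait_queue;                      /* dummy wait queue head */
        poll_wait(filp, &device_wait_queue, pt);
        return 0;                                   /* no events ready */
}

int main(void)
{
        poll_table pt = { .qproc = ep_ptable_queue_proc };  /* init_poll_funcptr */
        device_poll(NULL, &pt);     /* tfile->f_op->poll(tfile, &epq.pt) */
        return 0;
}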
ep_insert also puts the struct epitem on the f_ep_links list inside struct file, to make lookups convenient; the fllink member of struct epitem exists for exactly that purpose.
[fs/eventpoll.c-->ep_ptable_queue_proc()]
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                                 poll_table *pt)
{
        struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt);
        struct eppoll_entry *pwq;

        if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) {
                init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
                pwq->whead = whead;
                pwq->base = epi;
                add_wait_queue(whead, &pwq->wait);
                list_add_tail(&pwq->llink, &epi->pwqlist);
                epi->nwait++;
        } else {
                /* We have to signal that an error occurred */
                epi->nwait = -1;
        }
}
This code is the most important thing ep_insert accomplishes: create a struct eppoll_entry, set its wakeup callback to ep_poll_callback, then add it to the device's wait queue (note that this whead is the wait queue every device driver carries, as described in the previous chapter). Only with this in place will ep_poll_callback be invoked when the device becomes ready and wakes the waiters on its queue. Every poll system call has to hang current (the calling process) on the wait queues of all the devices behind the polled fds, and you can imagine how expensive that "hanging" gets once the fds number in the thousands. epoll_wait involves none of that fuss: epoll hangs current just once, at epoll_ctl time (that first time is unavoidable), and gives every fd a standing order, "call the callback when you're ready". When a device has an event, the callback puts the fd on rdllist, and each epoll_wait merely collects the fds from rdllist. By cleverly using callbacks, epoll achieves a far more efficient event-driven model.
By now we can guess what ep_poll_callback must do: take the epitem on the red-black tree that received the event (each epitem representing an fd) and insert it into ep->rdllist, so that by the time epoll_wait returns, rdllist holds exactly the ready fds!
[fs/eventpoll.c-->ep_poll_callback()]
1206 static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
1207 {
1208         int pwake = 0;
1209         unsigned long flags;
1210         struct epitem *epi = EP_ITEM_FROM_WAIT(wait);
1211         struct eventpoll *ep = epi->ep;
1212
1213         DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%p ep=%p\n",
1214                 current, epi->file, epi, ep));
1215
1216         write_lock_irqsave(&ep->lock, flags);
1217
1218         /*
1219          * If the event mask does not contain any poll(2) event, we consider the
1220          * descriptor to be disabled. This condition is likely the effect of the
1221          * EPOLLONESHOT bit that disables the descriptor when an event is received,
1222          * until the next EPOLL_CTL_MOD will be issued.
1223          */
1224         if (!(epi->event.events & ~EP_PRIVATE_BITS))
1225                 goto is_disabled;
1226
1227         /* If this file is already in the ready list we exit soon */
1228         if (EP_IS_LINKED(&epi->rdllink))
1229                 goto is_linked;
1230
1231         list_add_tail(&epi->rdllink, &ep->rdllist);
1232
1233 is_linked:
1234         /*
1235          * Wake up ( if active ) both the eventpoll wait list and the ->poll()
1236          * wait list.
1237          */
1238         if (waitqueue_active(&ep->wq))
1239                 wake_up(&ep->wq);
1240         if (waitqueue_active(&ep->poll_wait))
1241                 pwake++;
1242
1243 is_disabled:
1244         write_unlock_irqrestore(&ep->lock, flags);
1245
1246         /* We have to call this outside the lock */
1247         if (pwake)
1248                 ep_poll_safewake(&psw, &ep->poll_wait);
1249
1250         return 1;
1251 }
The only line that truly matters is 1231: put the struct epitem onto struct eventpoll's rdllist. Now we can draw epoll's core data structure diagram:
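The figure itself is not reproduced here; roughly, the relationships it shows are these:

    epoll fd (a file on eventpollfs)
         |
         |  file->private_data
         v
    struct eventpoll
      |- rbr (rb_root) ---> red-black tree of struct epitem,
      |                     one per monitored {file, fd} (epoll_ctl)
      |- rdllist ---------> ready list: epitems linked in through
      |                     epi->rdllink by ep_poll_callback
      |- wq --------------> wait queue where ep_poll sleeps
      `- poll_wait -------> wait queue used when the epoll fd is
                            itself polled

    each struct epitem --pwqlist--> struct eppoll_entry, whose wait
    entry (callback: ep_poll_callback) sits on a device wait queue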
EPOLLET, unique to epoll
EPOLLET is a flag unique to the epoll system calls. ET stands for Edge Trigger; you can google the exact semantics and typical uses. With EPOLLET, repeated events no longer keep popping up to disturb the program's logic, which is why it is so widely used. But how does EPOLLET actually work?
The previous part showed that epoll hangs a callback on every fd; when the device behind an fd has news, the callback puts the fd on the rdllist list, so epoll_wait only needs to check rdllist to know which fds have events. Let's look at the last few lines of ep_poll:
[fs/eventpoll.c->ep_poll()]
1524
1525         /*
1526          * Try to transfer events to user space. In case we get 0 events and
1527          * there's still timeout left over, we go trying again in search of
1528          * more luck.
1529          */
1530         if (!res && eavail &&
1531             !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
1532                 goto retry;
1533
1534         return res;
1535 }

Copying the fds on rdllist out to user space is the job of ep_events_transfer:

[fs/eventpoll.c->ep_events_transfer()]

1439 static int ep_events_transfer(struct eventpoll *ep,
1440                               struct epoll_event __user *events, int maxevents)
1441 {
1442         int eventcnt = 0;
1443         struct list_head txlist;
1444
1445         INIT_LIST_HEAD(&txlist);
1446
1447         /*
1448          * We need to lock this because we could be hit by
1449          * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
1450          */
1451         down_read(&ep->sem);
1452
1453         /* Collect/extract ready items */
1454         if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
1455                 /* Build result set in userspace */
1456                 eventcnt = ep_send_events(ep, &txlist, events);
1457
1458                 /* Reinject ready items into the ready list */
1459                 ep_reinject_items(ep, &txlist);
1460         }
1461
1462         up_read(&ep->sem);
1463
1464         return eventcnt;
1465 }
Very little code. ep_collect_ready_items moves the fds on rdllist over to txlist (leaving rdllist empty), then ep_send_events copies the fds on txlist out to user space, and finally ep_reinject_items "returns" a portion of the fds from txlist back to rdllist so they can be found there again next time. Here is the implementation of ep_send_events:
[fs/eventpoll.c->ep_send_events()]
1337 static int ep_send_events(struct eventpoll *ep, struct list_head *txlist,
1338                           struct epoll_event __user *events)
1339 {
1340         int eventcnt = 0;
1341         unsigned int revents;
1342         struct list_head *lnk;
1343         struct epitem *epi;
1344
1345         /*
1346          * We can loop without lock because this is a task private list.
1347          * The test done during the collection loop will guarantee us that
1348          * another task will not try to collect this file. Also, items
1349          * cannot vanish during the loop because we are holding "sem".
1350          */
1351         list_for_each(lnk, txlist) {
1352                 epi = list_entry(lnk, struct epitem, txlink);
1353
1354                 /*
1355                  * Get the ready file event set. We can safely use the file
1356                  * because we are holding the "sem" in read and this will
1357                  * guarantee that both the file and the item will not vanish.
1358                  */
1359                 revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
1360
1361                 /*
1362                  * Set the return event set for the current file descriptor.
1363                  * Note that only the task task was successfully able to link
1364                  * the item to its "txlist" will write this field.
1365                  */
1366                 epi->revents = revents & epi->event.events;
1367
1368                 if (epi->revents) {
1369                         if (__put_user(epi->revents,
1370                                        &events[eventcnt].events) ||
1371                             __put_user(epi->event.data,
1372                                        &events[eventcnt].data))
1373                                 return -EFAULT;
1374                         if (epi->event.events & EPOLLONESHOT)
1375                                 epi->event.events &= EP_PRIVATE_BITS;
1376                         eventcnt++;
1377                 }
1378         }
1379         return eventcnt;
1380 }
The copying itself is unremarkable, but note line 1359: this poll call is sly; it passes NULL as the second argument. Let's first look at how a device driver typically implements poll:
static unsigned int scull_p_poll(struct file *filp, poll_table *wait)
{
        struct scull_pipe *dev = filp->private_data;
        unsigned int mask = 0;

        /*
         * The buffer is circular; it is considered full
         * if "wp" is right behind "rp" and empty if the
         * two are equal.
         */
        down(&dev->sem);
        poll_wait(filp, &dev->inq, wait);
        poll_wait(filp, &dev->outq, wait);
        if (dev->rp != dev->wp)
                mask |= POLLIN | POLLRDNORM;    /* readable */
        if (spacefree(dev))
                mask |= POLLOUT | POLLWRNORM;   /* writable */
        up(&dev->sem);

        return mask;
}
The code above is taken from Linux Device Drivers, 3rd edition, an absolute classic. The device first hangs current (the calling process) on its inq and outq queues (the "hanging" is done by the callback function pointer carried in the wait argument), then lets the device wake it up; once woken, it can report the event mask through mask (note mask: it is what carries the events back to the caller). So what does poll_wait do when wait is NULL?
[include/linux/poll.h->poll_wait]
static inline void poll_wait(struct file *filp,
                             wait_queue_head_t *wait_address, poll_table *p)
{
        if (p && wait_address)
                p->qproc(filp, wait_address, p);
}
There you have it: if the poll_table is NULL, poll_wait does nothing at all. Back in ep_send_events, the poll call on line 1359 therefore means "I don't want to sleep, I just want the event mask", and the mask it fetches is then copied out to user space.
After ep_send_events finishes, it is ep_reinject_items's turn:
[fs/eventpoll.c->ep_reinject_items]
1389 static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
1390 {
1391         int ricnt = 0, pwake = 0;
1392         unsigned long flags;
1393         struct epitem *epi;
1394
1395         write_lock_irqsave(&ep->lock, flags);
1396
1397         while (!list_empty(txlist)) {
1398                 epi = list_entry(txlist->next, struct epitem, txlink);
1399
1400                 /* Unlink the current item from the transfer list */
1401                 EP_LIST_DEL(&epi->txlink);
1402
1403                 /*
1404                  * If the item is no more linked to the interest set, we don't
1405                  * have to push it inside the ready list because the following
1406                  * ep_release_epitem() is going to drop it. Also, if the current
1407                  * item is set to have an Edge Triggered behaviour, we don't have
1408                  * to push it back either.
1409                  */
1410                 if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) &&
1411                     (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
1412                         list_add_tail(&epi->rdllink, &ep->rdllist);
1413                         ricnt++;
1414                 }
1415         }
1416
1417         if (ricnt) {
1418                 /*
1419                  * Wake up ( if active ) both the eventpoll wait list and the ->poll()
1420                  * wait list.
1421                  */
1422                 if (waitqueue_active(&ep->wq))
1423                         wake_up(&ep->wq);
1424                 if (waitqueue_active(&ep->poll_wait))
1425                         pwake++;
1426         }
1427
1428         write_unlock_irqrestore(&ep->lock, flags);
1429
1430         /* We have to call this outside the lock */
1431         if (pwake)
1432                 ep_poll_safewake(&psw, &ep->poll_wait);
1433 }
ep_reinject_items puts part of the fds on txlist back onto rdllist. Which part? Look at the test on lines 1410-1411: fds that are not marked EPOLLET (the !(epi->event.events & EPOLLET) clause) and whose events are still of interest (the (epi->revents & epi->event.events) clause) are put back onto rdllist, so the next epoll_wait will naturally pick them out of rdllist and copy them to the user again.
An example. Take a socket that has only been connected and has sent or received no data: its poll event mask always contains POLLOUT (see the driver example above), so every epoll_wait call returns a POLLOUT event (rather annoying), because its fd keeps going back onto rdllist. Now suppose someone writes a pile of data into that socket and it clogs up (becomes unwritable): the (epi->revents & epi->event.events) test on line 1411 no longer holds (no more POLLOUT), the fd is not put back onto rdllist, and epoll_wait stops reporting POLLOUT to the user. Next, give the socket EPOLLET, connect, and again exchange no data: this time it is the EPOLLET test on line 1410 that fails, so epoll_wait delivers exactly one POLLOUT notification (the fd never returns to rdllist), and subsequent epoll_wait calls report no events at all.
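This difference is easy to observe from user space. Here is a small sketch (using the always-writable write end of a pipe rather than a socket, purely for brevity): with level-triggered EPOLLOUT both waits report an event, while with EPOLLOUT|EPOLLET only the first does and the second times out.

#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/epoll.h>

static void probe(uint32_t flags, const char *label)
{
        int pfd[2], epfd, i;
        struct epoll_event ev = { .events = flags };

        pipe(pfd);
        epfd = epoll_create(1);
        ev.data.fd = pfd[1];
        epoll_ctl(epfd, EPOLL_CTL_ADD, pfd[1], &ev);   /* watch the write end */

        for (i = 0; i < 2; i++) {
                struct epoll_event out;
                int n = epoll_wait(epfd, &out, 1, 100);    /* 100 ms timeout */
                printf("%s: wait #%d -> %d event(s)\n", label, i + 1, n);
        }
        close(epfd); close(pfd[0]); close(pfd[1]);
}

int main(void)
{
        probe(EPOLLOUT, "level-triggered");          /* 1 event, then 1 again */
        probe(EPOLLOUT | EPOLLET, "edge-triggered"); /* 1 event, then 0 */
        return 0;
}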
Function details:
- int epoll_create(int size);
- int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
/*
 * It opens an eventpoll file descriptor by suggesting a storage of "size"
 * file descriptors. The size parameter is just an hint about how to size
 * data structures. It won't prevent the user to store more than "size"
 * file descriptors inside the epoll interface. It is the kernel part of
 * the userspace epoll_create(2).
 */
asmlinkage long sys_epoll_create(int size)
{
        int error, fd;
        struct inode *inode;
        struct file *file;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n",
                current, size));

        /* Sanity check on the size parameter */
        error = -EINVAL;
        if (size <= 0)
                goto eexit_1;

        /*
         * Creates all the items needed to setup an eventpoll file. That is,
         * a file structure, and inode and a free file descriptor.
         */
        error = ep_getfd(&fd, &inode, &file);    /* (1) */
        if (error)
                goto eexit_1;

        /* Setup the file internal data structure ( "struct eventpoll" ) */
        error = ep_file_init(file);              /* (2) */
        if (error)
                goto eexit_2;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n",
                current, size, fd));
        return fd;

eexit_2:
        sys_close(fd);
eexit_1:
        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n",
                current, size, error));
        return error;
}
(1) Here a function called ep_getfd is used. From the comment we know it builds the eventpoll-related file; a file of course comprises a file descriptor, an inode and a file object, which is exactly what the three parameters are for. Without further ado, the source:
/*
 * Creates the file descriptor to be used by the epoll interface.
 */
static int ep_getfd(int *efd, struct inode **einode, struct file **efile)
{
        struct qstr this;
        char name[32];
        struct dentry *dentry;
        struct inode *inode;
        struct file *file;
        int error, fd;

        /* Get an ready to use file */
        error = -ENFILE;
        file = get_empty_filp();
        if (!file)
                goto eexit_1;

        /* Allocates an inode from the eventpoll file system */
        inode = ep_eventpoll_inode();
        error = PTR_ERR(inode);
        if (IS_ERR(inode))
                goto eexit_2;

        /* Allocates a free descriptor to plug the file onto */
        error = get_unused_fd();
        if (error < 0)
                goto eexit_3;
        fd = error;

        /*
         * Link the inode to a directory entry by creating a unique name
         * using the inode number.
         */
        error = -ENOMEM;
        sprintf(name, "[%lu]", inode->i_ino);
        this.name = name;
        this.len = strlen(name);
        this.hash = inode->i_ino;
        dentry = d_alloc(eventpoll_mnt->mnt_sb->s_root, &this);
        if (!dentry)
                goto eexit_4;
        dentry->d_op = &eventpollfs_dentry_operations;
        d_add(dentry, inode);
        file->f_vfsmnt = mntget(eventpoll_mnt);
        file->f_dentry = dentry;
        file->f_mapping = inode->i_mapping;

        file->f_pos = 0;
        file->f_flags = O_RDONLY;
        file->f_op = &eventpoll_fops;
        file->f_mode = FMODE_READ;
        file->f_version = 0;
        file->private_data = NULL;

        /* Install the new setup file into the allocated fd. */
        fd_install(fd, file);

        *efd = fd;
        *einode = inode;
        *efile = file;
        return 0;

eexit_4:
        put_unused_fd(fd);
eexit_3:
        iput(inode);
eexit_2:
        put_filp(file);
eexit_1:
        return error;
}
The function's comments are fairly complete, so only a brief walkthrough here; it touches so many other functions that going deeper would drag in too much background, and we cannot list all that code. Still, this function is a classic in its own right: it is the complete procedure for creating a file.
First we obtain a file structure, allocated for us by the kernel; then we get an inode by calling ep_eventpoll_inode(); next, get_unused_fd() obtains a file descriptor. After that, d_alloc() gives us a dentry, and d_add(dentry, inode) enters the dentry into the dentry hash and binds it to the inode; the file object is then filled in; finally, fd_install(fd, file) registers the file with the process, which is how the file descriptor and the file object become associated.
(2) Before tracing into ep_file_init, let's take another look at the eventpoll structure:
/*
 * This structure is stored inside the "private_data" member of the file
 * structure and rapresent the main data sructure for the eventpoll
 * interface.
 */
struct eventpoll {
        /* Protect the this structure access */
        rwlock_t lock;

        /*
         * This semaphore is used to ensure that files are not removed
         * while epoll is using them. This is read-held during the event
         * collection loop and it is write-held during the file cleanup
         * path, the epoll file exit code and the ctl operations.
         */
        struct rw_semaphore sem;

        /* Wait queue used by sys_epoll_wait() */
        wait_queue_head_t wq;

        /* Wait queue used by file->poll() */
        wait_queue_head_t poll_wait;

        /* List of ready file descriptors */
        struct list_head rdllist;

        /* RB-Tree root used to store monitored fd structs */
        struct rb_root rbr;
};
The comments speak for themselves. You can tell that this eventpoll is the core of epoll: it stores the file descriptors you want to monitor, and it is where epoll's efficiency comes from.
Good. Back in sys_epoll_create, we now trace ep_file_init:
static int ep_file_init(struct file *file)
{
        struct eventpoll *ep;

        if (!(ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL)))
                return -ENOMEM;
        memset(ep, 0, sizeof(*ep));
        rwlock_init(&ep->lock);
        init_rwsem(&ep->sem);
        init_waitqueue_head(&ep->wq);
        init_waitqueue_head(&ep->poll_wait);
        INIT_LIST_HEAD(&ep->rdllist);
        ep->rbr = RB_ROOT;

        file->private_data = ep;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_file_init() ep=%p\n",
                current, ep));
        return 0;
}
So it is really just the initialization of the eventpoll structure.
Source note: the kernel walkthrough above is by Dong Hao (blog: http://donghao.org/uii/); a PDF version is available at http://donghao.org/docs/linux_kernel_poll_epoll_2.pdf.
############################################################
How epoll is implemented
1 Functional overview
One way epoll differs from select/poll is that it consists of a group of system calls:
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
The epoll system calls were introduced in Linux 2.5.44. They were designed around the weaknesses of the traditional select/poll system calls, which are:
1. Every call re-reads the parameters from user space.
2. Every call re-scans all the file descriptors.
3. At the start of every call the current process must be added to the wait queue of every file descriptor, and when the call finishes it must be removed from each of those queues again.
In real applications select/poll may be watching a very large number of file descriptors, and if each call returns only a small fraction of them, select/poll is plainly inefficient in this situation. epoll's design splits the single select/poll operation into one epoll_create, several epoll_ctl calls, and one wait. In addition, the kernel adds a file system, "eventpollfs", for epoll: every set of one or more monitored file descriptors has a corresponding eventpollfs inode, whose main information is kept in an eventpoll structure, while the important information about each monitored file is kept in an epitem structure, so the relationship between the two is one-to-many.
Because epoll_create and epoll_ctl have already transferred the user-space information into the kernel, subsequent calls to epoll_wait, however many, never re-copy the parameters, re-scan the file descriptors, or repeatedly move the current process on and off the wait queues. All three weaknesses above are thereby avoided.
Now let's look at how they are implemented.
2 Key structures:
/* Wrapper struct used by poll queueing */
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
This structure plays a role similar to struct poll_wqueues in select/poll. Because epoll needs to keep a large amount of information in kernel space, a single callback function pointer is no longer enough, so a new structure, struct epitem, is introduced here.
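The epi pointer is what lets epoll recover its per-fd state from the bare poll_table that a driver's poll method passes around; the EP_ITEM_FROM_EPQUEUE macro seen earlier is an instance of the classic container_of pattern. A self-contained imitation (the struct names mirror the kernel's, but this is a userspace model):

#include <stdio.h>
#include <stddef.h>

/* Recover a pointer to the enclosing struct from a pointer to one member */
#define container_of(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof(type, member)))

typedef struct { int dummy; } poll_table;
struct epitem { int fd; };

struct ep_pqueue {
        poll_table pt;
        struct epitem *epi;
};

/* Model of EP_ITEM_FROM_EPQUEUE(pt): the callback receives only the
 * poll_table pointer, yet can climb back to the ep_pqueue around it */
static struct epitem *ep_item_from_epqueue(poll_table *p)
{
        return container_of(p, struct ep_pqueue, pt)->epi;
}

int main(void)
{
        struct epitem item = { .fd = 42 };
        struct ep_pqueue epq = { .epi = &item };

        /* pass only &epq.pt, as f_op->poll does, and recover the epitem */
        printf("fd = %d\n", ep_item_from_epqueue(&epq.pt)->fd);
        return 0;
}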
/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the hash.
*/