现在的位置: 首页 > 综合 > 正文

epoll源码分析—sys_epoll_wait()函数

2013年01月12日 ⁄ 综合 ⁄ 共 9514字 ⁄ 字号 评论关闭

一、sys_epoll_wait()函数

源码及分析如下所示:

/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	int error;
	struct file *file;
	struct eventpoll *ep;

	/* The maximum number of event must be greater than zero */
	/*
	 * 检查maxevents参数。
	 */
	if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
		return -EINVAL;

	/* Verify that the area passed by the user is writeable */
	/*
	 * 检查用户空间传入的events指向的内存是否可写。参见__range_not_ok()。
	 */
	if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
		error = -EFAULT;
		goto error_return;
	}

	/* Get the "struct file *" for the eventpoll file */
	/*
	 * 获取epfd对应的eventpoll文件的file实例,file结构是在epoll_create中创建
	 */
	error = -EBADF;
	file = fget(epfd);
	if (!file)
		goto error_return;

	/*
	 * We have to check that the file structure underneath the fd
	 * the user passed to us _is_ an eventpoll file.
	 */
	/*
	 * 通过检查epfd对应的文件操作是不是eventpoll_fops
	 * 来判断epfd是否是一个eventpoll文件。如果不是
	 * 则返回EINVAL错误。
	 */
	error = -EINVAL;
	if (!is_file_epoll(file))
		goto error_fput;

	/*
	 * At this point it is safe to assume that the "private_data" contains
	 * our own data structure.
	 */
	ep = file->private_data;

	/* Time to fish for events ... */
	error = ep_poll(ep, events, maxevents, timeout);

error_fput:
	fput(file);
error_return:

	return error;
}

sys_epoll_wait()是epoll_wait()对应的系统调用,主要用来获取文件状态已经就绪的事件,该函数检查参数、获取eventpoll文件后调用ep_poll()来完成主要的工作。在分析ep_poll()函数之前,先介绍一下使用epoll_wait()时可能犯的错误(接下来介绍的就是我犯过的错误):

  1、返回EBADF错误

    除非你故意指定一个不存在的文件描述符,否则几乎百分百肯定,你的程序有BUG了!从源码中可以看到调用fget()函数返回NULL时,会返回此错误。fget()源码如下:

struct file *fget(unsigned int fd)
{
	struct file *file;
	struct files_struct *files = current->files;

	rcu_read_lock();
	file = fcheck_files(files, fd);
	if (file) {
		if (!atomic_long_inc_not_zero(&file->f_count)) {
			/* File object ref couldn't be taken */
			rcu_read_unlock();
			return NULL;
		}
	}
	rcu_read_unlock();

	return file;
}

主要看这句(struct files_struct *files = current->files;),这条语句是获取描述当前进程已经打开的文件的files_struct结构,然后从这个结构中查找传入的fd对应的file实例,如果没有找到,说明当前进程中打开的文件不包括这个fd,所以几乎百分百肯定是程序设计的问题。我的程序出错,就是因为在父进程中创建了文件描述符,但是将子进程变为守护进程了,也就没有继承父进程中打开的文件。
  2、死循环(一般不会犯,但是我是第一次用,犯了)

 epoll_wait()中有一个设置超时时间的参数,所以我在循环中没有使用睡眠队列的操作,想依赖epoll的睡眠操作,所以在返回值小于等于0时,直接进行下一次循环,没有充分考虑epoll_wait()的返回值小于0时的不同情况,所以代码写成了下面的样子:

for(;;) {
	......
	events = epoll_wait(fcluster_epfd, fcluster_wait_events, 
            fcluster_wait_size, 3000);
        if (unlikely(events <= 0)) {
            continue;
        }
	.......
}

当epoll_wait()返回EBADF或EFAULT时,就会陷入死循环,因此此时还没有进入睡眠的操作。

二、ep_poll()函数

下面来看获取事件的主要函数ep_poll(),源码及分析如下:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	int res, eavail;
	unsigned long flags;
	long jtimeout;
	wait_queue_t wait;

	/*
	 * Calculate the timeout by checking for the "infinite" value (-1)
	 * and the overflow condition. The passed timeout is in milliseconds,
	 * that why (t * HZ) / 1000.
	 */
	/*
	 * timeout是以毫秒为单位,这里是要转换为jiffies时间。
	 * 这里加上999(即1000-1),是为了向上取整。
	 */
	jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
		MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
	spin_lock_irqsave(&ep->lock, flags);

	res = 0;
	if (list_empty(&ep->rdllist)) {
		/*
		 * We don't have any available event to return to the caller.
		 * We need to sleep here, and we will be wake up by
		 * ep_poll_callback() when events will become available.
		 */
		init_waitqueue_entry(&wait, current);
		wait.flags |= WQ_FLAG_EXCLUSIVE;
		/*
		 * 将当前进程加入到eventpoll的等待队列中,
		 * 等待文件状态就绪或直到超时,或被
		 * 信号中断。
		 */
		__add_wait_queue(&ep->wq, &wait);

		for (;;) {
			/*
			 * We don't want to sleep if the ep_poll_callback() sends us
			 * a wakeup in between. That's why we set the task state
			 * to TASK_INTERRUPTIBLE before doing the checks.
			 */
			set_current_state(TASK_INTERRUPTIBLE);
			/*
			 * 如果就绪队列不为空,也就是说已经有文件的状态
			 * 就绪或者超时,则退出循环。
			 */
			if (!list_empty(&ep->rdllist) || !jtimeout)
				break;
			/*
			 * 如果当前进程接收到信号,则退出
			 * 循环,返回EINTR错误
			 */
			if (signal_pending(current)) {
				res = -EINTR;
				break;
			}

			spin_unlock_irqrestore(&ep->lock, flags);
			/*
			 * 主动让出处理器,等待ep_poll_callback()将当前进程
			 * 唤醒或者超时,返回值是剩余的时间。从这里开始
			 * 当前进程会进入睡眠状态,直到某些文件的状态
			 * 就绪或者超时。当文件状态就绪时,eventpoll的回调
			 * 函数ep_poll_callback()会唤醒在ep->wq指向的等待队列中的进程。
			 */
			jtimeout = schedule_timeout(jtimeout);
			spin_lock_irqsave(&ep->lock, flags);
		}
		__remove_wait_queue(&ep->wq, &wait);

		set_current_state(TASK_RUNNING);
	}
	/* Is it worth to try to dig for events ? */
	/*
	 * ep->ovflist链表存储的向用户传递事件时暂存就绪的文件。
	 * 所以不管是就绪队列ep->rdllist不为空,或者ep->ovflist不等于
	 * EP_UNACTIVE_PTR,都有可能现在已经有文件的状态就绪。
	 * ep->ovflist不等于EP_UNACTIVE_PTR有两种情况,一种是NULL,此时
	 * 可能正在向用户传递事件,不一定就有文件状态就绪,
	 * 一种情况时不为NULL,此时可以肯定有文件状态就绪,
	 * 参见ep_send_events()。
	 */
	eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;

	spin_unlock_irqrestore(&ep->lock, flags);

	/*
	 * Try to transfer events to user space. In case we get 0 events and
	 * there's still timeout left over, we go trying again in search of
	 * more luck.
	 */
	/*
	 * 如果没有被信号中断,并且有事件就绪,
	 * 但是没有获取到事件(有可能被其他进程获取到了),
	 * 并且没有超时,则跳转到retry标签处,重新等待
	 * 文件状态就绪。
	 */
	if (!res && eavail &&
	    !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
		goto retry;

	/*
	 * 返回获取到的事件的个数或者错误码
	 */
	return res;
}

ep_poll()的主要过程是:首先将超时时间(以毫秒为单位)转换为jiffies时间,然后检查是否有事件发生,如果没有事件发生,则将当前进程加入到eventpoll中的等待队列中,直到事件发生或者超时。如果有事件发生,则调用ep_send_events()将发生的事件传入用户空间的内存。ep_send_events()函数将用户传入的内存简单封装到ep_send_events_data结构中,然后调用ep_scan_ready_list()将就绪队列中的事件传入用户空间的内存。

三、ep_scan_ready_list()函数

源码及分析如下:

/**
 * ep_scan_ready_list - Scans the ready list in a way that makes possible for
 *                      the scan code, to call f_op->poll(). Also allows for
 *                      O(NumReady) performance.
 *
 * @ep: Pointer to the epoll private data structure.
 * @sproc: Pointer to the scan callback.
 * @priv: Private opaque data passed to the @sproc callback.
 *
 * Returns: The same integer error code returned by the @sproc callback.
 */
static int ep_scan_ready_list(struct eventpoll *ep,
			      int (*sproc)(struct eventpoll *,
					   struct list_head *, void *),
			      void *priv)
{
	int error, pwake = 0;
	unsigned long flags;
	struct epitem *epi, *nepi;
	LIST_HEAD(txlist);

	/*
	 * We need to lock this because we could be hit by
	 * eventpoll_release_file() and epoll_ctl().
	 */
	/*
	 * 获取互斥锁,该互斥锁在移除eventpoll文件(eventpoll_release_file() )、
	 * 操作文件描述符(epoll_ctl())和向用户传递事件(epoll_wait())之间进行互斥
	 */
	mutex_lock(&ep->mtx);

	/*
	 * Steal the ready list, and re-init the original one to the
	 * empty list. Also, set ep->ovflist to NULL so that events
	 * happening while looping w/out locks, are not lost. We cannot
	 * have the poll callback to queue directly on ep->rdllist,
	 * because we want the "sproc" callback to be able to do it
	 * in a lockless way.
	 */
	spin_lock_irqsave(&ep->lock, flags);
	/*
	 * 将就绪队列中就绪的文件链表暂存在临时
	 * 表头txlist中,并且初始化就绪队列。
	 */
	list_splice_init(&ep->rdllist, &txlist);
	/*
	 * 将ovflist置为NULL,表示此时正在向用户空间传递
	 * 事件。如果此时有文件状态就绪,不会放在
	 * 就绪队列中,而是放在ovflist链表中。
	 */
	ep->ovflist = NULL;
	spin_unlock_irqrestore(&ep->lock, flags);

	/*
	 * Now call the callback function.
	 */
	/*
	 * 调用ep_send_events_proc()将就绪队列中的事件
	 * 存入用户传入的内存中。
	 */
	error = (*sproc)(ep, &txlist, priv);

	spin_lock_irqsave(&ep->lock, flags);
	/*
	 * During the time we spent inside the "sproc" callback, some
	 * other events might have been queued by the poll callback.
	 * We re-insert them inside the main ready-list here.
	 */
	/*
	 * 在调用sproc指向的函数将就绪队列中的事件
	 * 传递到用户传入的内存的过程中,可能有文件
	 * 状态就绪,这些事件会暂存在ovflist链表中,
	 * 所以这里要将ovflist中的事件移到就绪队列中。
	 */
	for (nepi = ep->ovflist; (epi = nepi) != NULL;
	     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
		/*
		 * We need to check if the item is already in the list.
		 * During the "sproc" callback execution time, items are
		 * queued into ->ovflist but the "txlist" might already
		 * contain them, and the list_splice() below takes care of them.
		 */
		if (!ep_is_linked(&epi->rdllink))
			list_add_tail(&epi->rdllink, &ep->rdllist);
	}
	/*
	 * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
	 * releasing the lock, events will be queued in the normal way inside
	 * ep->rdllist.
	 */
	/*
	 * 重新初始化ovflist,表示传递事件已经完成,
	 * 之后再有文件状态就绪,这些事件会直接
	 * 放在就绪队列中。
	 */
	ep->ovflist = EP_UNACTIVE_PTR;

	/*
	 * Quickly re-inject items left on "txlist".
	 */
	/*
	 * 如果sproc指向的函数ep_send_events_proc()中处理出错或者某些文件的
	 * 触发方式设置为水平触发(Level Trigger),txlist中可能还有事件,需要
	 * 将这些就绪的事件重新添加回eventpoll文件的就绪队列中。
	 */
	list_splice(&txlist, &ep->rdllist);

	if (!list_empty(&ep->rdllist)) {
		/*
		 * Wake up (if active) both the eventpoll wait list and
		 * the ->poll() wait list (delayed after we release the lock).
		 */
		if (waitqueue_active(&ep->wq))
			wake_up_locked(&ep->wq);
		if (waitqueue_active(&ep->poll_wait))
			pwake++;
	}
	spin_unlock_irqrestore(&ep->lock, flags);

	mutex_unlock(&ep->mtx);

	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&ep->poll_wait);

	return error;
}

ep_scan_ready_list()函数的参数sproc指向的函数是ep_send_events_proc(),参见ep_send_events()函数。

四、ep_send_events_proc()函数

/*
 * @head:已经就绪的文件列表
 * @priv:用来存储已经就绪的文件
 */
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
			       void *priv)
{
	struct ep_send_events_data *esed = priv;
	int eventcnt;
	unsigned int revents;
	struct epitem *epi;
	struct epoll_event __user *uevent;

	/*
	 * We can loop without lock because we are passed a task private list.
	 * Items cannot vanish during the loop because ep_scan_ready_list() is
	 * holding "mtx" during this call.
	 */
	for (eventcnt = 0, uevent = esed->events;
	     !list_empty(head) && eventcnt < esed->maxevents;) {
		epi = list_first_entry(head, struct epitem, rdllink);

		list_del_init(&epi->rdllink);

		/*
		 * 调用文件的poll函数有两个作用,一是在文件的唤醒
		 * 队列上注册回调函数,二是返回文件当前的事件状
		 * 态,如果第二个参数为NULL,则只是查看文件当前
		 * 状态。
		 */
		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
			epi->event.events;

		/*
		 * If the event mask intersect the caller-requested one,
		 * deliver the event to userspace. Again, ep_scan_ready_list()
		 * is holding "mtx", so no operations coming from userspace
		 * can change the item.
		 */
		if (revents) {
			/*
			 * 向用户内存传值失败时,将当前epitem实例重新放回
			 * 到链表中,从这里也可以看出,在处理失败后,head指向的
			 * 链表(对应ep_scan_ready_list()中的临时变量txlist)中
			 * 有可能会没有完全处理完,因此在ep_scan_ready_list()中
			 * 需要下面的语句
			 *    list_splice(&txlist, &ep->rdllist);
			 * 来将未处理的事件重新放回到eventpoll文件的就绪队列中。
			 */
			if (__put_user(revents, &uevent->events) ||
			    __put_user(epi->event.data, &uevent->data)) {
				list_add(&epi->rdllink, head);
				/*
				 * 如果此时已经获取了部分事件,则返回已经获取的事件个数,
				 * 否则返回EFAULT错误。
				 */
				return eventcnt ? eventcnt : -EFAULT;
			}
			eventcnt++;
			uevent++;
			if (epi->event.events & EPOLLONESHOT)
				epi->event.events &= EP_PRIVATE_BITS;
			/*
			 * 如果是触发方式不是边缘触发(Edge Trigger),而是水平
			 * 触发(Level Trigger),需要将当前的epitem实例添加回
			 * 链表中,下次读取事件时会再次上报。
			 */
			else if (!(epi->event.events & EPOLLET)) {
				/*
				 * If this file has been added with Level
				 * Trigger mode, we need to insert back inside
				 * the ready list, so that the next call to
				 * epoll_wait() will check again the events
				 * availability. At this point, noone can insert
				 * into ep->rdllist besides us. The epoll_ctl()
				 * callers are locked out by
				 * ep_scan_ready_list() holding "mtx" and the
				 * poll callback will queue them in ep->ovflist.
				 */
				list_add_tail(&epi->rdllink, &ep->rdllist);
			}
		}
	}

	return eventcnt;
}

抱歉!评论已关闭.