本文出处:http://www.ibm.com/developerworks/cn/aix/library/1105_huangrg_kqueue/
概述
kqueue
是
FreeBSD
上的一种的多路复用机制。它是针对传统的
select/poll
处理大量的文件描述符性能较低效而开发出来的。注册一堆描述符到
kqueue
以后,当其中的描述符状态发生变化时,
kqueue
将一次性通知应用程序哪些描述符可读、可写或出错了。
kqueue
支持多种类型的文件描述符,包括
socket
、信号、定时器、
AIO
、
VNODE
、
PIPE
。本文重点讨论
kqueue
如何控制
socket
描述符。其中
kqueue
对
AIO
,
POSIX
的异步
IO
系列的支持,是异步行为完成通知机制之一。另外两种常见的机制是异步信号和线程例程。用
kqueue
的明显好处是完成事件的处理线程可以灵活地指定。
本文重点在于
kqueue
技术本身。一些基础的知识点,比如
socket
API
和常用的
unix
数据结构将不作讲解,有需要的读者请先阅读
UNIX网络编程方面书籍
。
kqueue APIs
kqueue
提供
kqueue()
、
kevent()
两个系统调用和
struct kevent
结构。
kqueue
主要功能
通过
kevent()
提供三个主要的行为功能。在下面小节中将会用到这两个主要功能。
Ø
注册
/
反注册
注意
kevent()
中的
neventlist
这个输入参数,当将其设为
0
,且传入合法的
changelist
和
nchangelist
,就会将
changelist
中的事件注册到
kqueue
中。
当关闭某文件描述符时,与之关联的事件会被自动地从
kqueue
移除。
Ø
允许
/
禁止过滤器事件
通过
flags EV_ENABLE
和
EV_DISABLE
使过滤器事件有效或无效。这个功能在利用
EVFILT_WRITE
发送数据时非常有用。
Ø
等待事件通知
将
nchangelist
设置成
0
,当然要传入其它合法的参数,当
kevent
非错误和超时返回时,在在
eventlist
和
neventlist
中就保存可用事件集合。
kqueue()
int
kqueue(void)
生成一个内核事件队列,返回该队列的文件描述索。其它
API
通过该描述符操作这个
kqueue
。生成的多个
kqueue
的结构类似图
1
所示。
图
1
kqueue
队列结构
kevent()
int kevent(int
kq, const struct kevent *changelist, int nchanges,
struct
kevent *eventlist, int nevents,
const struct
timespec *timeout);
kevent
提供向内核注册
/
反注册事件和返回就绪事件或错误事件
.
kq:
kqueue
的文件描述符。
changelist:
要注册
/
反注册的事件数组;
nchanges:
changelist
的元素个数。
eventlist:
满足条件的通知事件数组;
nevents:
eventlist
的元素个数。
timeout:
等待事件到来时的超时时间,
0
,立刻返回;
NULL
,一直等待;有一个具体值,等待
timespec
时间值。
返回值:可用事件的个数。
struct kevent
struct
kevent {
uintptr_t ident;
/*
事件
ID */
short
filter;
/*
事件过滤器
*/
u_short
flags;
/*
行为标识
*/
u_int
fflags;
/*
过滤器标识值
*/
intptr_t
data;
/*
过滤器数据
*/
void
*udata;
/*
应用透传数据
*/
};
在一个
kqueue
中,
{ident, filter}
确定一个唯一的事件。
Ø
ident
事件的
id
,实际应用中,一般设置为文件描述符。
Ø
filter
可以将
kqueue filter
看作事件。内核检测
ident
上注册的
filter
的状态,状态发生了变化,就通知应用程序。
kqueue
定义了较多的
filter
,本文只介绍
Socket
读写相关的
filter
。
²
EVFILT_READ
TCP
监听
socket
,如果在完成的连接队列
(
已收三次握手最后一个
ACK)
中有数据,此事件将被通知。收到该通知的应用一般调用
accept()
,且可通过
data
获得完成队列的节点个数。
流或数据报
socket
,当协议栈的
socket
层接收缓冲区有数据时,该事件会被通知,并且
data
被设置成可读数据的字节数。
²
EVFILT_WRITE
当
socket
层的写入缓冲区可写入时,该事件将被通知;
data
指示目前缓冲区有多少字节空闲空间。
Ø
flags
²
EV_ADD
指示加入事件到
kqueue
。
²
EV_DELETE
指示将传入的事件从
kqueue
中移除。
²
EV_ENABLE
过滤器事件可用,注册一个事件时,默认是可用的。
²
EV_DISABLE
过滤器事件不可用,当内部描述可读或可写时,将不通知应用程序。第
5
小节有这个
flag
的用法介绍。
²
EV_ERROR
一个输出参数,当
changelist
中对应的描述符处理出错时,将输出这个
flag
。应用程序要判断这个
flag
,否则可能出现
kevent
不断地提示某个描述符出错,却没将这个描述符从
kq
中清除。处理
EV_ERROR
类似下面的代码:
if
(events[i].flags & EV_ERROR)
close(events[i].ident);
Ø
fflags
过滤器相关的一个输入输出类型标识,有时候和
data
结合使用。
Ø
data
过滤器相关的数据值,请看
EVFILT_READ
和
EVFILT_WRITE
描述。
Ø
udata
应用自定义数据,注册的时候传给
kernel
,
kernel
不会改变此数据,当有事件通知时,此数据会跟着返回给应用。
Ø
EV_SET
EV_SET(&kev,
ident, filter, flags, fflags, data, udata);
struct kevent
的初始化的辅助操作。
一个服务器示例
例子实现了一个只有较简单通信功能的但有性能保证的服务器。在下面各个清单中只写出关键性的代码,错误处理的代码未写出,完整的代码请参考附带的源码:
kqueue.cpp
。
Ø
注册事件到
kqueue
清单
1
73 bool Register(int kq, int fd)
74 {
75
struct kevent changes[1];
76
EV_SET(&changes[0], fd,
EVFILT_READ, EV_ADD, 0, 0, NULL);
77
78
int ret = kevent(kq,
changes, 1, NULL, 0, NULL);
81
82
return true;
83 }
Register
将
fd
注册到
kq
中。注册的方法是通过
kevent()
将
eventlist
和
neventlist
置成
NULL
和
0
来达到的。
Ø
创建监听
socket
和
kqueue
,等待内核事件通知
清单
2
27 int main(int argc, char* argv[])
28 {
29
listener_ =
CreateListener();
32
33
int kq = kqueue();
34
if (!Register(kq,
listener_))
39
40
WaitEvent(kq);
41
42
return 0;
43 }
85 void WaitEvent(int kq)
86 {
87
struct kevent
events[MAX_EVENT_COUNT];
88
while (true)
89
{
90
int ret = kevent(kq,
NULL, 0, events, MAX_EVENT_COUNT, NULL);
96
97
HandleEvent(kq, events,
ret);
98
}
99 }
29~40
,创建监听
socket
,将监听
socket
注册到
kq
,然后等待事件。
90
,这一行就是
kevent
事件等待方法,将
changelist
和
nchangelist
分别置成
NULL
和
0
,并且传一个足够大的
eventlist
空间给内核。当有事件过来时,
kevent
返回,这时调用
HandleEvent
处理可用事件。
Ø
struct kevent data
字段在
accept
和
recv
时的用法
清单
3
101 void HandleEvent(int kq, struct kevent* events, int nevents)
102 {
103
for (int i = 0; i <
nevents; i++)
104
{
105
int sock =
events[i].ident;
106
int data =
events[i].data;
107
108
if (sock == listener_)
109
Accept(kq, data);
110
else
111
Receive(sock,
data);
112
}
113 }
114
115 void Accept(int kq, int connSize)
116 {
117
for (int i = 0; i <
connSize; i++)
118
{
119
int client =
accept(listener_, NULL, NULL);
125
126
if (!Register(kq,
client))
131
}
132 }
133
134 void Receive(int sock, int availBytes)
135 {
136
int bytes = recv(sock,
buf_, availBytes, 0);
145
Enqueue(buf_, bytes);
146 }
108~111
,根据
events.ident
的类型来调用
Accept()
或
Receive()
。这里要注意的是
events[i].data
。
117~126
,对于监听
socket
,
data
表示连接完成队列中的元素
(
已经收到三次握手最后一个
ACK)
个数。
119
行演示了这种用法,
accept data
次。
126
行将
accept
成功的
socket
注册到
kq
。
136~145
,对于流
socket
,
data
表示协议栈
socket
层的接收缓冲区可读数据的字节数。
recv
时显示地指定接收
availBytes
字节
(
就是
data)
。这个功能点将对
recv
和
send
的性能提升有积极的作用,第
4
小节将这方面的讨论。
145
行表示将收到的数据入缓冲队列。
EVFILT_WRITE
用法
上面的例子没有涉及写事件的用法,这一小节简单介绍一下通过
WRITE
事件自动地实现发送数据的方法。
kqueue
默认是水平触发模式,当某个描述符的事件满足某种条件时,如果应用程序不处理对应的事件,
kqueue
将会不断地通知应用程序此描述符满足某种状态了。以
EVFILT_WRITE
举例,见图
2
。
图
2 WRITE
通知流程
在某种情形下,应用程序须要禁止
kqueue
不断地通知某个描述符的“可写”状态。将已注册的
{ident,
filter}
的
flags
设置成
EV_DISABLE
就达到这个目的。实现方法类似清单
4
。
清单
4
struct kevent changes[1];
EV_SET(&changes[0], fd,
EVFILT_WRITE, EV_DISABLE, 0, 0, NULL);
kevent(kq, changes, 1, NULL, 0,
NULL);
将上面代码中的
EV_DISABLE
替换成
EV_ENABLE
表示事件是可用的。
接下来,考虑一个实际的服务器应用,请见图
3
。
图
3
某个服务器应用
逻辑处理线程将处理结果写到发送队列,通信线程将其读出并通过
kqueue
EVFILT_WRITE
机制发送。二者具体流程请见图
4
。
图
4
逻辑流程
具体的代码相对较大,将不在这里列出。在
Speed库
demos/fb_tcp_server
有这种用法的代码例子。特别强调一下,两个线程中
writeEnable
变量和
EVFILTE_WRITE
状态的设置是有严格的顺序要求的。现代编译器优化和处理器执行指令时都有可能打乱指令顺序。有一种叫内存屏障(
memory
barrier
)的技术可以保证程序语句的编译和执行顺序,在
Linux内核设计与实现
中介绍了这一技术。
另外,这个例子可以做性能优化,当发送队列为空时,将一定长度的数据直接通过
send
()
API
非阻塞地发送,未发送完的数据再写入到发送队列。这样避免了大部分的数据拷贝。
阻塞与非阻塞
IO
用过
select
和
epoll
的读者,一般将
socket IO
设置成非阻塞模式,以提高读写性能的同时,避免
IO
读写不小心被锁定。
为了达到某种目的,甚至有人会通过
getsocketopt
来偷看
socket
读缓冲区的数据大小或写缓区
可用空间的大小。
kqueue
开发人员考虑到这些现状,在
kevent
返回时,将读写缓冲区的可读字
节数或可写空间大小告诉应用程序。基于这个特性,使用
kqueue
的应用一般不使用非阻塞
IO
。每次读时,根据
kevent
返回的可读字节大小,将接收缓冲区中的数据一次性读完;而发送数据时,也根据
kevent
返回的写缓冲区可写空间的大小,一次只发可写空间大小的数据。
结束语
本文介绍了
FreeBSD
kqueue
这种多路复用
IO
模型的用法,重点介绍了
kqueue
对
Sockets IO
的控制和事件通知过程。有一定网络编程基础的程序员学习本文后,结合给出的例子就能开发出有一定性能保证的
FreeBSD
应用服务器了。