现在的位置: 首页 > 综合 > 正文

Qemu, Spice 消息模型

2017年08月27日 ⁄ 综合 ⁄ 共 5785字 ⁄ 字号 评论关闭

作者“达沃时代”    原文链接:http://www.cnblogs.com/D-Tec/archive/2013/03/21/2973339.html

网络事件处理是libspice设计中最关键的部分,可以说是整个Spice的骨架,用以支撑Spice的运行,是理解Spice运作方式的切入口之一(VDI是另一个阅读代码的切入口)。Spice的server和client通信方式采用了三种框架:

1、 Qemu的main函数中采用非阻塞select方式轮训网络事件

2、 Libspice中有一个专门的线程,采用非阻塞epoll模型监听网络事件

3、 Qemu中采用定时器方式进行网络数据发送

一、select模型处理

   Spice中最基本的网络事件处理均采用select模型,即大部分的网络事件是在Qemu的主函数中进行捕获的。直接看代码:

void main_loop_wait(int nonblocking)

{

    IOHandlerRecord *ioh;

    fd_set rfds, wfds, xfds;

    int ret, nfds;

 

    nfds = -1;

    FD_ZERO(&rfds);

    FD_ZERO(&wfds);

    FD_ZERO(&xfds);

 

    // FD_SET 对队列中的所有节点进行处理

    QLIST_FOREACH(ioh, &io_handlers, next) {

        if (ioh->deleted)

            continue;

        FD_SET(ioh->fd, &rfds);

        FD_SET(ioh->fd, &wfds);

    }

 

       // select

    ret = select(nfds + 1, &rfds, &wfds, &xfds, &tv);

 

    // 调用节点对应的回调函数进行网络事件处理

    if (ret > 0) {

        IOHandlerRecord *pioh;

 

        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {

            if (ioh->fd_read && FD_ISSET(ioh->fd, &rfds)) {

                ioh->fd_read(ioh->opaque);

            }

            if (ioh->fd_write && FD_ISSET(ioh->fd, &wfds)) {

                ioh->fd_write(ioh->opaque);

            }

        }

}

qemu_run_all_timers();

}

以上代码遵循了select模型的基本处理步骤:FD_SET、select、process,所以非常容易理解。该代码的独特之处在于其实现方式支持动态管理网络连接,思想很简单:通过维护一个全局的网络连接列表io_handlers,每次select前都遍历此列表来获取需要查询的网络连接套接字。同时,该列表的每个元素还记录了针对该套接字的读写处理函数,其元素类型声明如下:

typedef void IOReadHandler(void *opaque, const uint8_t *buf, int size);

typedef int IOCanReadHandler(void *opaque);

typedef void IOHandler(void *opaque);

typedef struct IOHandlerRecord {

    int fd;                         // socket 描述符

    IOCanReadHandler *fd_read_poll; 

    IOHandler *fd_read;             // read 事件处理回调函数

    IOHandler *fd_write;            // write 事件处理回调函数

    int deleted;                    // 删除标记

    void *opaque;

    struct pollfd *ufd;

    QLIST_ENTRY(IOHandlerRecord) next; // 链表实现

} IOHandlerRecord;

 

io_handlers是一个IOHandlerRecord类型的元素的List头指针。

    当有新的网络连接建立后,只需要初始化一个IOHandlerRecord对象,将其插入到列表中即可。Qemu实现了一个共用函数来完成新连接对象的初始化和插入队列的动作:

int qemu_set_fd_handler2(int fd, IOCanReadHandler *fd_read_poll,

IOHandler *fd_read, IOHandler *fd_write, void *opaque)

{  

    // 新建一个节点对象,将其插入到List中

IOHandlerRecord *ioh;

    ioh = qemu_mallocz(sizeof(IOHandlerRecord));

    QLIST_INSERT_HEAD(&io_handlers, ioh, next);

        ioh->fd = fd;

        ioh->fd_read_poll = fd_read_poll;

        ioh->fd_read = fd_read;

        ioh->fd_write = fd_write;

        ioh->opaque = opaque;

        ioh->deleted = 0;

 

    return 0;

}

    通过以上封装,就可以将网络事件套接字的管理和网络事件的处理分离开来,管理的部分如上所述是一个统一的流程,不会因为具体业务的改变而改变。以Spice为例,Qemu中只需要负责网络事件的监听,具体的事件处理则交由此事件的注册者负责实现。

    网络事件的注册则又经过一层封装,最终我们看到的就是CoreInterface初始化中被赋值给core->watch_add函数指针的对应函数,封装如下:

    static SpiceWatch *watch_add(int fd, int event_mask, SpiceWatchFunc func, void *opaque)

{

    SpiceWatch *watch;

 

    watch = qemu_mallocz(sizeof(*watch));

    watch->fd     = fd;

    watch->func   = func;

    watch->opaque = opaque;

    QTAILQ_INSERT_TAIL(&watches, watch, next);

 

    {

      IOHandler *on_read = NULL;

      IOHandler *on_write = NULL;

 

      watch->event_mask = event_mask;

      if (watch->event_mask & SPICE_WATCH_EVENT_READ) {

        on_read = watch_read; //内部调用 func(SPICE_WATCH_EVENT_READ);

      }

      if (watch->event_mask & SPICE_WATCH_EVENT_WRITE) {

        on_read = watch_write; //内部调用 func(SPICE_WATCH_EVENT_WRITE);

      }

// 下面的函数实际上就是封装了qemu_set_fd_handler2

      qemu_set_fd_handler(watch->fd, on_read, on_write, watch);

        }

    return watch;

}

    经过以上封装之后,libspice的实现者就可以专心处理自己的事情,不需要再关心网络事件如何通知给自己的问题了。如果需要增加新的业务流程,比如增加远程USB设备支持,只需要将所有处理函数在libspice中实现好,客户端的USB模块发起网络连接后,libspice调用CoreInterface的watch_add回调,将此连接以及对应的处理函数注册到Qemu中即可。

    另外,要将Spice移植到其他平台,若要保持libSpice代码可以被重用,Qemu中网络处理部分是必须移植的。以上封装的实现使得网络处理的移植非常简单。

二、epoll模型处理

    该模型仅在显示处理线程中使用,用以处理进程内的网络消息。多次提到,显示处理在libspice中是通过一个单独的线程来实现的,这就涉及到多线程之间的通信问题。Spice通过socket pair的方式在进程内部创建了一个通信管道,pair的一端暴露给要与当前线程通信的模块,这些模块包括Qemu的虚拟显卡设备、libspice的消息dispatcher等;另一端则留给当前线程用来进行数据收发。此工作线程实现框架如下:

void *red_worker_main(void *arg)

{

    for (;;) {

        struct epoll_event events[MAX_EPOLL_SOURCES];

        int num_events;

        struct epoll_event *event;

        struct epoll_event *end;

        // 等待网络event

        num_events = epoll_wait(worker.epoll, events, MAX_EPOLL_SOURCES, worker.epoll_timeout);

 

        worker.epoll_timeout = INF_EPOLL_WAIT;

        // 处理所有的event

for (event = events, end = event + num_events; event < end; event++) {

            EventListener *evt_listener = (EventListener *)event->data.ptr;

 

            if (evt_listener->refs > 1) {

                evt_listener->action(evt_listener, event->events);

                if (--evt_listener->refs) {

                    continue;

                }

            }

            free(evt_listener); // refs == 0 , release it!

        }

 

        if (worker.running) {

            int ring_is_empty;

            red_process_cursor(&worker, MAX_PIPE_SIZE, &ring_is_empty);

            red_process_commands(&worker, MAX_PIPE_SIZE, &ring_is_empty);

        }

        red_push(&worker);

    }

    red_printf("exit");

    return 0;

}

三、Timer定时

    定时器是Qemu的另一个比较关键的事件触发机制,也是影响代码阅读的祸端之一。回到上面的main_loop_wait函数,最后有一句qemu_run_all_timers();该函数会遍历系统中的所有定时器,以执行到时定时器的触发函数。main_loop_wait函数则被封装在下面的main_loop函数中:

static void main_loop(void)

{

    for (;;) {

        do {

            bool nonblocking = false;

            main_loop_wait(nonblocking);

        } while (vm_can_run());

 

        // ……

}

    即:系统会不停的调用main_loop_wait函数来轮训网络事件和定时器。以上说明了Qemu定时器的触发机制,下面来看定时器的具体实现和使用方式。

    Qemu的qemu-timer.c专门用来实现定时器的代码,里面维护了一个全局的链表数组active_timers,该数组用来保存系统中各种不同类型的timer链表头指针,类似一个哈希表,所有timer链表都是按照每个timer的被激活时间排序过的,因此可以减少查询时间,最大限度的提高timer执行精确度。链表中timer节点数据结构定义如下:

struct QEMUTimer {

    QEMUClock *clock;     // timer 状态及类型

    int64_t expire_time;  // timer 激活时间

    QEMUTimerCB *cb;      // timer 激活时要执行的回调函数指针

    void *opaque;         // 用户数据,用作timer回调函数的入口参数

    struct QEMUTimer *next;

};

通过qemu_new_timer接口增加新的timer,但new操作并不把timer插入到全局数组中,只有当调用qemu_mod_timer时,才真正将timer插入链表中。通过以上方式注册的timer通常只会被执行一次,若要实现周期性定时器,只需要在timer的回调函数实现中将自己再次加入到timer链表中即可。CoreInterface的另外一组函数指针就是关于Timer的。这个timer应该是比较低效的,但平台依赖性要求很低。

 

某些网络连接建立起来以后,数据发送是通过Timer方式定时处理的,最为典型的就是音频数据的产生及往客户端推送。音频设备初始化后,会立即注册一个周期性定时器,将音频数据通过网络连接循环发往客户端。

抱歉!评论已关闭.