现在的位置: 首页 > 综合 > 正文

高性能Socket服务器编程-02

2013年12月01日 ⁄ 综合 ⁄ 共 10740字 ⁄ 字号 评论关闭

上一章,我向大家演示了一个最基本的socket服务器结构,它一次只能响应一个连接请求,而“能同时响应多个连接和请求”无疑是现实生活中对socket服务器的最基本要求。要如何让socket服务器可以同时响应多个连接和请求呢?多线和多进程程肯定是大部分人首先想到的,可能很多人不一定真正清楚多线程和多进程的socket服务器架构具体意味着什么,但是至少大家都或多或少听说过这两种技术。不过本章中,我们暂时还不会涉及到多线程和多进程的服务器架构,我它们归类为设计范畴,而我们暂时还没有脱离泥水匠身份,所以还要继续学习“泥沙之用途“,设计的事情需要等到我们泥水匠毕业,升级建筑设计师的时候再说。

那么本章具体的内容是什么呢?真是没有悬念,在上一章中我已经提前透露了:IO重用。下面就正式进入主题吧。

什么是IO重用

我们先来想像这样一个场景:一个只有一个柜台一个营业员的银行营业厅。这个银行所有业务都需要填表格,而且如果填了表格,熟练的营业员能在一瞬间帮你把事情办完。在繁忙的时候,大家排成长队等待轮到自己,当排到的时候从营业员手中拿到表格,然后填写一番,接着交给营业员处理。很显然,这个营业厅很低效,低效在哪里呢?它有优秀的营业员,但是缺少合理的运作模式。实际上后面的人排队等待的时间不是业务处理时间,而是前面的人的填表事件。万一遇到需要连续办多件事情的客户,他堵在那里填了一份又一份表格,后面的人只能一直等着了。

我们怎么改进这个营业厅呢?我们可以在大厅设置一个自行领取表格和填写表格的桌子,让大家先填好了表格再到柜台办理业务,这样高效的营业员就能非常快速的处理业务,就几乎不可能出现排队了(注意,我说的是"几乎"不是绝对)。

而我们前一章演示的socket服务器程序,就像改进之前的银行营业厅,它的低效在于它没有充分利用IO,而不是它有复杂的业务逻辑。所以人们为了解决这类问题就在设计操作系统的时候加入了IO重用机制,让编程人员可以更有效的利用IO,就像提供了领取和填写表格的桌子一样。

IO重用技术有很多种,有些是夸平台的,有些是平台独有的,这里就列举一些我知道的:

名称 平台
select Linux, *BSD, Mac OS X, Solaris, Windows
poll Linux, *BSD, Mac OS X
epoll Linux
/dev/poll Solaris
kqueue FreeBSD
IOCP Windows

每一种IO重用技术都是通过操作系统提供的一组特定的API函数调用来提供支持的,我们所要学的就是学会怎么用这些API并且了解每种技术背后的原理和来龙去脉。

本章就从最通用应该也是最早出现的IO重用技术select开始。

补充说明一点,IO重用并不局限于用在socket编程上,只要涉及到IO的编程都可以应用。

select的用法

select技术主要由一个函数和几个宏来提供支持,下面是它们的说明:

/*
 * 功能:监视多个文件描述符,直到有一个或多个文件描述符准备好做某种IO操作时返回
 * 返回:当调用成功时,返回已准备好的文件描述符个数;发生错误时返回-1,可以通过errno得到错误类型
 * 参数:
 *      nfds      - 后面三个文件描述符集合中最大的文件描述符加1(想想为什么?)
 *      readfds   - 等待进行读操作的文件描述符,指针传递,在select返回时会改变这个参数的值,只保留已准备好的文件描述符
 *      writefds  - 等待进行写操作的文件描述符,指针传递,在select返回时会改变这个参数的值,只保留已准备好的文件描述符
 *      exceptfds - 监视异常的文件描述符号,很少用到
 *      timeout   - 程序会在select调用的地方阻塞,你可以通过设置超时让程序可以在一定时间间隔后继续执行
 */
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

/*
 * 功能:将指定文件描述符从指定的描述符集合中清除
 * 参数:
 *      fd  - 文件描述符
 *      set - 文件描述符集合
 */
void FD_CLR(int fd, fd_set *set);

/*
 * 功能:检查一个文件描述符是否在集合中
 * 返回:存在时返回非0整数,不存在则返回0
 * 参数:
 *      fd  - 文件描述符
 *      set - 文件描述符集合
 */
int  FD_ISSET(int fd, fd_set *set);

/*
 * 功能:设置一个文件描述符到集合中
 * 参数:
 *      fd  - 文件描述符
 *      set - 文件描述符集合
 */
void FD_SET(int fd, fd_set *set);

/*
 * 功能:将一个文件描述符集合清零
 * 参数:
 *      set - 文件描述符集合
 */
void FD_ZERO(fd_set *set);

在socket编程中,典型的select用法是:在创建监听的服务器socket文件描述符后,使用FD_ZERO初始化一个空的文件描述符集合,然后把监听的socket文件描述符通过FD_SET放入"读"集合中,然后进入服务器循环,当select返回时,用FD_ISSET检查readfds中是否有监听的socket文件描述符时,如果有说明有新的连接请求,这时候就调用accept接受连接,把accept返回的连接文件描述符放入自己维护的一个连接集合中,并把连接的文件描述符通过FD_SET放入"读"集合中,通过循环自己维护的连接集合中的文件描述符,调用FD_ISSET来判断是否某个连接有数据可读取,如果有就读取并处理。

上面这么说可能很笼统也很模糊,下面通过具体的代码演示select的用法和连接的维护让大家对select有一个具象的了解:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>


#define SD_PORT         10086
#define SD_BACK_LOG     10
#define SD_MAX_CLIENT   3


int sd_listener_fd;


void 
sd_init ()
{
    int reuse = 1;
    
    struct sockaddr_in addr;

    if ((sd_listener_fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("Create listener socket failed");
        exit(-1);
    }

    if (setsockopt(sd_listener_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1)
    {
        perror("Setup listener socket failed");
        exit(-1);
    }

    bzero(&(addr.sin_zero), 8);

    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(SD_PORT);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(sd_listener_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
    {
        perror("Bind listener socket address failed");
        exit(-1);
    }

    if (listen(sd_listener_fd, SD_BACK_LOG) == -1)
    {
        perror("Listen port failed");
        exit(-1);
    }
}


void   
sd_loop ()
{
    char buf[1024];

    int i = 0, j = 0;

    int ret = 0;

    int client_fd;

    int client_addr_len;

    int client_fds[SD_MAX_CLIENT];

    int client_fd_max_i = -1;

    int max_fd = sd_listener_fd;

    struct sockaddr_in client_addr;

    fd_set read_fds, ready_read_fds;

    FD_ZERO(&read_fds);
    FD_SET(sd_listener_fd, &read_fds);

    for (i = 0; i < SD_MAX_CLIENT; i++)
    {
        client_fds[i] = -1;
    }

    printf("Waiting connect on port %d\n", SD_PORT);

    for (;;)
    {
        ready_read_fds = read_fds;

        ret = select(max_fd + 1, &ready_read_fds, NULL, NULL, NULL);

        if (ret == -1)
        {
            perror("Select failed");
            break;
        }

        if (ret == 0)
            continue;

        if (FD_ISSET(sd_listener_fd, &ready_read_fds))
        {
            client_fd = accept(sd_listener_fd, (struct sockaddr *)&client_addr, &client_addr_len);

            FD_SET(client_fd, &read_fds);

            for (i = 0; i < SD_MAX_CLIENT; i++)
            {
                if (client_fds[i] == -1)
                {
                    client_fds[i] = client_fd;
                    break;
                }
            }

            printf("Client connected\n");

            if (i == SD_MAX_CLIENT - 1)
            {
                FD_CLR(sd_listener_fd, &read_fds);

                printf("Connection is full\n");
            }

            if (client_fd > max_fd)
                max_fd = client_fd;

            if (i > client_fd_max_i)
                client_fd_max_i = i;

            if (-- ret == 0)
                continue;
        }

        for (i = 0; i <= client_fd_max_i; i ++)
        {
            client_fd = client_fds[i];

            if (client_fd == -1)
                continue;

            if (FD_ISSET(client_fd, &ready_read_fds))
            {
                if ((ret = read(client_fd, buf, 1024)) == 0)
                {
                    close(client_fd);
    
                    FD_CLR(client_fd, &read_fds);

                    client_fds[i] = -1;

                    printf("Client closed\n");

                    if (!FD_ISSET(sd_listener_fd, &read_fds))
                    {
                        FD_SET(sd_listener_fd, &read_fds);

                        printf("Listener come back\n");
                    }

                    max_fd = sd_listener_fd;

                    for (j = 0; j <= client_fd_max_i; j ++)
                    {
                        if (client_fds[j] > max_fd)
                            max_fd = client_fds[j];
                    }

                    if (i == client_fd_max_i)
                    {
                        while ((client_fd_max_i -= 1) < -1)
                        {
                            if (client_fds[client_fd_max_i] != -1)
                                break;
                        }
                    }
                }
                else
                {
                    write(client_fd, buf, ret);
                }

                if (-- ret == 0)
                    break;
            }
        }
    }
}


void
sd_down ()
{
    close(sd_listener_fd);

    printf("Server shutdown\n");
}


int
main (int argc, char *argv[])
{
    sd_init();

    sd_loop();

    sd_down();

    return 1;
}

上面的代码比前一章的例子复杂了很多,因为现在我们的echo服务器已经具备了同时响应多个客户端的能力。编译方式还是跟前一章一样,这里就不再重复说明。你现在可以同时用多个telnet连接到服务器上进行测试,为了方便测试,我通过SD_MAX_CLIENT指定了最大连接数是3,当达到最大连接数时,服务器就不再接受新的连接了。

比起之前的例子,代码增加都在sd_loop中,看起来很长,但如果分解开来其实逻辑很清晰。前面一大块是变量声明,接着进入无限循环,循环周期的开头阻塞在select调用,直到有文件描述准备好做操作select才返回,然后就是一个if包围的新连接接入处理,接着是for循环包围的连接请求处理。

因为select返回时会改变参数的值,所以在每次select之前,我们都会把read_fds赋值给ready_read_fds,然后把ready_read_fds传递给select函数,这样read_fds本身就不会受到影响了。

示例中没有的东西

上面的示例只有一个模式,而select函数实际上有三种使用模式,具体内容大家可以通过man select查阅文档。

示例中代码只体现了select的用法,并没有直接告诉我们select背后的机制和原理,不过我们通过使用方式来自己推测个大概。当然,我们也可以通过阅读Linux内核代码做到100%了解,但这就离我们主题有些远了,留给大家自己研究吧。我们把监听的文件描诉符集合丢给select,自然select内部会遍历监视这些文件描诉符,然后把具备特定状态的文件描述符保留在监视集合中其余的清除,然后把集合返回给调用者。所以select内部应该是一个遍历过程,而遍历过程需要有遍历结束的判断,所以才会需要我们传入maxfd作为参数。

示例中不能体现出的还有一点就是select的局限性,select能较好的解决io重用的问题,至少大幅度的提高了我们程序中io的使用效率(从无到有),但是它并不是完美的,它也有一些需要改进的地方。select在有些操作系统上有单个进程监听的文件描述符个数限制,至少通过万能的网络,我可以确认在Linux和Windows内核中的确有这样的限制,Linux内核代码通过一个FD_SETSIZE的宏约束了文件描诉符集合的元素上限,这个值默认是1024,也就是说我们示例程序监视的客户端连接最多只能有1024个,网上也有人提供各种奇技淫巧来突破这个限制,不过我不推荐这么做,因为这样做不自然,并且还有其他的io重用技术可以让我们选用。

我们的示例代码业务逻辑很简单,它只是简单返回收到的内容,但假设我们现在做的是一个MMORPG游戏的服务器,它在收到请求是可能需要执行复杂的游戏操作逻辑,那么socket服务器是不是很可能在业务逻辑处理的地方阻塞了呢?就像本章开头的银行里子中,我们假设了营业员充分高效,但现实生活中并不一定是这样。这就是有待我们解决的问题了。

示例中只用到select的readfds参数。原因很简单,为了让代码简单,如果用上writefds意味着就要区分开客户端的读和写处理,这会让代码更加复杂,对于我们演示select的基本使用来说这是不利的。但是对于高性能socket服务器来说区分读和写是必需的,因为你得用尽一切办法避免阻塞,上面的例子在实际应用中,很可能会在write的地方由于客户端没有准备好或者网络不畅导致程序在此位置阻塞,这就像是银行那个例子中有人没填好表格就跑到柜台前占着位置填表格,其实没有在做正事但后面的人都得等他一样,会影响到整体的执行效率。

下面是经过进一步抽象和细化的echo服务器,它已经将读写事件区分开,并把服务器状态和连接抽象成不同的数据结构,由不同的函数负责不同操作:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>


#define SD_PORT        10086
#define SD_BACK_LOG    10
#define SD_MAX_CONN    10
#define sd_MAX_CONN    2000


typedef struct
{
    int      buf_len;
    char*    buf;
}
sd_conn_state;


typedef struct
{
    int                   id;
    int                   fd;
    struct sockaddr_in    addr;

    sd_conn_state*        state;
}
sd_conn;


typedef struct
{
    int         max_fd;
    int         listener_fd;

    fd_set      read_fds;
    fd_set      can_read_fds;

    fd_set      write_fds;
    fd_set      can_write_fds;

    int         conn_max;
    int         conn_free;
    int         conn_count;

    sd_conn*    conn_items;
}
sd_state;


void 
sd_init (sd_state* state)
{
    int i;

    sd_conn* conn;

    int reuse = 1;
    
    struct sockaddr_in addr;

    if ((state->listener_fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("Create listener socket failed");
        exit(1);
    }

    if (setsockopt(state->listener_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1)
    {
        perror("Setup listener socket failed");
        exit(1);
    }

    bzero(&(addr.sin_zero), 8);

    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(SD_PORT);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(state->listener_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
    {
        perror("Bind listener socket address failed");
        exit(1);
    }

    if (listen(state->listener_fd, SD_BACK_LOG) == -1)
    {
        perror("Listen port failed");
        exit(1);
    }

    state->max_fd = state->listener_fd;

    FD_ZERO(&state->read_fds);
    FD_ZERO(&state->can_read_fds);

    FD_ZERO(&state->write_fds);
    FD_ZERO(&state->can_write_fds);

    FD_SET(state->listener_fd, &state->read_fds);

    state->conn_max   = -1;
    state->conn_free  = SD_MAX_CONN;
    state->conn_count = SD_MAX_CONN;
    state->conn_items = calloc(SD_MAX_CONN, sizeof(sd_conn));

    for (i = 0; i < SD_MAX_CONN; i++)
    {
        conn = &state->conn_items[i];

        conn->id = -1;
        conn->fd = -1;
        conn->state = NULL;
    }
}

void
sd_conn_accept (sd_state* state)
{
    int i;

    int addr_size;

    sd_conn* conn;

    for (i = 0; i < state->conn_count; i ++)
    {
        conn = &state->conn_items[i];

        if (conn->fd >= 0)
            continue;

        conn->id = i;
        conn->state = NULL;

        addr_size = sizeof(conn->addr);

        conn->fd = accept(state->listener_fd, (struct sockaddr *)&conn->addr, &addr_size);

        if (conn->fd > state->max_fd)
            state->max_fd = conn->fd;

        if (conn->id > state->conn_max)
            state->conn_max = conn->id;

        FD_SET(conn->fd, &state->read_fds);

        state->conn_free --;

        printf("Client connected\n");

        break;
    }
}

void
sd_conn_close (sd_state* state, sd_conn* conn)
{
    int i;

    sd_conn* temp_conn;

    if (conn->id == state->conn_max)
    {
        for (i = conn->id - 1; i >= 0; i --)
        {
            if (i < 0)
            {
                state->conn_max = -1;

                break;
            }

            temp_conn = &state->conn_items[i];

            if (temp_conn->fd > 0)
            {
                state->conn_max = temp_conn->id;

                break;
            }
        }
    }

    if (conn->fd == state->max_fd)
    {
        if (state->conn_max >= 0)
        {
            state->max_fd = state->listener_fd;

            for (i = 0; i <= state->conn_max; i ++)
            {
                temp_conn = &state->conn_items[i];

                if (temp_conn->fd > state->max_fd)
                {
                    state->max_fd = temp_conn->fd;
                }
            }
        }
    }

    FD_CLR(conn->fd, &state->read_fds);
    FD_CLR(conn->fd, &state->write_fds);

    close(conn->fd);

    conn->id = -1;
    conn->fd = -1;
    
    if (conn->state != NULL)
    {
        free(conn->state);
    }

    state->conn_free ++;
    
    printf("Client close\n");
}

void
sd_conn_proc (sd_state* state, sd_conn* conn)
{
    int ret;

    char* buf;

    buf = calloc(1024, sizeof(char));

    if ((ret = read(conn->fd, buf, 1024)) == 0)
    {
        sd_conn_close(state, conn);

        free(buf);
    }
    else
    {
        conn->state = calloc(1, sizeof(sd_conn_state));

        conn->state->buf = buf;
        conn->state->buf_len = ret;

        FD_SET(conn->fd, &state->write_fds);
    }
}

void
sd_conn_repo (sd_state* state, sd_conn* conn)
{
    if (conn->state == NULL)
        return;

    write(conn->fd, conn->state->buf, conn->state->buf_len);

    free(conn->state->buf);
    free(conn->state);

    conn->state = NULL;

    FD_CLR(conn->fd, &state->write_fds);
}


void   
sd_loop (sd_state* state)
{
    int i = 0;

    sd_conn* conn;

    for (;;)
    {
        state->can_read_fds = state->read_fds;
        state->can_write_fds = state->write_fds;

        int num_ready = select(state->max_fd + 1, &state->can_read_fds, &state->can_write_fds, NULL, NULL);

        if (num_ready == 0)
            continue;

        if (num_ready == -1)
        {
            perror("Select failed\n");
            exit(1);
        }

        if (FD_ISSET(state->listener_fd, &state->can_read_fds))
        {
            sd_conn_accept(state);

            if (-- num_ready == 0)
                continue;
        }

        for (i = 0; i <= state->conn_max; i ++)
        {
            conn = &state->conn_items[i];

            if (conn->fd == -1)
                continue;

            if (FD_ISSET(conn->fd, &state->can_read_fds))
            {
                sd_conn_proc(state, conn);

                if (-- num_ready == 0)
                    break;
            }

            if (FD_ISSET(conn->fd, &state->can_write_fds))
            {
                sd_conn_repo(state, conn);

                if (-- num_ready == 0)
                    break;
            }
        }
    }
}


void
sd_down (sd_state* state)
{
    close(state->listener_fd);

    printf("Server shutdown\n");
}


int
main (int argc, char *argv[])
{
    sd_state state;

    sd_init(&state);

    sd_loop(&state);

    sd_down(&state);

    return 1;
}

本章总结

本章通过演示select的使用向大家展示了io重用技术是怎样提高io使用效率的,让我们的socket服务器程序的可用向前迈了一大步。

但是迈出这步后,还有无数的挑战的等着我们,例如上面说的select的限制、复杂业务逻辑阻塞,等等。

后续我可能会继续花一到两章来描述poll和epoll,但也可能直接就介绍如何使用夸平台的libev和libevent库。特别是poll实际上它对select来说没有改进的地方,属于同一水平,实际上它们只是起源不一样,但是时代差不多所以水平也就差不多,而后续的epoll、dev/poll、iocp则是百家争鸣时期各个操作系统平台为了进一步提高io重用效率而设计的新机制,它们才本质上对select和poll等老模式进行了改进,所以我可能会跳过poll介绍epoll。

libev的实验代码我其实已经做好了,poll和epoll有点懒得重复了,具体怎么样还不确定就留个悬念吧,呵呵。

抱歉!评论已关闭.