现在的位置: 首页 > 综合 > 正文

Linux下select函数实现的聊天服务器

2013年04月27日 ⁄ 综合 ⁄ 共 15813字 ⁄ 字号 评论关闭

消息缓冲区类MessageBuffer,接收线程将受到的消息放入缓冲区,发送线程从缓冲区中取出消息

MessageBuffer.h

 

  1. //MessageBuffer.h  
  2. #ifndef _MESSAGE_BUF_INCLUDE_  
  3. #define _MESSAGE_BUF_INCLUDE_  
  4. #include <pthread.h>  
  5. #define MESSAGE_COUNT 16  
  6. #define MESSAGE_LENGTH 2048  
  7. class MessageBuffer{  
  8. private:  
  9.   pthread_mutex_t mutex;//访问缓冲的互斥量  
  10.   pthread_cond_t condition;//访问缓冲区的条件变量  
  11.   //消息缓冲区,循环队列  
  12.   char buf[MESSAGE_COUNT][MESSAGE_LENGTH];  
  13.   int rear; //循环队列的队尾  
  14.   int front; //循环队列的队首  
  15. public:  
  16.    bool toStop;  
  17.    //构造函数  
  18.    MessageBuffer();  
  19.    //析构函数  
  20.    virtual ~MessageBuffer();   
  21.    //将消息放入消息缓冲区,当缓冲区满时阻塞,toStop=true时返回-1  
  22.    int PutMessage(const char *message);  
  23.    //从消息缓冲区中获得消息,当缓冲区空时阻塞,toStop=true时返回-1  
  24.    int GetMessage(char *mbuf, int buflen);  
  25. };  
  26. #endif  

 

MessageBuffer.cpp

 

  1. //MessageBuffer.cpp  
  2. #include <stdio.h>  
  3. #include <string.h>  
  4. #include <time.h>  
  5. #include <pthread.h>  
  6. #include "MessageBuffer.h"  
  7. MessageBuffer::MessageBuffer() {  
  8.   toStop = false;  
  9.   pthread_mutex_init(&mutex,NULL);//初始化互斥量  
  10.   pthread_cond_init(&condition,NULL);//初始化条件变量  
  11.   rear = 0; //队尾指针指向0  
  12.   front = 0; //队首指针指向0  
  13.   printf("A MessageBuffer intance created./n");  
  14. }  
  15. MessageBuffer::~MessageBuffer(){  
  16.   pthread_mutex_destroy(&mutex);  
  17.   pthread_cond_destroy(&condition);  
  18.   printf("A MessageBuffer instance destroyed./n");  
  19. }  
  20. //将消息放入消息缓冲区  
  21. int MessageBuffer::PutMessage(const char *message){  
  22.   struct timespec t;  
  23.   //等待互斥量  
  24.   pthread_mutex_lock(&mutex);  
  25.   while(!toStop && (rear+1)%MESSAGE_COUNT==front){  
  26.      t.tv_sec = time(NULL)+1;  
  27.      t.tv_nsec = 0;  
  28.      pthread_cond_timedwait(&condition,&mutex,&t);  
  29.   }  
  30.   if(toStop){  
  31.     pthread_cond_broadcast(&condition);  
  32.     pthread_mutex_unlock(&mutex);  
  33.     return -1;  
  34.   }  
  35.   int messageLen = strlen(message);  
  36.   int copyLen = messageLen>=MESSAGE_LENGTH?MESSAGE_LENGTH-1:messageLen;  
  37.   memcpy(buf[rear],message,copyLen);  
  38.   buf[rear][copyLen]='/0';  
  39.   rear = (rear+1)%MESSAGE_COUNT;  
  40.   pthread_cond_signal(&condition);  
  41.   pthread_mutex_unlock(&mutex);  
  42.   return 0;  
  43. }  
  44. //从消息缓冲区中获得消息  
  45. int MessageBuffer::GetMessage(char *mbuf, int buflen){  
  46.   struct timespec t;  
  47.   pthread_mutex_lock(&mutex);  
  48.   while(!toStop && rear==front){  
  49.     t.tv_sec = time(NULL)+1;  
  50.     t.tv_nsec = 0;  
  51.     pthread_cond_timedwait(&condition,&mutex,&t);  
  52.   }  
  53.   if(toStop){  
  54.     pthread_cond_broadcast(&condition);  
  55.     pthread_mutex_unlock(&mutex);  
  56.     return -1;  
  57.   }  
  58.   int messageLen = strlen(buf[front]);  
  59.   int copyLen = messageLen>=buflen ? buflen-1 : messageLen;  
  60.   memcpy(mbuf,buf[front],copyLen);  
  61.   mbuf[copyLen]='/0';  
  62.   front = (front+1)%MESSAGE_COUNT;  
  63.   pthread_cond_signal(&condition);  
  64.   pthread_mutex_unlock(&mutex);  
  65.   return 0;  
  66. }  

 

客户类Clients,用于维护套接字socket和套接字地址struct sockaddr_in之间的对应关系,并维护用户的姓名。

Clients.h

 

  1. //Clients.h  
  2. #ifndef _CLIENTS_INCLUDE_  
  3. #define _CLIENTS_INCLUDE_  
  4. #include <sys/types.h>  
  5. #include <netinet/in.h>  
  6. #include <pthread.h>  
  7. #define NAME_LEN 50  
  8. #define MAX_CLIENT 30  
  9. typedef struct client_info{  
  10.    int sock;  
  11.    struct sockaddr_in clientAddr;  
  12.    char name[NAME_LEN];  
  13. }CLIENT_INFO;  
  14. class Clients{  
  15. private:  
  16.   pthread_mutex_t mutex;  
  17.   CLIENT_INFO client[MAX_CLIENT];  
  18.   int clientCount;  
  19.   int IPtoString(unsigned long ip, char *buf, int buflen);   
  20.   int Search(int sock);  
  21. public:  
  22.   Clients();//构造函数  
  23.   virtual ~Clients();//析构函数  
  24.     
  25.   int GetClientCount();  
  26.     
  27.   bool PutClient(int sock,const struct sockaddr_in &clientAddr);  
  28.   void RemoveClient(int sock);  
  29.   bool GetAddrBySocket(int sock,struct sockaddr_in *addr);  
  30.   bool PutName(int sock,const char *name, int namelen);  
  31.   bool GetName(int sock, char *name, int namelen);  
  32.   int GetAllSocket(int* sockArray, int arrayLen );  
  33. };  
  34. #endif  

 

Clients.cpp

 

  1. //Clients.cpp  
  2. #include <stdio.h>  
  3. #include <string.h>  
  4. #include <arpa/inet.h>  
  5. #include "Clients.h"  
  6. Clients::Clients() {  
  7.   pthread_mutex_init(&mutex, NULL);  
  8.   clientCount = 0;  
  9.   printf("Clients created./n");  
  10. }  
  11. Clients::~Clients() {  
  12.   pthread_mutex_destroy(&mutex);  
  13.   printf("Clients destroyed./n");  
  14. }  
  15.     
  16. int Clients::Search(int sock){  
  17.   int index = -1;  
  18.   for(int i=0; i<clientCount; i++) {  
  19.      if(client[i].sock==sock){  
  20.         index = i;  
  21.         break;  
  22.      }  
  23.   }  
  24.   return index;  
  25. }  
  26. int Clients::IPtoString(unsigned long ip,char *buf,int buflen){  
  27.     unsigned char *p = (unsigned char*)&ip;  
  28.     if(buflen<16){  
  29.        return -1;  
  30.     }  
  31.     sprintf(buf,"%u.%u.%u.%u",*p,*(p+1),*(p+2),*(p+3));  
  32.     return strlen(buf);  
  33. }  
  34. int Clients::GetClientCount(){  
  35.    return clientCount;  
  36. }  
  37.     
  38. bool Clients::PutClient(int sock,const struct sockaddr_in &clientAddr) {  
  39.   if(clientCount==MAX_CLIENT){  
  40.     return false;  
  41.   }  
  42.   pthread_mutex_lock(&mutex);  
  43.   client[clientCount].sock = sock;  
  44.   client[clientCount].clientAddr = clientAddr;  
  45.   int buflen = sizeof(client[clientCount].name);  
  46.   int pos = IPtoString(clientAddr.sin_addr.s_addr,client[clientCount].name,buflen);    
  47.   sprintf(&client[clientCount].name[pos],":%d",ntohs(clientAddr.sin_port));  
  48.     
  49.   clientCount++;  
  50.   pthread_mutex_unlock(&mutex);  
  51.   return true;  
  52. }  
  53. void Clients::RemoveClient(int sock){  
  54.   pthread_mutex_lock(&mutex);  
  55.   int index = Search(sock);  
  56.   if(index!=-1){  
  57.     for(int i=index; i<clientCount-1; i++){  
  58.     client[i] = client[i+1];  
  59.     }  
  60.     clientCount--;  
  61.   }    
  62.   pthread_mutex_unlock(&mutex);  
  63. }  
  64.     
  65. bool Clients::GetAddrBySocket(int sock,struct sockaddr_in *addr){  
  66.   pthread_mutex_lock(&mutex);  
  67.   int index = Search(sock);  
  68.   if(index!=-1){  
  69.     memcpy(addr,&client[index].clientAddr,sizeof(struct sockaddr_in));  
  70.   }  
  71.   pthread_mutex_unlock(&mutex);  
  72.   return index!=-1;  
  73. }  
  74. bool Clients::PutName(int sock,const char *name,int namelen) {  
  75.   pthread_mutex_lock(&mutex);  
  76.   int index = Search(sock);  
  77.   if(index!=-1){  
  78.     int copyLen = namelen>=NAME_LEN ? NAME_LEN-1:namelen;  
  79.     memcpy(client[index].name,name,copyLen);  
  80.     client[index].name[copyLen]='/0';  
  81.   }  
  82.   pthread_mutex_unlock(&mutex);  
  83.   return index!=-1;  
  84. }  
  85. bool Clients::GetName(int sock, char *name, int namelen) {  
  86.   pthread_mutex_lock(&mutex);  
  87.   int index = Search(sock);  
  88.   if(index!=-1){  
  89.     int msgLen = strlen(client[index].name);  
  90.     int copyLen = (msgLen<namelen)? msgLen:(namelen-1);  
  91.     memcpy(name,client[index].name,copyLen);  
  92.     name[copyLen]='/0';  
  93.   }  
  94.   pthread_mutex_unlock(&mutex);  
  95.   return index!=-1;  
  96. }  
  97. int Clients::GetAllSocket(int* sockArray, int arrayLen ) {  
  98.   pthread_mutex_lock(&mutex);  
  99.   int copyCount = arrayLen>clientCount ? clientCount : arrayLen;  
  100.   for(int i=0; i<copyCount; i++){  
  101.     sockArray[i] = client[i].sock;  
  102.   }  
  103.   pthread_mutex_unlock(&mutex);  
  104.   return copyCount;  
  105. }  

 


聊天室服务器主程序Server.cpp

 

  1. /*server.c*/  
  2. #include <stdio.h>  
  3. #include <string.h>  
  4. #include <stdlib.h>  
  5. #include <sys/types.h>  
  6. #include <netinet/in.h>  
  7. #include <sys/socket.h>  
  8. #include <sys/select.h>  
  9. #include <pthread.h>  
  10. #include <unistd.h>  
  11. #include <netdb.h>  
  12. #include <arpa/inet.h>  
  13. #include "MessageBuffer.h"  
  14. #include "Clients.h"  
  15. using namespace std;  
  16. #define SERVER_PORT 8000  
  17. #define BUFFER_SIZE 4096  
  18. #ifndef MAX_CLIENT  
  19. #define MAX_CLIENT 30  
  20. #endif  
  21. #ifndef NAME_LEN  
  22. #define NAME_LEN 50  
  23. #endif  
  24. MessageBuffer messageBuffer;  
  25. Clients clients;  
  26. void* ListenThread(void*);  
  27. void* RecvThread(void*);  
  28. void* SendThread(void*);   
  29. void ProcessMessage(int sock,char buf[],int bufsize,int bytes);  
  30. bool toStop=false;  
  31. int main(int argc,char* argv[]) {  
  32.    
  33.   if(argc!=2){  
  34.     printf("Usage: %s PortNumber/n",argv[0]);  
  35.     return -1;  
  36.   }  
  37.   unsigned short port;  
  38.   if((port = atoi(argv[1]))==0){  
  39.      printf("incorrect port number./n");  
  40.      return -1;  
  41.   }  
  42.   int s;  
  43.   struct sockaddr_in serverAddr;  
  44.    
  45.   s = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
  46.   if(s==-1){  
  47.     fprintf(stderr,"create socket failed./n");  
  48.     return -1;  
  49.   }  
  50.   bzero(&serverAddr,sizeof(struct sockaddr_in));  
  51.   serverAddr.sin_family = AF_INET;  
  52.   serverAddr.sin_port = htons(port);  
  53.   serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);  
  54.   if(bind(s,(struct sockaddr*)&serverAddr,sizeof(serverAddr))==-1){  
  55.     fprintf(stderr,"bind socket to port %d failed./n",port);  
  56.     return -1;  
  57.   }  
  58.   if(listen(s,SOMAXCONN)==-1){  
  59.     fprintf(stderr,"listen failed./n");  
  60.     return -1;  
  61.   }  
  62.   printf("Server  is listening on ");  
  63.   char hostname[255];  
  64.   if(gethostname(hostname,sizeof(hostname))){  
  65.     printf("gethostname() failed./n");  
  66.     return -1;  
  67.   }  
  68.   struct hostent* pHost = gethostbyname(hostname);  
  69.   if(pHost){  
  70.       for(int i=0; pHost->h_addr_list[i]; i++){  
  71.          printf("%s ",inet_ntoa(*(in_addr*)pHost->h_addr_list[i]));  
  72.       }  
  73.   }  
  74.   printf("/nport: %d/n",port);  
  75.   pthread_t tListenId;  
  76.   if(pthread_create(&tListenId,NULL,ListenThread,&s)){  
  77.     printf("failed to create listen thread./n");  
  78.     return -1;  
  79.   }  
  80.   pthread_t tRecvId;  
  81.   if(pthread_create(&tRecvId,NULL,RecvThread,NULL)){  
  82.     printf("failed to create recv thread./n");  
  83.     return -1;  
  84.   }  
  85.   pthread_t tSendId;  
  86.   if(pthread_create(&tSendId,NULL,SendThread,NULL)){  
  87.     printf("failed to create send thread./n");  
  88.     return -1;  
  89.   }  
  90.     
  91.   while(getchar()!='q');  
  92.     
  93.   toStop = true;  
  94.   messageBuffer.toStop = true;  
  95.    
  96.   pthread_join(tListenId,NULL);  
  97.   pthread_join(tRecvId,NULL);  
  98.   pthread_join(tSendId,NULL);  
  99.   close(s);  
  100.   int sock[MAX_CLIENT];  
  101.   int count = clients.GetAllSocket(sock,MAX_CLIENT);  
  102.   for(int i=0;i<count;i++){  
  103.     close(sock[i]);  
  104.   }  
  105.     
  106.   printf("server stopped./n");  
  107.     
  108.   return 0;  
  109. }  
  110. void* ListenThread(void*ps){  
  111.   int s=*(int*)ps;  
  112.   fd_set listenSet;  
  113.   int sock;  
  114.   struct sockaddr_in clientAddr;  
  115.   struct timeval timeout;  
  116.   while(!toStop){  
  117.      FD_ZERO(&listenSet);  
  118.      FD_SET(s,&listenSet);  
  119.      timeout.tv_sec = 5;  
  120.      timeout.tv_usec = 0;  
  121.      int ret = select(s+1,&listenSet,NULL,NULL,&timeout);  
  122.      if(toStop){  
  123.           printf("ListenThread: exit./n");  
  124.           return NULL;  
  125.      }  
  126.      if(ret==-1){  
  127.         printf("ListenThread: select() failed!/n");  
  128.      }else if(ret==0){  
  129.         printf("ListenThread: select() time out./n");  
  130.      }else{  
  131.         if(FD_ISSET(s,&listenSet)){  
  132.            socklen_t addrlen = sizeof(struct sockaddr_in);  
  133.            memset(&clientAddr,0,sizeof(struct sockaddr_in));  
  134.            if((sock=accept(s,(struct sockaddr*)&clientAddr,&addrlen))==-1){  
  135.                fprintf(stderr,"accept failed./n");  
  136.            }  
  137.            if(!clients.PutClient(sock,clientAddr)){  
  138.                printf("max client limited. MAX_CLIENT=%d/n",MAX_CLIENT);  
  139.                close(sock);  
  140.            }  
  141.            printf("accept a connection from %s:%u/n",  
  142.                    inet_ntoa(*(struct in_addr*)&(clientAddr.sin_addr.s_addr)),  
  143.                    ntohs(clientAddr.sin_port));  
  144.            printf("new socket is: %u/n",sock);  
  145.          }  
  146.      }  
  147.    }  
  148.    return NULL;  
  149. }  
  150. void* RecvThread(void*){  
  151.   fd_set readSet;  
  152.   int sock[MAX_CLIENT];  
  153.   char buf[BUFFER_SIZE];  
  154.   struct timeval timeout;  
  155.   while(!toStop){  
  156.     int count = clients.GetAllSocket(sock,MAX_CLIENT);  
  157.     if(count==0){  
  158.       sleep(2);  
  159.       if(toStop){  
  160.         printf("RecvThread: exit./n");  
  161.         return NULL;  
  162.       }  
  163.       continue;    
  164.     }  
  165.     FD_ZERO(&readSet);  
  166.     int maxfd=0;  
  167.     for(int i=0;i<count;i++){  
  168.        printf("--%d",sock[i]);  
  169.        FD_SET(sock[i],&readSet);  
  170.        if(sock[i]>maxfd){  
  171.          maxfd = sock[i];  
  172.        }  
  173.     }  
  174.     printf("/n");  
  175.     timeout.tv_sec = 2;  
  176.     timeout.tv_usec = 0;  
  177.     int ret = select(maxfd+1,&readSet,NULL,NULL,&timeout);  
  178.     if(toStop){  
  179.         printf("RecvThread: exit./n");  
  180.         return NULL;  
  181.     }  
  182.     if(ret==-1){  
  183.       printf("RecvThread: select() failed!/n");  
  184.     }else if(ret==0){  
  185.       printf("RecvThread: select() time out./n");  
  186.     }else{  
  187.       for(int i=0; i<count; i++){  
  188.          if(FD_ISSET(sock[i],&readSet)){  
  189.             int bytes=recv(sock[i],buf,sizeof(buf)-1,0);  
  190.             if(bytes==-1){  
  191.               printf("RecvThread: recv failed./n");  
  192.               clients.RemoveClient(sock[i]);  
  193.               close(sock[i]);  
  194.             }else if(bytes==0){  
  195.               printf("RecvThread: socket closed by the other side./n");  
  196.               clients.RemoveClient(sock[i]);  
  197.               close(sock[i]);  
  198.             }else{  
  199.               ProcessMessage(sock[i],buf,sizeof(buf),bytes);  
  200.             }  
  201.          }  
  202.       }  
  203.     }  
  204.       
  205.   }  
  206.    
  207.  return NULL;  
  208. }  
  209. void* SendThread(void*){  
  210.    fd_set writeSet;  
  211.    int sock[MAX_CLIENT];  
  212.    char buf[BUFFER_SIZE];  
  213.    struct timeval timeout;  
  214.    while(!toStop){  
  215.        int ret = messageBuffer.GetMessage(buf,sizeof(buf));  
  216.        printf("get a message from buffer./n");  
  217.        if(ret==-1){  
  218.           printf("SendThread: exit./n");  
  219.           return NULL;  
  220.        }  
  221.        int count = clients.GetAllSocket(sock,MAX_CLIENT);  
  222.        FD_ZERO(&writeSet);  
  223.        int maxfd = 0;  
  224.        for(int i=0;i<count;i++){  
  225.           FD_SET(sock[i],&writeSet);  
  226.           if(sock[i]>maxfd){  
  227.               maxfd = sock[i];  
  228.           }  
  229.        }  
  230.        timeout.tv_sec = 2;  
  231.        timeout.tv_usec = 0;  
  232.        ret = select(maxfd+1,NULL,&writeSet,NULL,&timeout);  
  233.        if(toStop){  
  234.           printf("SendThread: exit./n");  
  235.           return NULL;  
  236.        }  
  237.        if(ret==-1){  
  238.          printf("SendThread: select() failed!/n");  
  239.        }else if(ret==0){  
  240.          printf("SendThread: select() time out./n");  
  241.        }else{  
  242.          for(int i=0;i<count;i++){  
  243.             if(FD_ISSET(sock[i],&writeSet)){  
  244.                 int messageLen = strlen(buf);  
  245.                 int bytes = send(sock[i],buf,messageLen,0);  
  246.                 if(bytes==-1){  
  247.                    printf("SendThread: send() failed./n");  
  248.                 }else if(bytes!=messageLen){  
  249.                    printf("SendThread: send message trunked.");  
  250.                 }else{  
  251.                    //do nothing  
  252.                 }  
  253.             }  
  254.          }  
  255.        }  
  256.    }  
  257.    return NULL;  
  258. }  
  259. void ProcessMessage(int sock,char buf[],int bufsize,int bytes){  
  260.   struct sockaddr_in clientAddr;  
  261.   if(!clients.GetAddrBySocket(sock,&clientAddr)){  
  262.      printf("ProcessMessage: can not find socket address./n");  
  263.      return;  
  264.   }  
  265.   char ipString[16];  
  266.   unsigned char *ip = (unsigned char*)&clientAddr.sin_addr.s_addr;  
  267.   sprintf(ipString,"%u.%u.%u.%u",*ip,*(ip+1),*(ip+2),*(ip+3));  
  268.   unsigned short port = ntohs(clientAddr.sin_port);  
  269.   buf[bytes]='/0';  
  270.   printf("Message from %s:%d: %s/n",ipString,port,buf);  
  271.   const char* CMD_BYE="bye";  
  272.   if(strcmp(buf,CMD_BYE)==0){  
  273.     send(sock,CMD_BYE,strlen(CMD_BYE),0);  
  274.     clients.RemoveClient(sock);  
  275.     close(sock);  
  276.     printf("%s:%u disconnected./n", ipString, port);  
  277.     return;  
  278.   }else{  
  279.         char bufWithName[BUFFER_SIZE+NAME_LEN];  
  280.     char cmdname[5];  
  281.         char name[NAME_LEN];  
  282.     memcpy(cmdname, buf, 4);  
  283.     cmdname[4] = '/0';  
  284.         const char* CMD_NAME="name";  
  285.     if(strcmp(cmdname,CMD_NAME)==0){  
  286.        char newname[NAME_LEN];  
  287.            int nameLen = strlen(buf+5);  
  288.            int copyLen;  
  289.            if(nameLen>=NAME_LEN){  
  290.               copyLen = NAME_LEN-1;  
  291.            }else{  
  292.               copyLen = nameLen;  
  293.            }  
  294.            memcpy(newname,buf+5,copyLen);  
  295.            newname[copyLen]='/0';  
  296.        clients.GetName(sock,name,sizeof(name));  
  297.        sprintf(bufWithName,"%s change name to %s",name,newname);  
  298.        clients.PutName(sock,newname,strlen(newname));  
  299.        messageBuffer.PutMessage(bufWithName);  
  300.         }else{             
  301.            clients.GetName(sock,name,sizeof(name));  
  302.            sprintf(bufWithName,"%s: %s",name,buf);  
  303.        messageBuffer.PutMessage(bufWithName);  
  304.         }  
  305.   }  
  306. }  

 

编译脚本文件compile

g++ -c MessageBuffer.cpp
g++ -c Clients.cpp
g++ -c Server.cpp
g++ -lpthread -o server MessageBuffer.o Clients.o Server.o

chmod +x compile

./compile 就可以编译并链接

运行服务器

./server 8000

注意Linux下的防火墙iptables服务是否已经启动,如果启动了,需要在/etc/sysconfig/iptables中加入例外端口8000,并重启启动防火墙

/etc/init.d/iptables restart

 

from::http://blog.csdn.net/microtong/archive/2009/12/12/4989902.aspx

抱歉!评论已关闭.