现在的位置: 首页 > 综合 > 正文

UDP分包重组算法

2019年08月10日 ⁄ 综合 ⁄ 共 8998字 ⁄ 字号 评论关闭

UDP分包重组算法(BTW:Windows下用IOCP来发也可能会有同样的问题,所以本文同样适用于TCP -
IOCP下的分包及重组)

Packet.h

  1. #include "InetAddr.h" //对socket地址操作封装的类,比如char*IP转成ULONG  
  2. #include <vector>  
  3. using   namespace  std; 
  4.  
  5. //先定义包头
  6. typedef   struct  _HeadExt 
  7.     //每帧所有分片公用信息  
  8.     char    flag[5]; //可以任意字符或数字或其他方法,用来标志我们发送的数据  
  9.     UINT    m_nSeqNumber; //本帧序数  
  10.     USHORT  m_nTotalFragment; //本帧数据可分成的总片数  
  11.     USHORT  m_nTotalSize;  //本帧数据总长度  
  12.     //每片各自信息  
  13.     USHORT  m_nFragmentIndex;  //每帧分片序数,0 1 2 ... ,/  
  14.     USHORT  m_usPayloadSize; //本片数据长度  
  15.     USHORT  m_usFragOffset; //本片数据相对总数据的偏移量  
  16.     USHORT  m_bLastFragment; //是否最后一帧  
  17. //上 述数据有些重复,可以精简,有时间再修改  
  18.  }HeadExt; 
  19.  
  20.  
  21.  
  22. //从socket接收到的数据  
  23. class   PacketIn 
  24. public  : 
  25.     PacketIn(char * lpszBuffer=NULL,  UINT  usBufferSize=0,  UINT  nDataSize=0); 
  26.     virtual  ~PacketIn(); 
  27.     HeadExt head; 
  28.     BYTE *    m_lpszBuffer; 
  29.     ULONG    ip; 
  30.     UINT     port; 
  31.     CVLInetAddr addr; 
  32.     BOOL  Normalize(); 
  33.     UINT    m_nDataLen; 
  34.     UINT    m_usBufferSize; 
  35.  
  36. }; 
  37.  
  38. //接收到数据后开始重组使用的一个中间缓冲区,多个PacketIn合成一个Packet  
  39. //发送时从一个Packet分割出多片,不过本程序里直接发送,没有封成Packet  
  40. class  Packet  
  41. {
  42. public
  43.     enum  { TIMEOUT_LOCKFRAGMENTS = 1000 };
  44.     Packet( );
  45.     ~Packet();
  46.     void  reset();
  47.     void  Set( const  CVLInetAddr& iaFrom,  UINT  nSeqNumber );
  48.     BOOL  InsertFragment(PacketIn*  const  pFragment);
  49.     bool  m_bUsed; 
  50.     int  recvedpacks; 
  51.     int  recvedbytes; 
  52.     int  seqnum; 
  53.     BYTE * m_pBuffer;
  54.  
  55. //测试用程序,直接访问了类的变量,没有封装成函数。上述变量正常应该写成 SetXXX ,GetXXX     
  56. private
  57.     BOOL  IsFragComplete()  const ;
  58.     ULONG      m_ip;
  59.     USHORT    m_port; 
  60.     UINT         m_nSeqNumber; 
  61.     vector<int > SeqNumberList; 
  62. }; 

Packet.cpp

  1. #include "Packet.h"  
  2.  
  3. PacketIn::PacketIn(char * lpszBuffer /*=NULL*/ UINT  usBufferSize /*=0*/ ,
  4.   UINT  nDataSize /*=0*/
  5.  
  6.     m_lpszBuffer   = (BYTE *)lpszBuffer; 
  7.     m_usBufferSize = usBufferSize; //数据最大长度,其实没什么用,已经设置了这个值最大为64000  
  8.     m_nDataLen = nDataSize; 
  9. PacketIn::~PacketIn() 
  10. BOOL  PacketIn::Normalize()   //判断收到的数据是否有效  
  11.     const   USHORT  usHeaderSize =  sizeof (HeadExt); 
  12.     if  ( !m_lpszBuffer || m_usBufferSize < usHeaderSize ) //没什么用  
  13.     { 
  14.         return  FALSE; 
  15.     } 
  16.  
  17.     HeadExt* pHeader = (HeadExt*)( m_lpszBuffer  ); 
  18.     if  ( pHeader->m_usPayloadSize != m_nDataLen - usHeaderSize ) 
  19.     { 
  20.         return  FALSE; 
  21.     } 
  22.  
  23.     head = *pHeader; 
  24.     if  ( pHeader->m_usPayloadSize > 0 ) 
  25.     { 
  26. memmove( m_lpszBuffer, m_lpszBuffer + usHeaderSize, pHeader->m_usPayloadSize ); 
  27.     } 
  28.     
  29.     return  TRUE; 
  30.  
  31. /////////////  
  32. //这里是重组数据的临时缓冲,只看这里必然迷惑,先看socket收数据那里,再回来查看  
  33. void  Packet::Set(  const  CVLInetAddr& iaFrom,  UINT  nSeqNumber ) 
  34.     m_iaFrom =  iaFrom; 
  35.     m_nSeqNumber = nSeqNumber; 
  36.     if (m_pBuffer) 
  37.         delete  []m_pBuffer; 
  38.  
  39. void  Packet::reset() 
  40.     m_bUsed = false
  41.     recvedpacks = 0; 
  42.     seqnum = 0; 
  43.     recvedbytes = 0; 
  44.     m_pBuffer = NULL; 
  45.     m_pBuffer = new   BYTE [64000]; 
  46.     SeqNumberList.clear(); 
  47. Packet::Packet() 
  48.     //calculate the ttl  
  49.     //m_nTTL = RUDP_REASSEMBLE_TIMEOUT*CRudpPeer::PR_SLOWHZ;  
  50.     reset(); 
  51.  
  52. Packet::~Packet() 
  53.     if (m_pBuffer) 
  54.         delete  []m_pBuffer; 
  55.  
  56. BOOL  Packet::InsertFragment(PacketIn*  const  pFragment) 
  57.     int  nSize = SeqNumberList.size(); 
  58.     for ( int  i = 0; i< nSize ;i ++) 
  59.     { 
  60.         if (nSize ==SeqNumberList[i] ) //收到重复数据包  
  61.         { 
  62.             return  FALSE; 
  63.         } 
  64.     } 
  65.     SeqNumberList.push_back(pFragment->head.m_nFragmentIndex); 
  66.  
  67.     memcpy( m_pBuffer + pFragment->head.m_usFragOffset, 
  68. pFragment->m_lpszBuffer, pFragment->head.m_usPayloadSize); 
  69.     recvedbytes += pFragment->head.m_usPayloadSize; 
  70.     recvedpacks++; 
  71.     m_bUsed = true
  72.  
  73.     CString str; 
  74. str.Format("收到数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d, 
  75. m_usFragOffset %d,m_nTotalSize %d,recvedbytes %d/r/n", 
  76. pFragment->head.m_nSeqNumber,pFragment->head.m_nTotalFragment, 
  77. pFragment->head.m_nFragmentIndex,pFragment->head.m_usFragOffset, 
  78. pFragment->head.m_nTotalSize,pFragment->head.m_usPayloadSize); 
  79.         OutputDebugString(str); 
  80.  
  81.     if (recvedbytes == pFragment->head.m_nTotalSize ) 
  82.         return  TRUE; 
  83.     
  84.     return  FALSE; 

发送时的操作:

  1. #iclude "packet.h"  
  2.  
  3. const   int  RTP_SPLIT_PACKSIZE = 1300;
  4. //udp一般最大包为1500,一般这里都设置为1400,当然1300也没错  
  5. LONGLONG  index = rand();
  6. //帧序数,从一个随机数开始,我机器上生成的是41  
  7. //分包发送,逻辑相当简单,按最常规的方法分割发送  
  8. void  Send( BYTE * pBuff, UINT  nLen, ULONG  ip, USHORT  port) 
  9.     HeadExt head; 
  10.     strcpy(head.flag ,"aaaa" ); 
  11.       
  12.     head.m_nSeqNumber = index++; //帧序数  
  13.  
  14.     head.m_nTotalFragment = nLen/RTP_SPLIT_PACKSIZE; 
  15.     head.m_nTotalFragment  += nLen%RTP_SPLIT_PACKSIZE?1:0;
  16. //整除的话就是0,否则多出一个尾数据  
  17.     head.m_nFragmentIndex = 0;//第一个分段从0开始  
  18.     head.m_nTotalSize = nLen; 
  19.     head.m_bLastFragment = 0; 
  20.     head.m_usFragOffset = 0; 
  21.  
  22.     char  tem[RTP_SPLIT_PACKSIZE+ sizeof (HeadExt)]; 
  23.  
  24.     if (head.m_nTotalFragment == 1) 
  25.     { 
  26.         memcpy(tem,&head,sizeof (HeadExt)); 
  27.         memcpy(tem+sizeof (HeadExt),pBuff,nLen); 
  28.         head.m_usPayloadSize = nLen; 
  29.  
  30. //用UDP发送,常规做法,先对UDP做一个封装的类,这里调用类对象。本代码只说明原理,类似伪代友  
  31.        sendto(tem,nLen+sizeof (HeadExt),ip,port); 
  32.  
  33.         CString str; 
  34.         str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
  35. m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n"
  36. head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex,
  37. head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize); 
  38.         OutputDebugString(str); 
  39.  
  40.         return
  41.     } 
  42.  
  43.     int  i = 0; 
  44.     for ( i = 0; i < head.m_nTotalFragment-1; i++) //开始分割,最后一个单独处理  
  45.     { 
  46.         head.m_bLastFragment = 0; 
  47.         head.m_nFragmentIndex = i; 
  48.         head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE; 
  49.         head.m_usPayloadSize = RTP_SPLIT_PACKSIZE; 
  50.         memcpy(tem,&head,sizeof (HeadExt)); 
  51.         memcpy(tem+sizeof (HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,RTP_SPLIT_PACKSIZE); 
  52.        sendto(tem,nLen+sizeof (HeadExt),ip,port); 
  53.  
  54.         CString str; 
  55.         str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
  56. m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n"
  57. head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex,
  58. head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize); 
  59.         OutputDebugString(str); 
  60.  
  61.     } 
  62.  
  63.     head.m_bLastFragment = 1; 
  64.     head.m_nFragmentIndex = i; 
  65.     head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE; 
  66.     head.m_usPayloadSize = nLen - (i*RTP_SPLIT_PACKSIZE); 
  67.  
  68.     memcpy(tem,&head,sizeof (HeadExt)); 
  69.     memcpy(tem+sizeof (HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,nLen - (i*RTP_SPLIT_PACKSIZE)); 
  70.  
  71.     sendto(tem,nLen+sizeof (HeadExt),ip,port); 
  72.  
  73.     CString str; 
  74. str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
  75. m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d/r/n" ,head.m_nSeqNumber,
  76. head.m_nTotalFragment,head.m_nFragmentIndex,head.m_usFragOffset,
  77. head.m_nTotalSize,head.m_usPayloadSize); 
  78.  
  79. OutputDebugString(str); 

接收数据的回调

//代码很简单,收到数据后,形成PacketIn,然后找一个链表,插进去
//因为收到的数据的顺序是不定的,所以先创建一个链表,每帧首数据到来时,分配一个节点,其他分片到来的时候,都加到//这个节点里,等数据都到齐了,这个节点就是一个完整的包,回调出去。

  1. void  RecvCallback( BYTE * pBuff, UINT  nLen, ULONG  ip, USHORT  port) 
  2.     if (nLen <  sizeof (HeadExt)) 
  3.         return ;
  4.     HeadExt* pHead = (HeadExt*)pBuff; 
  5.     CString str = pHead->flag; 
  6.     if (str !=  "aaaa"
  7.     {
  8.         return
  9.     }
  10.     if (pHead->m_nTotalFragment == 1) //只有一帧,不分片  
  11.     { 
  12.         //回调,上层处理  
  13.         if (m_pfDispatch)
  14. m_pfDispatch(pBuff+sizeof (HeadExt),nLen- sizeof (HeadExt),ip,port,m_lpParam);
  15.         return
  16.     } 
  17.  
  18.     PacketIn data( (char *)pBuff, 64000, nLen ); 
  19.     if  ( !data.Normalize() ) 
  20.         return ;
  21.     if  ( data.head.m_nTotalFragment>1 ) 
  22.     {
  23.         Packet* pTemp = GetPartialPacket( iaRemote, data.head.m_nSeqNumber ); 
  24.         if  ( pTemp ) 
  25.         { 
  26.             if  ( pTemp ->InsertFragment( &data ) ) 
  27.             { 
  28.                 m_pfDispatch(pTemp ->m_pBuffer, 
  29.                     pTemp ->recvedbytes,ip,port,m_lpParam); 
  30.             } 
  31.         }//end of if  
  32.     } 

//上面用到的一些变量可以在一个.h里面定义,比如:
Packet
TempPacket[16];//分配一个数组,每个节点是一个重组后临时缓冲
Packet* GetPartialPacket(const
CVLInetAddr& iaFrom, UINT nSeqNumber);//从数组里取出一个缓冲节点

  1. //根据这几个关键字查找,不过只用  
  2. //到了nSeqNumber,要是3人以上的视频聊天,ip和port是必要的  
  3. Packet* GetPartialPacket(const   ULONG  ip, USHORT  port,  UINT  nSeqNumber) 
  4.     Packet* tmp = NULL; 
  5.     int  i=0; 
  6.     while (tmp==NULL && i<16) 
  7.     { 
  8.         //该包所属的帧已有其它数据包到达  
  9.         if (TempPacket[i].seqnum==nSeqNumber && TempPacket[i].recvedpacks>0) 
  10.         { 
  11.             tmp = &TempPacket[i]; 
  12.             break
  13.         } 
  14.         i++; 
  15.     } 
  16.     if (tmp == NULL)         //新的帧  
  17.     { 
  18.         //查找空闲元素  
  19.         for (i=0;i<16;i++) 
  20.         { 
  21.             if (!TempPacket[i].m_bUsed) 
  22.                 break
  23.         } 
  24.  
  25.         if (i>=16) 
  26.         { 
  27.             //没有空闲的元素,丢掉一个最早  
  28.             tmp = &TempPacket[0]; 
  29.             int  j = 0; 
  30.             for (i=1;i<16;i++) 
  31.             { 
  32.                 if (TempPacket[i].seqnum < tmp->seqnum) 
  33.                 { 
  34.                     tmp = &TempPacket[i]; 
  35.                     j = i; 
  36.                 } 
  37.             } 
  38.             //找到最早的一帧  
  39.             if (tmp->m_pBuffer) 
  40.             { 
  41.                 delete  []tmp->m_pBuffer; 
  42.                 tmp->reset(); 
  43.             } 
  44.         } 
  45.         else  
  46.             tmp = &TempPacket[i]; 
  47.     } 
  48.  
  49.     InsertFragment 
  50.     tmp->m_bUsed = true
  51.     tmp->seqnum = nSeqNumber; 
  52.  
  53.     return  tmp; 

整个示例最重要的是两个函数:

GetPartialPacket:取到一个临时缓冲节点

InsertFragment:把收到的每个数据段插入临时缓冲组成一个完整帧

当然发送方也要按一定规则分割发送。

抱歉!评论已关闭.