from: http://blog.csdn.net/chenbuaa/article/details/2301656
emule中节点加入Kad网络过程(源代码详解)
程序启动:
EmuleDlg.cpp中函数BOOL CemuleDlg::OnInitDialog(),此函数用于对话框的初始化,在这个函数里添加了定时器:VERIFY( (m_hTimer = ::SetTimer(NULL, NULL, 300, StartupTimer)) != NULL );
该定时器的回调函数是void CALLBACK CemuleDlg::StartupTimer(HWND /*hwnd*/, UINT /*uiMsg*/, UINT /*idEvent*/, DWORD /*dwTime*/),其中与启动网络相关的分支如下:
case 2:
theApp.Kad_Dlg->status++;
if(!theApp.listensocket->StartListening())
ASSERT(0);
if(!theApp.clientudp->Create())
ASSERT(0);
theApp.Kad_Dlg->status++;
break;
[PS: 现在已经不是这样了,没有了Kad_Dlg, 在cemuleDlg.cpp的2087行调用了Kad的Start()函数]
在StartupTimer这个函数里,通过theApp.listensocket->StartListening()启动了ListenSocket的侦听,并通过theApp.clientudp->Create()创建了本地节点的UDP套接字(clientudp的类型是CClientUDPSocket*);
然后程序启动。
顺便说一句,在CEmule类中定义了许多的类的实例,这都在今后使用到:
UploadBandwidthThrottler* uploadBandwidthThrottler;
CClientList* clientlist;
CClientUDPSocket* clientudp;
CListenSocket* listensocket;
CSharedFileList* sharedfiles;
CDownloadQueue* downloadqueue;
CUploadQueue* uploadqueue;
CServerList* serverlist;
LastCommonRouteFinder* lastCommonRouteFinder;
CServerConnect* serverconnect;
CIPFilter* ipfilter;
CClientCreditsList* clientcredits;
CSearchList* searchlist;
CKnownFileList* knownfiles;
CMMServer* mmserver;
AppState m_app_state; // defines application state for shutdown
CMutex hashing_mut;
CString m_strCurVersionLong;
CPeerCacheFinder* m_pPeerCache;
CFriendList* friendlist;
CFirewallOpener* m_pFirewallOpener;//hyper added
节点加入网络:
[emuledlg.cpp的:2087行 ]
Emule连接Kad网络时,调用函数:Kademlia::CKademlia::Start();。Start()这个函数没有做什么实际意义上的事情,主要是new了几个类的实例:
m_pInstance = new CKademlia();
m_pInstance->m_pPrefs = pPrefs;
m_pInstance->m_pUDPListener = NULL;
m_pInstance->m_pRoutingZone = NULL;
m_pInstance->m_pIndexed = new CIndexed();
m_pInstance->m_pRoutingZone = new CRoutingZone();
m_pInstance->m_pUDPListener = new CKademliaUDPListener();
并且更改了几个定时器的时间。
接着程序转入到routingzone.cpp中执行。
在上面那部分的Start ()函数体内部初始化了CRoutingZone这个类,这个类的构造函数CRoutingZone::CRoutingZone()体中调用函数 Init(NULL, 0, CUInt128((ULONG)0));来初始化根节点(应该就是本地节点)。
// Can only create routing zone after prefs
// Set our KadID for creating the contact tree
CKademlia ::GetPrefs ()-> GetKadID(& uMe );
m_sFilename = szFilename ;
// Init our root node.
Init (NULL ,
0, CUInt128(( ULONG )0));
在void CRoutingZone::Init(CRoutingZone *pSuper_zone, int iLevel, const CUInt128 &uZone_index)函数体内部创建了一个新的m_pBin = new CRoutingBin();
// Init all Zone vars
// Set this zones parent
m_pSuperZone = pSuper_zone ;
// Set this zones level
m_uLevel = iLevel ;
// Set this zones CUInt128 Index
m_uZoneIndex = uZone_index ;
// Mark this zone has having now leafs.
m_pSubZones [0] = NULL ;
m_pSubZones [1] = NULL ;
// Create a new contact bin as this is a leaf.
m_pBin = new CRoutingBin();
// Set timer so that zones closer to the root are processed earlier.
m_tNextSmallTimer = time ( NULL)
+ m_uZoneIndex .Get32BitChunk (3);
// Start this zone.
StartTimer ();
// If we are initializing the root node, read in our saved contact list.
if ((m_pSuperZone == NULL)
&& ( m_sFilename .GetLength () > 0))
ReadFile ();
接着调用函数StartTimer(),用来启动这个区域。在StartTimer()函数内部先设置m_tNextBigTimer,然后通过CKademlia::AddEvent(this);把该区域添加为事件:
time_t tNow = time( NULL );
// Start filling the tree, closest bins first.
m_tNextBigTimer = tNow + SEC(10);
CKademlia ::AddEvent ( this);
在调用完StartTimer()函数后,如果当前初始化的是根节点(m_pSuperZone == NULL)并且文件名非空,则调用ReadFile()从文件中读取以前保存的联系人。
在调用完函数Kademlia::CKademlia::Start();之后,Kademlia开始处理,转入函数Kademlia::CKademlia::Process()开始执行。在函数void CKademlia::Process()中,当某个区域的m_tNextSmallTimer到期时,调用函数pZone->OnSmallTimer();,即CRoutingZone中的OnSmallTimer(),并把下一次触发时间设为当前时间加上MIN2S(1)。
line 274:
if (pZone -> m_tNextSmallTimer <= tNow )
{
pZone ->OnSmallTimer ();
pZone ->m_tNextSmallTimer = MIN2S(1)
+ tNow ;
}
CRoutingZone中OnSmallTimer(),在此函数体内,当判断联系人为非空时,调用函数 CKademlia::GetUDPListener()->SendMyDetails_KADEMLIA2(KADEMLIA2_HELLO_REQ, pContact->GetIPAddress(), pContact->GetUDPPort());来发送本地节点的一些信息(在下面摘录的较新版本代码中,该函数名为SendMyDetails,并增加了对方版本号、UDP密钥、客户端ID等参数)。其中函数的第一个参数是消息的类型,
KADEMLIA2_HELLO_REQ表明是Kademlia 2.0网络的加入请求,作用类似于TCP建立连接时发出的SYN,即表明这个消息是用来加入网络的。第二个参数是目标联系人(pContact)的IP地址,第三个参数是目标联系人的UDP端口。
if (pContact != NULL)
{
pContact ->CheckingType ();
if (pContact -> GetVersion()
>= 6){ /*48b*/
if (thePrefs . GetDebugClientKadUDPLevel()
> 0)
DebugSend ("KADEMLIA2_HELLO_REQ" , pContact ->GetIPAddress (), pContact-> GetUDPPort ());
CUInt128 uClientID = pContact-> GetClientID ();
CKademlia ::GetUDPListener ()-> SendMyDetails( KADEMLIA2_HELLO_REQ , pContact ->GetIPAddress (), pContact-> GetUDPPort (), pContact -> GetVersion(), pContact ->GetUDPKey (),
& uClientID, false );
if (pContact -> GetVersion()
>= KADEMLIA_VERSION8_49b ){
// FIXME:
// This is a bit of a work arround for statistic values. Normally we only count values from incoming HELLO_REQs for
// the firewalled statistics in order to get numbers from nodes which have us on their routing table,
// however if we send a HELLO due to the timer, the remote node won't send a HELLO_REQ itself anymore (but
// a HELLO_RES which we don't count), so count those statistics here. This isn't really accurate, but it should
// do fair enough. Maybe improve it later for example by putting a flag into the contact and make the answer count
CKademlia ::GetPrefs ()-> StatsIncUDPFirewalledNodes( false );
CKademlia ::GetPrefs ()-> StatsIncTCPFirewalledNodes( false );
}
接着转入KademliaUDPListener.cpp中函数void CKademliaUDPListener::SendMyDetails_KADEMLIA2(byte byOpcode, uint32 uIP, uint16 uUDPPort)运行,主要是调用函数SendPacket(byPacket, uLen, uIP, uUDPPort);。SendPacket函数同样位于KademliaUDPListener.cpp内部,此函数体内部调用函数theApp.clientudp->SendPacket(pPacket, ntohl(uDestinationHost), uDestinationPort);来发送包。
uint32 uLen = sizeof( byPacket )
- byteIOResponse . GetAvailable();
if (byKadVersion >= KADEMLIA_VERSION6_49aBETA){
if (isnulmd4 ( uCryptTargetID-> GetDataPtr ())){
DebugLogWarning (_T ( "Sending
hello response to crypt enabled Kad Node which provided an empty NodeID: %s (%u)"), ipstr (ntohl ( uIP)), byKadVersion );
SendPacket (byPacket , uLen, uIP , uUDPPort , targetUDPKey, NULL );
}
else
SendPacket (byPacket , uLen, uIP , uUDPPort , targetUDPKey, uCryptTargetID );
}
else {
SendPacket (byPacket , uLen, uIP , uUDPPort ,
0, NULL);
ASSERT ( targetUDPKey . IsEmpty()
);
}
KademliaUDPListener.cpp内部CKademliaUDPListener ::SendPacket之一:
{
if (uLenData <
2) {
ASSERT (0);
return ;
}
AddTrackedOutPacket (uDestinationHost , pbyData[1]);
Packet * pPacket = new Packet (OP_KADEMLIAHEADER );
pPacket ->opcode = pbyData[1];
pPacket ->pBuffer = new char [uLenData +8];
memcpy (pPacket -> pBuffer, pbyData +2, uLenData -2);
pPacket ->size = uLenData-2;
if ( uLenData >
200 )
pPacket ->PackPacket ();
theStats .AddUpDataOverheadKad ( pPacket-> size );
theApp .clientudp -> SendPacket( pPacket , ntohl ( uDestinationHost), uDestinationPort , true
, ( uCryptTargetID != NULL )
? uCryptTargetID-> GetData () : NULL
, true , targetUDPKey . GetKeyValue( theApp .GetPublicIP ( false)));
}
ClientUDPSocket.cpp中(565line)函数theApp.clientudp->SendPacket(pPacket, ntohl(uDestinationHost), uDestinationPort);体内部将刚才的消息包(或者叫数据包)加入到controlpacket_queue的队尾,
controlpacket_queue.AddTail(newpending); // line586
controlpacket_queue是一个链表,类型是CTypedPtrList<CPtrList, UDPPack*> controlpacket_queue;,
CTypedPtrList <CPtrList , UDPPack*> controlpacket_queue ;
// ZZ:UploadBandWithThrottler (UDP) -->
sendLocker. Lock ();
controlpacket_queue .AddTail ( newpending);
sendLocker. Unlock ();
theApp. uploadBandwidthThrottler ->QueueForSendingControlPacket ( this);
return true ;
// <-- ZZ:UploadBandWithThrottler (UDP)
是通过模板来实现的。接着继续调用函数theApp.uploadBandwidthThrottler->QueueForSendingControlPacket(this);,把当前的UDP套接字对象(this)交给uploadBandwidthThrottler(它是一个UploadBandwidthThrottler*类型的对象,而不是链表);此时数据包仍在controlpacket_queue中排队,等待被发送。
类UploadBandwidthThrottler继承自CWinThread类,主要是作为线程来运行的。
该类在初始化时,其构造函数通过AfxBeginThread(RunProc, (LPVOID)this)启动线程,线程入口函数是UINT AFX_CDECL UploadBandwidthThrottler::RunProc(LPVOID pParam),RunProc再调用RunInternal():
/**
 * Constructs the throttler and immediately launches its worker thread.
 *
 * Allocates the two CEvent objects, zeroes the byte/slot counters, sets the
 * run flag and finally hands RunProc to AfxBeginThread with this object as
 * the thread parameter.
 */
UploadBandwidthThrottler::UploadBandwidthThrottler(void)
{
	// Event objects; arguments are passed as CEvent(bInitiallyOwn, bManualReset).
	threadEndedEvent = new CEvent(0, 1);
	pauseEvent = new CEvent(TRUE, TRUE);

	// Nothing has been sent and no slot has been activated yet.
	m_SentBytesSinceLastCall = 0;
	m_SentBytesSinceLastCallOverhead = 0;
	m_highestNumberOfFullyActivatedSlots = 0;

	// Set the run flag before the thread exists
	// (presumably read by the worker loop - confirm in RunInternal).
	doRun = true;

	// RunProc receives this object back through its LPVOID parameter.
	AfxBeginThread(RunProc, static_cast<LPVOID>(this));
}
UINT AFX_CDECL UploadBandwidthThrottler:: RunProc (LPVOID pParam)
{
DbgSetThreadName ("UploadBandwidthThrottler" );
InitThreadLocale ();
UploadBandwidthThrottler * uploadBandwidthThrottler =
( UploadBandwidthThrottler*) pParam ;
return uploadBandwidthThrottler -> RunInternal();