现在的位置: 首页 > 综合 > 正文

Hadoop RPC机制

2013年07月29日 ⁄ 综合 ⁄ 共 4662字 ⁄ 字号 评论关闭

Hadoop RPC机制

转载:http://www.iteye.com/topic/709993

1、心跳机制 


心跳的机制大概是这样的: 
1) master启动的时候,会开一个ipc server在那里。 
2) slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。 

2、找到心跳的代码 

拿namenode和datanode来说,在datanode的offerService方法中,每隔3秒向namenode发送心跳的代码: 

Java代码  收藏代码
  1. /** 
  2.   * Main loop for the DataNode.  Runs until shutdown, 
  3.   * forever calling remote NameNode functions. 
  4.   */  
  5.  public void offerService() throws Exception {  
  6.       
  7.    ...  
  8.   
  9.    //  
  10.    // Now loop for a long time....  
  11.    //  
  12.   
  13.    while (shouldRun) {  
  14.      try {  
  15.        long startTime = now();  
  16.   
  17.        //  
  18.        // Every so often, send heartbeat or block-report  
  19.        //  
  20.          
  21. // 如果到了3秒钟,就向namenode发心跳  
  22.        if (startTime - lastHeartbeat > heartBeatInterval) {  
  23.          //  
  24.          // All heartbeat messages include following info:  
  25.          // -- Datanode name  
  26.          // -- data transfer port  
  27.          // -- Total capacity  
  28.          // -- Bytes remaining  
  29.          //  
  30.          lastHeartbeat = startTime;  
  31.          DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,  
  32.                                                       data.getCapacity(),  
  33.                                                       data.getDfsUsed(),  
  34.                                                       data.getRemaining(),  
  35.                                                       xmitsInProgress.get(),  
  36.                                                       getXceiverCount());  
  37.   
  38.   // 注意上面这行代码,“发送心跳”竟然就是调用namenode的一个方法??  
  39.   
  40.          myMetrics.heartbeats.inc(now() - startTime);  
  41.          //LOG.info("Just sent heartbeat, with name " + localName);  
  42.   
  43.   // 处理对心跳的返回值(namenode传给datanode的指令)  
  44.          if (!processCommand(cmds))  
  45.            continue;  
  46.        }  
  47.   
  48.     // 这里省略很多代码  
  49. ...  
  50.    } // while (shouldRun)  
  51.  } // offerService  

上面这段代码,如果是单机的程序,没什么值得奇怪的。但是,这是hadoop集群!datanode和namenode在2台不同的机器(或2个JVM)上运行!datanode机器竟然直接调用namenode的方法!这是怎么实现的?难道是传说中的RMI吗?? 

下面我们主要就来分析这个方法调用的细节。 

3、心跳的底层细节一:datanode怎么获得namenode对象的? 

首先,DataNode类中,有一个namenode的成员变量: 

Java代码  收藏代码
  1. public class DataNode extends Configured   
  2.     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {  
  3.   ...  
  4.   public DatanodeProtocol namenode = null;  
  5.   ...   
  6. }  

下面是NameNode类的定义: 

Java代码  收藏代码
  1. public class NameNode implements ClientProtocol, DatanodeProtocol,  
  2.                                  NamenodeProtocol, FSConstants,  
  3.                                  RefreshAuthorizationPolicyProtocol {  
  4.   ...   
  5. }  

注意:NameNode实现了DatanodeProtocol接口,DatanodeProtocol接口定义了namenode和datanode之间通信的方法。 

那么,DataNode类是怎么获取到NameNode类的引用呢? 

在Datanode端,为namenode变量赋值的代码: 

Java代码  收藏代码
  1. // connect to name node  
  2. this.namenode = (DatanodeProtocol)   
  3.   RPC.waitForProxy(DatanodeProtocol.class,  
  4.                    DatanodeProtocol.versionID,  
  5.                    nameNodeAddr,   
  6.                    conf);  

在继续去RPC类中追踪: 

Java代码  收藏代码
  1. VersionedProtocol proxy =  
  2.         (VersionedProtocol) Proxy.newProxyInstance(  
  3.             protocol.getClassLoader(), new Class[] { protocol },  
  4.             new Invoker(addr, ticket, conf, factory));  

现在,明白了! 
1) 对namenode的赋值,并不是真正的new了一个实现了DatanodeProtocol接口的对象,而是获得了一个动态代理!! 
2) 上面这段代码中,protocol的类型是DatanodeProtocol.class 
3) 对namenode的所有调用,都被委托(delegate)给了Invoker 

4、心跳的底层细节二:看看Invoker类 

Invoker类是org.apache.hadoop.ipc.RPC类的一个静态内部类: 

Java代码  收藏代码
  1. private static class Invoker implements InvocationHandler {  

在这个类中,看invoke方法: 
  

Java代码  收藏代码
  1. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
  2.             ...  
  3.   
  4.               ObjectWritable value = (ObjectWritable)  
  5.                 client.call(new Invocation(method, args), address,   
  6.                             method.getDeclaringClass(), ticket);  
  7.                 ...  
  8.               return value.get();  
  9.            }  

所有的方法调用又被delegate给client的call方法了! 

client是Invoker中的成员变量: 
  

Java代码  收藏代码
  1. private Client client;  

所以可以看出:DatanodeProtocol中的每个方法调用,都被包装成一个Invocation对象,再由client.call()调用 

5、心跳的底层细节三:Invocation类 

Invocation类是org.apache.hadoop.ipc.RPC类的一个静态内部类 

没有什么业务逻辑方法,主要作用就是一个VO 

6、心跳的底层细节四:client类的call方法 

接下来重点看client类的call方法: 

Java代码  收藏代码
  1. public Writable call(Writable param, InetSocketAddress addr,   
  2.                      Class<?> protocol, UserGroupInformation ticket)    
  3.                      throws InterruptedException, IOException {  
  4.   
  5.   Call call = new Call(param);     
  6. // 将Invocation转化为Call  
  7.   Connection connection = getConnection(addr, protocol, ticket, call);  
  8. // 连接远程服务器  
  9.   connection.sendParam(call);                 // send the parameter  
  10. // 将“序列化”后的call发给过去  
  11.   boolean interrupted = false;  
  12.   synchronized (call) {  
  13.     while (!call.done) {  
  14.       try {  
  15.         call.wait();                           // wait for the result  
  16. // 等待调用结果  

抱歉!评论已关闭.