
Hadoop NameNode Startup: ReplicationMonitor (7-1)

2013-03-31

  

   This class is an inner class of FSNamesystem: a thread responsible for cleaning up invalid blocks and keeping replica counts at their target. Compared with the threads covered earlier, this one is a heavyweight. It also runs periodically, and its work is carried out mainly by two functions. Both are fairly large, so we will go through them separately.

          computeDatanodeWork();

          processPendingReplications();
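For context, ReplicationMonitor's run() method is just a periodic driver around these two calls. A simplified sketch of the branch-1 loop (error handling and exact log messages trimmed):

public void run() {
  while (fsRunning) {
    try {
      computeDatanodeWork();        // compute replication / invalidation work
      processPendingReplications(); // re-queue replications that timed out
      Thread.sleep(replicationRecheckInterval);
    } catch (InterruptedException ie) {
      break;                        // NameNode is shutting down
    } catch (IOException e) {
      LOG.warn("ReplicationMonitor thread received exception. " + e);
    }
  }
}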

Let's start with computeDatanodeWork(). It figures out which blocks need to be replicated or deleted, and fills the corresponding per-DataNode collections; the resulting commands are sent to the DataNodes on their next heartbeat, and the DataNodes carry out the actual work.

public int computeDatanodeWork() throws IOException {
    int replicationWorkFound = 0;
    int invalidationWorkFound = 0;
    int blocksToProcess = 0;
    int nodesToProcess = 0;

    // Right after startup the system is in safe mode, so both computations are skipped
    if (isInSafeMode()) {
      replmon.replicateQueueStats.checkRestart();
      replmon.invalidateQueueStats.checkRestart();
      return 0;
    }
    synchronized (heartbeats) {
      /**
       * The calculation below exists to keep HDFS background work from hurting
       * the foreground user experience. In an age where user experience is king,
       * background operations have to be carefully budgeted: finish the
       * housekeeping without getting in the user's way. Other systems do similar
       * things, e.g. Oracle's incremental checkpointing.
       **/
      // Number of blocks to process per pass; default = live DataNodes * 2
      blocksToProcess = (int)(heartbeats.size()
          * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
      // Number of nodes whose invalid blocks are processed per pass;
      // default = ceil(live DataNodes * 32%)
      nodesToProcess = (int)Math.ceil((double)heartbeats.size()
          * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
    }
    // Now the three-step routine: start the cycle, compute the work, end the cycle.
    // Invalid-block processing follows the same pattern; startCycle() computes
    // the state of the processing queue.
    replmon.replicateQueueStats.startCycle(blocksToProcess);
    replicationWorkFound = computeReplicationWork(blocksToProcess);
    replmon.replicateQueueStats.endCycle(replicationWorkFound);

    // Update FSNamesystemMetrics counters
    synchronized (this) {
      pendingReplicationBlocksCount = pendingReplications.size();
      underReplicatedBlocksCount = neededReplications.size();
      scheduledReplicationBlocksCount = replicationWorkFound;
      corruptReplicaBlocksCount = corruptReplicas.size();
    }

    replmon.invalidateQueueStats.startCycle(nodesToProcess);
    invalidationWorkFound = computeInvalidateWork(nodesToProcess);
    replmon.invalidateQueueStats.endCycle(invalidationWorkFound);

    return replicationWorkFound + invalidationWorkFound;
  }
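As a quick worked example: on a cluster with 100 live DataNodes and the default constants (REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2, INVALIDATE_WORK_PCT_PER_ITERATION = 32), one pass schedules replication for at most 100 × 2 = 200 blocks and processes the invalid-block lists of ceil(100 × 0.32) = 32 DataNodes.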
 

//Step 1 of the block-replication three-step routine: compute the queue state before replication or invalidation work is processed. This method lives in the parent (statistics) class.

 public void startCycle(int maxWorkToProcess) {
    // If everything in the queue has already been processed, return immediately
    if (state == State.DONE_COLLECTING) return;

    // Record the start time of this cycle and check whether the remaining
    // work can be finished within a single cycle
    startTimeCurrentCycle = now();
    boolean preDetectLastCycle = preCheckIsLastCycle(maxWorkToProcess);
    // Not much to explain here: the State enum tracks where we are in the
    // queue processing, and the switch below simply advances that state
    switch (state) {
    case BEGIN_COLLECTING:
      startTime = startTimeCurrentCycle;
      state = preDetectLastCycle ? State.IN_SOLE_CYCLE : State.IN_FIRST_CYCLE;
      break;
    default:
      if (preDetectLastCycle)
        state = State.IN_LAST_CYCLE;
      break;
    }
  }
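For reference, the State enum mentioned above belongs to the queue-statistics helper inside FSNamesystem. A rough sketch of the states it cycles through, reconstructed from the code shown here (the real enum may contain additional members, e.g. an intermediate in-cycle state):

enum State {
  BEGIN_COLLECTING,  // no work processed yet
  IN_SOLE_CYCLE,     // all remaining work fits into this single cycle
  IN_FIRST_CYCLE,    // first of several cycles
  IN_LAST_CYCLE,     // the cycle that will drain the queue
  DONE_COLLECTING    // the queue has been fully processed
}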

//Step 2 of the three-step routine: compute the blocks to be processed and add them to each DataNode's collection; the commands are handed to the DataNodes on the next heartbeat.

private int computeReplicationWork(
                  int blocksToProcess) throws IOException {
    // Replication work is stalled; nothing to do, return immediately
    if (stallReplicationWork)  {
      return 0;
    }

    // Compute which blocks need replication; they come back in a two-level
    // list indexed by priority (how blocks are chosen is covered below)
    List<List<Block>> blocksToReplicate =
      chooseUnderReplicatedBlocks(blocksToProcess);

    // replicate blocks
    int scheduledReplicationCount = 0;
    for (int i=0; i<blocksToReplicate.size(); i++) {
      for(Block block : blocksToReplicate.get(i)) {
        if (computeReplicationWorkForBlock(block, i)) {
          scheduledReplicationCount++;
        }
      }
    }
    return scheduledReplicationCount;
  }


/**
 * Get the set of blocks to be replicated.
 **/

synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
    // Build an empty two-level block list to return
    List<List<Block>> blocksToReplicate =
      new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
    for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) {
      blocksToReplicate.add(new ArrayList<Block>());
    }

    synchronized(neededReplications) {
      if (neededReplications.size() == 0) {
        missingBlocksInCurIter = 0;
        missingBlocksInPrevIter = 0;
        return blocksToReplicate;
      }

      // Get an iterator over the per-priority lists; there are
      // UnderReplicatedBlocks.LEVEL (= 3) priority levels: 0, 1 and 2,
      // with 0 being the highest
      BlockIterator neededReplicationsIterator =
        neededReplications.iterator();
      // Skip the blocks already handled in previous passes
      // (replIndex remembers where the last pass stopped)
      for(int i=0; i < replIndex && neededReplicationsIterator.hasNext();
          i++) {
        neededReplicationsIterator.next();
      }
      // Clamp to the number of blocks actually available in this pass
      blocksToProcess = Math.min(blocksToProcess,
        neededReplications.size());
      // Walk the iterator and add each block to blocksToReplicate
      for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
        if( ! neededReplicationsIterator.hasNext()) {
          // start from the beginning
          replIndex = 0;
          missingBlocksInPrevIter = missingBlocksInCurIter;
          missingBlocksInCurIter = 0;
          blocksToProcess = Math.min(blocksToProcess,
            neededReplications.size());
          if(blkCnt >= blocksToProcess)
            break;
          neededReplicationsIterator = neededReplications.iterator();
          assert neededReplicationsIterator.hasNext() :
                                  "neededReplications should not be empty.";
        }
        // Take one block; it is queued only if its priority is valid
        Block block = neededReplicationsIterator.next();
        int priority = neededReplicationsIterator.getPriority();
        if (priority < 0 || priority >= blocksToReplicate.size()) {
          LOG.warn("Unexpected replication priority: " + priority + " "
                                        + block);
        } else {
          blocksToReplicate.get(priority).add(block);
        }
      } // end for
    } // end synchronized
    return blocksToReplicate;
 }
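How a block lands in a particular priority list is decided by UnderReplicatedBlocks when the block is queued. The sketch below is a simplified reconstruction of the branch-1 getPriority() logic; the trimmed parameter list and the exact thresholds should be treated as an assumption, not a quote of the source:

// Simplified sketch of UnderReplicatedBlocks.getPriority(); illustrative only
private int getPriority(int curReplicas, int decommissionedReplicas,
                        int expectedReplicas) {
  if (curReplicas >= expectedReplicas) {
    return LEVEL;                           // fully replicated, not queued at all
  } else if (curReplicas == 0) {
    // only replicas on decommissioning nodes remain: copy them off first
    return decommissionedReplicas > 0 ? 0 : 2;
  } else if (curReplicas == 1) {
    return 0;                               // a single live replica: highest priority
  } else if (curReplicas * 3 < expectedReplicas) {
    return 1;                               // badly under-replicated
  } else {
    return 2;                               // mildly under-replicated
  }
}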

/***
 * computeReplicationWorkForBlock() picks the source node and the target nodes
 * for a data block and updates the source node's replicateBlocks queue, which
 * tells the source node where to copy the block.
 *
 * The function is fairly long, but the flow is reasonably clear:
 **/

boolean computeReplicationWorkForBlock(Block block, int priority) {
    int requiredReplication, numEffectiveReplicas;
    List<DatanodeDescriptor> containingNodes;
    DatanodeDescriptor srcNode;

    synchronized (this) {
      synchronized (neededReplications) {
        // Get the metadata of the file this block belongs to
        INodeFile fileINode = blocksMap.getINode(block);
        // If the file no longer exists or is still under construction,
        // drop the block from the queue and return
        if(fileINode == null || fileINode.isUnderConstruction()) {
          neededReplications.remove(block, priority);
          replIndex--;
          return false;
        }
        // Target replication factor
        requiredReplication = fileINode.getReplication();

        // Choose the source node for this block
        containingNodes = new ArrayList<DatanodeDescriptor>();
        NumberReplicas numReplicas = new NumberReplicas();
        srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
        // If live replicas plus replicas on decommissioned nodes add up to
        // zero, the block is missing
        if ((numReplicas.liveReplicas() + numReplicas.decommissionedReplicas())
            <= 0) {
          missingBlocksInCurIter++;
        }
        // No usable source node: nothing more we can do, return
        if(srcNode == null) // block can not be replicated from any node
          return false;

        // Note: effective replicas = live replicas + replicas already being copied
        numEffectiveReplicas = numReplicas.liveReplicas() +
                                pendingReplications.getNumReplicas(block);
        // If there are already enough effective replicas, skip the block
        if(numEffectiveReplicas >= requiredReplication) {
          neededReplications.remove(block, priority);
          replIndex--;
          NameNode.stateChangeLog.info("BLOCK* "
              + "Removing block " + block
              + " from neededReplications as it has enough replicas.");
          return false;
        }
      }
    }

    // Now the target nodes are actually chosen (chooseTarget is worth reading
    // on its own); if no valid target is found, return
    DatanodeDescriptor targets[] = replicator.chooseTarget(
        requiredReplication - numEffectiveReplicas,
        srcNode, containingNodes, null, block.getNumBytes());
    if(targets.length == 0)
      return false;

    synchronized (this) {
      synchronized (neededReplications) {
        // Recheck since global lock was released
        // block should belong to a file
        INodeFile fileINode = blocksMap.getINode(block);
        // abandoned block or block reopened for append
        if(fileINode == null || fileINode.isUnderConstruction()) {
          neededReplications.remove(block, priority); // remove from neededReplications
          replIndex--;
          return false;
        }
        requiredReplication = fileINode.getReplication();

        // do not schedule more if enough replicas is already pending
        NumberReplicas numReplicas = countNodes(block);
        numEffectiveReplicas = numReplicas.liveReplicas() +
          pendingReplications.getNumReplicas(block);
        if(numEffectiveReplicas >= requiredReplication) {
          neededReplications.remove(block, priority); // remove from neededReplications
          replIndex--;
          NameNode.stateChangeLog.info("BLOCK* "
              + "Removing block " + block
              + " from neededReplications as it has enough replicas.");
          return false;
        }

        // Note: only here, after running the full gauntlet of checks, are the
        // block and its target nodes finally added to the source node
        srcNode.addBlockToBeReplicated(block, targets);

        for (DatanodeDescriptor dn : targets) {
          dn.incBlocksScheduled();
        }

        // Also add the block to pendingReplications, so that the replication
        // can be retried if this attempt does not succeed
        pendingReplications.add(block, targets.length);
        NameNode.stateChangeLog.debug(
            "BLOCK* block " + block
            + " is moved from neededReplications to pendingReplications");

        // Remove the block from neededReplications
        if(numEffectiveReplicas + targets.length >= requiredReplication) {
          neededReplications.remove(block, priority); // remove from neededReplications
          replIndex--;
        }

        // The log message below should look familiar: it shows up in the
        // NameNode log all the time
        if (NameNode.stateChangeLog.isInfoEnabled()) {
          StringBuffer targetList = new StringBuffer("datanode(s)");
          for (int k = 0; k < targets.length; k++) {
            targetList.append(' ');
            targetList.append(targets[k].getName());
          }
          NameNode.stateChangeLog.info(
                    "BLOCK* ask "
                    + srcNode.getName() + " to replicate "
                    + block + " to " + targetList);
          NameNode.stateChangeLog.debug(
                    "BLOCK* neededReplications = " + neededReplications.size()
                    + " pendingReplications = " + pendingReplications.size());
        }
      }
    }

    return true;
  }
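To close the loop: the work queued by addBlockToBeReplicated() is handed to the DataNode on its next heartbeat. Roughly, handleHeartbeat() drains the per-node queues into commands like this (a simplified sketch of the branch-1 flow; the method names getReplicationCommand/getInvalidateBlocks are from memory and worth double-checking against your version):

// Inside FSNamesystem.handleHeartbeat(), simplified:
// blocks queued by addBlockToBeReplicated() become a DNA_TRANSFER command
BlockCommand cmd = nodeinfo.getReplicationCommand(
    maxReplicationStreams - xmitsInProgress);
if (cmd != null) {
  cmds.add(cmd);
}
// blocks queued for deletion become a DNA_INVALIDATE command
cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (cmd != null) {
  cmds.add(cmd);
}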

//Step 3 of the three-step routine: finally, endCycle() updates the statistics. The function body is fairly simple, so the source is not shown here.

