该类是一个FSNameSystem内部类,负责清理无效块、维护副本数的一个线程,相对于前面提到的线程,这是个大块头,线程运行也是周期性的,主要由两个函数来实现,而这两个函数也是比较大的,我们分开讲。
computeDatanodeWork();
processPendingReplications();
先看computeDatanodeWork(),主要用来计算需要复制或清理的块,填充相应集合,在下次心跳时发送给DataNode,由DataNode来执行相应操作。
/**
 * Computes the replication and invalidation work for one monitor iteration.
 * Fills the per-datanode work queues; the actual commands are piggy-backed
 * on the next heartbeat reply and executed by the DataNodes.
 *
 * @return total number of blocks scheduled for replication plus the number
 *         of invalidation work items found this iteration
 */
public int computeDatanodeWork() throws IOException {
    int replicationWorkFound = 0;
    int invalidationWorkFound = 0;
    int blocksToProcess = 0;
    int nodesToProcess = 0;
    // Right after startup the system is in safe mode; skip both computations
    // (only reset the cycle statistics) and return immediately.
    if (isInSafeMode()) {
        replmon.replicateQueueStats.checkRestart();
        replmon.invalidateQueueStats.checkRestart();
        return 0;
    }
    synchronized(heartbeats) {
        /*
         * The throttling below keeps background maintenance from hurting
         * foreground user latency: the per-iteration workload is carefully
         * sized so that system housekeeping makes progress without degrading
         * the user experience. Other systems do something similar, e.g.
         * Oracle's incremental checkpointing.
         */
        // Blocks to process per iteration: live datanodes * multiplier
        // (multiplier defaults to 2, i.e. activeNodes * 2).
        blocksToProcess = (int)(heartbeats.size()
            * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
        // Nodes to scan for invalid-block deletion:
        // ceil(activeNodes * 32%) by default.
        nodesToProcess = (int)Math.ceil((double)heartbeats.size()
            * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
    }
    // Three-step pattern: start cycle, compute work, end cycle. Invalid-block
    // processing below follows the same pattern. startCycle() updates the
    // bookkeeping state of the processing queue.
    replmon.replicateQueueStats.startCycle(blocksToProcess);
    replicationWorkFound = computeReplicationWork(blocksToProcess);
    replmon.replicateQueueStats.endCycle(replicationWorkFound);
    // Update FSNamesystemMetrics counters
    synchronized (this) {
        pendingReplicationBlocksCount = pendingReplications.size();
        underReplicatedBlocksCount = neededReplications.size();
        scheduledReplicationBlocksCount = replicationWorkFound;
        corruptReplicaBlocksCount = corruptReplicas.size();
    }
    replmon.invalidateQueueStats.startCycle(nodesToProcess);
    invalidationWorkFound = computeInvalidateWork(nodesToProcess);
    replmon.invalidateQueueStats.endCycle(invalidationWorkFound);
    return replicationWorkFound + invalidationWorkFound;
}
//块复制三板斧之第一斧:在处理块复制或无效块删除前计算队列状态,这是个父类函数
public void startCycle(int maxWorkToProcess) { //队列里所有的数据都已经处理完则直接返回 if (state == State.DONE_COLLECTING) return; //记录当前循环开始时间,并检测是否能一次处理完毕 startTimeCurrentCycle = now(); boolean preDetectLastCycle =preCheckIsLastCycle(maxWorkToProcess); //这里不用多说了,看一下State这个枚举就明白,主要记录队列状态 switch (state) { case BEGIN_COLLECTING: startTime = startTimeCurrentCycle; state = preDetectLastCycle ? State.IN_SOLE_CYCLE : State.IN_FIRST_CYCLE; break; default: if (preDetectLastCycle) state = State.IN_LAST_CYCLE; break; } }
//块复制三板斧之第二斧:计算需要被处理的块,并放入DD的集合,等下次心跳时会把指令发送给DD
private int computeReplicationWork( int blocksToProcess) throws IOException { // 无事可做,直接返回 if (stallReplicationWork) { return 0; } // 计算那些块要复制,放在一个二维列表里,关于如何选择块,下面会讲 List<List<Block>>blocksToReplicate = chooseUnderReplicatedBlocks(blocksToProcess); // replicate blocks int scheduledReplicationCount = 0; for (int i=0; i<blocksToReplicate.size(); i++) { for(Block block : blocksToReplicate.get(i)) { if (computeReplicationWorkForBlock(block, i)) { scheduledReplicationCount++; } } } return scheduledReplicationCount; }
/**
 * Collects up to {@code blocksToProcess} under-replicated blocks, grouped
 * by priority level, resuming from where the previous iteration stopped
 * (tracked by {@code replIndex}) and wrapping around to the beginning of
 * the queue when the end is reached.
 *
 * @param blocksToProcess maximum number of blocks to return this iteration
 * @return one list of blocks per priority level (index 0 = highest priority)
 */
synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
    // Build an empty two-level block list to return: one inner list per
    // priority level.
    List<List<Block>> blocksToReplicate =
        new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
    for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
        blocksToReplicate.add(new ArrayList<Block>());
    }
    synchronized (neededReplications) {
        // Nothing under-replicated: reset the missing-block counters and
        // return the empty buckets.
        if (neededReplications.size() == 0) {
            missingBlocksInCurIter = 0;
            missingBlocksInPrevIter = 0;
            return blocksToReplicate;
        }
        // Iterator over the per-priority queues; UnderReplicatedBlocks.LEVEL
        // bounds the priority values (0, 1, 2 visible here, 0 = highest).
        BlockIterator neededReplicationsIterator = neededReplications.iterator();
        // Skip the blocks already handled in previous iterations
        // (replIndex is the persistent resume position).
        for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
            neededReplicationsIterator.next();
        }
        // Clamp to the number of blocks actually available.
        blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
        // Walk the queue, adding each block to its priority bucket.
        for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
            if (!neededReplicationsIterator.hasNext()) {
                // start from the beginning
                replIndex = 0;
                // Roll the current iteration's missing-block count into the
                // previous-iteration counter before restarting the scan.
                missingBlocksInPrevIter = missingBlocksInCurIter;
                missingBlocksInCurIter = 0;
                // Re-clamp: the queue may have shrunk while we iterated.
                blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
                if (blkCnt >= blocksToProcess)
                    break;
                neededReplicationsIterator = neededReplications.iterator();
                assert neededReplicationsIterator.hasNext() :
                    "neededReplications should not be empty.";
            }
            // Take one block; only enqueue it if its priority is in range.
            Block block = neededReplicationsIterator.next();
            int priority = neededReplicationsIterator.getPriority();
            if (priority < 0 || priority >= blocksToReplicate.size()) {
                LOG.warn("Unexpected replication priority: " + priority + " " + block);
            } else {
                blocksToReplicate.get(priority).add(block);
            }
        } // end for
    } // end synchronized
    return blocksToReplicate;
}
/**
 * Computes the source and target datanodes for one under-replicated block
 * and records the work on the source node's replicateBlocks queue, so that
 * the source learns (on its next heartbeat) where to copy the block.
 *
 * The method is long but the flow is straightforward: validate under the
 * lock, choose targets outside the lock, then revalidate and commit under
 * the lock again.
 *
 * @param block    the under-replicated block to schedule
 * @param priority the block's priority level in neededReplications
 * @return true if replication work was scheduled for this block
 */
boolean computeReplicationWorkForBlock(Block block, int priority) {
    int requiredReplication, numEffectiveReplicas;
    List<DatanodeDescriptor> containingNodes;
    DatanodeDescriptor srcNode;
    synchronized (this) {
        synchronized (neededReplications) {
            // Look up the file metadata that owns this block.
            INodeFile fileINode = blocksMap.getINode(block);
            // File is gone (or still being written): drop the queue entry
            // and return. replIndex-- compensates for the removal so the
            // resume position in chooseUnderReplicatedBlocks stays correct.
            if (fileINode == null || fileINode.isUnderConstruction()) {
                neededReplications.remove(block, priority);
                replIndex--;
                return false;
            }
            // Target replica count for this file.
            requiredReplication = fileINode.getReplication();
            // Choose the source node for the copy; containingNodes and
            // numReplicas are filled in as side effects.
            containingNodes = new ArrayList<DatanodeDescriptor>();
            NumberReplicas numReplicas = new NumberReplicas();
            srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
            // If live replicas plus decommissioned-node replicas is <= 0,
            // the block is missing entirely.
            if ((numReplicas.liveReplicas() + numReplicas.decommissionedReplicas()) <= 0) {
                missingBlocksInCurIter++;
            }
            // No usable source node: nothing more we can do, return.
            if (srcNode == null) // block can not be replicated from any node
                return false;
            // Note: effective replicas = live replicas + replicas already
            // being copied (pending).
            numEffectiveReplicas = numReplicas.liveReplicas()
                + pendingReplications.getNumReplicas(block);
            // Already at or above the target count: skip this block.
            if (numEffectiveReplicas >= requiredReplication) {
                neededReplications.remove(block, priority);
                replIndex--;
                NameNode.stateChangeLog.info("BLOCK* "
                    + "Removing block " + block
                    + " from neededReplications as it has enough replicas.");
                return false;
            }
        }
    }
    // Actually choose the target nodes (interested readers can dig into
    // chooseTarget); if no valid targets were found, return.
    DatanodeDescriptor targets[] = replicator.chooseTarget(
        requiredReplication - numEffectiveReplicas,
        srcNode, containingNodes, null, block.getNumBytes());
    if (targets.length == 0)
        return false;
    synchronized (this) {
        synchronized (neededReplications) {
            // Recheck since global lock was released
            // block should belong to a file
            INodeFile fileINode = blocksMap.getINode(block);
            // abandoned block or block reopened for append
            if (fileINode == null || fileINode.isUnderConstruction()) {
                neededReplications.remove(block, priority); // remove from neededReplications
                replIndex--;
                return false;
            }
            requiredReplication = fileINode.getReplication();
            // do not schedule more if enough replicas is already pending
            NumberReplicas numReplicas = countNodes(block);
            numEffectiveReplicas = numReplicas.liveReplicas()
                + pendingReplications.getNumReplicas(block);
            if (numEffectiveReplicas >= requiredReplication) {
                neededReplications.remove(block, priority); // remove from neededReplications
                replIndex--;
                NameNode.stateChangeLog.info("BLOCK* "
                    + "Removing block " + block
                    + " from neededReplications as it has enough replicas.");
                return false;
            }
            // Here, at last, the block and its targets are recorded on the
            // source node — after surviving all the checks above!
            srcNode.addBlockToBeReplicated(block, targets);
            for (DatanodeDescriptor dn : targets) {
                dn.incBlocksScheduled();
            }
            // Also track the block in pendingReplications, so that a failed
            // copy will be retried on a later iteration.
            pendingReplications.add(block, targets.length);
            NameNode.stateChangeLog.debug(
                "BLOCK* block " + block
                + " is moved from neededReplications to pendingReplications");
            // If the scheduled copies will satisfy the target count, remove
            // the block from neededReplications.
            if (numEffectiveReplicas + targets.length >= requiredReplication) {
                neededReplications.remove(block, priority); // remove from neededReplications
                replIndex--;
            }
            // The log lines below should look familiar — they show up all
            // the time in the namenode log.
            if (NameNode.stateChangeLog.isInfoEnabled()) {
                StringBuffer targetList = new StringBuffer("datanode(s)");
                for (int k = 0; k < targets.length; k++) {
                    targetList.append(' ');
                    targetList.append(targets[k].getName());
                }
                NameNode.stateChangeLog.info(
                    "BLOCK* ask " + srcNode.getName()
                    + " to replicate " + block + " to " + targetList);
                NameNode.stateChangeLog.debug(
                    "BLOCK* neededReplications = " + neededReplications.size()
                    + " pendingReplications = " + pendingReplications.size());
            }
        }
    }
    return true;
}
//块复制三板斧之第三斧:最后在endCycle函数中更新统计信息,函数内部较简单,这里就不贴源码了