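PendingReplicationMonitor is an inner class of PendingReplicationBlocks, so let's look at PendingReplicationBlocks first. It records the blocks that are currently being replicated and periodically checks the outstanding replication requests. It mainly maintains the following variables: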
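// blocks whose replication is still in progress
private Map<Block, PendingBlockInfo> pendingReplications;
// blocks whose replication request has timed out
private ArrayList<Block> timedOutItems;
// daemon thread that runs the periodic check
Daemon timerThread = null;
// the monitor keeps looping while this is true
private volatile boolean fsRunning = true;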
The check interval defaults to 5 minutes, as does the replication timeout:
private long timeout = 5 * 60 * 1000;
private long defaultRecheckInterval = 5 * 60 * 1000;
When a PendingReplicationBlocks is created, it starts the periodic check thread, which is implemented by the PendingReplicationMonitor class (a Runnable):
void init() {
  pendingReplications = new HashMap<Block, PendingBlockInfo>();
  timedOutItems = new ArrayList<Block>();
  this.timerThread = new Daemon(new PendingReplicationMonitor());
  timerThread.start();
}
Now let's look at how the periodic monitor thread is implemented:
class PendingReplicationMonitor implements Runnable {
  public void run() {
    // keep checking for as long as the namenode is running
    while (fsRunning) {
      long period = Math.min(defaultRecheckInterval, timeout);
      try {
        pendingReplicationCheck();
        Thread.sleep(period); // interval between checks
      } catch (InterruptedException ie) {
        FSNamesystem.LOG.debug(
            "PendingReplicationMonitor thread received exception. " + ie);
      }
    }
  }

  /**
   * Note: this checks all blocks being replicated in a single pass,
   * and it is thread-safe.
   */
  void pendingReplicationCheck() {
    synchronized (pendingReplications) {
      Iterator iter = pendingReplications.entrySet().iterator();
      long now = FSNamesystem.now();
      FSNamesystem.LOG.debug("PendingReplicationMonitor checking Q");
      while (iter.hasNext()) {
        Map.Entry entry = (Map.Entry) iter.next();
        PendingBlockInfo pendingBlock = (PendingBlockInfo) entry.getValue();
        if (now > pendingBlock.getTimeStamp() + timeout) {
          Block block = (Block) entry.getKey();
          synchronized (timedOutItems) {
            timedOutItems.add(block); // replication timed out: move to timedOutItems
          }
          FSNamesystem.LOG.warn(
              "PendingReplicationMonitor timed out block " + block);
          iter.remove();
        }
      }
    }
  }
}
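To make the timeout rule easier to try out, here is a minimal standalone sketch of the same scan. It is not the Hadoop code: the class name TimeoutScanSketch, the String block ids, and the explicit nowMs parameter (standing in for FSNamesystem.now()) are all assumptions made for illustration. It only mirrors the pattern shown above: lock the map, move every entry older than the timeout into the timed-out list, and remove it through the iterator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical miniature of the timeout scan; block ids are plain Strings. */
class TimeoutScanSketch {
    // block id -> time (ms) at which the replication request was recorded
    private final Map<String, Long> pending = new HashMap<>();
    private final List<String> timedOut = new ArrayList<>();
    private final long timeoutMs;

    TimeoutScanSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    void record(String blockId, long timestampMs) {
        synchronized (pending) {
            pending.put(blockId, timestampMs);
        }
    }

    /** One pass over all pending entries; the clock is passed in so the result is deterministic. */
    void check(long nowMs) {
        synchronized (pending) {
            Iterator<Map.Entry<String, Long>> iter = pending.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, Long> entry = iter.next();
                if (nowMs > entry.getValue() + timeoutMs) {
                    synchronized (timedOut) {
                        timedOut.add(entry.getKey());
                    }
                    iter.remove(); // removing via the iterator is safe during iteration
                }
            }
        }
    }

    List<String> timedOutSnapshot() {
        synchronized (timedOut) {
            return new ArrayList<>(timedOut);
        }
    }

    int pendingCount() {
        synchronized (pending) {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        TimeoutScanSketch sketch = new TimeoutScanSketch(1000);
        sketch.record("blk_1", 0);
        sketch.record("blk_2", 800);
        sketch.check(1500); // blk_1 is 1500 ms old, blk_2 only 700 ms
        System.out.println(sketch.timedOutSnapshot() + " " + sketch.pendingCount()); // prints "[blk_1] 1"
    }
}
```

Note that the comparison is strict (now > timestamp + timeout), so an entry whose age equals the timeout exactly survives until the next pass.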
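The outer class mainly maintains two data structures, pendingReplications and timedOutItems. When a new block needs to be replicated, pendingReplications is updated through add and remove. Both methods are thread-safe: every call locks the entire map (pendingReplications is a HashMap, not a linked list). Because these operations are not highly concurrent, the coarse lock does not hurt performance much.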
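The add and remove methods themselves are not shown above, so the following is only a sketch of how such a pair could look under the locking scheme just described. The class name PendingMapSketch, the Info class (a stand-in for PendingBlockInfo), the String block ids, and the per-block replica counter are assumptions for illustration, not the actual Hadoop implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of add/remove guarded by a single lock on the map. */
class PendingMapSketch {
    /** Assumed stand-in for PendingBlockInfo: a timestamp plus a replica counter. */
    static class Info {
        long timeStamp;
        int numReplicasInProgress;

        Info(int numReplicas, long now) {
            this.numReplicasInProgress = numReplicas;
            this.timeStamp = now;
        }
    }

    private final Map<String, Info> pendingReplications = new HashMap<>();

    /** Record that numReplicas more copies of the block are being made. */
    void add(String blockId, int numReplicas) {
        synchronized (pendingReplications) { // locks the whole map
            Info found = pendingReplications.get(blockId);
            long now = System.currentTimeMillis();
            if (found == null) {
                pendingReplications.put(blockId, new Info(numReplicas, now));
            } else {
                found.numReplicasInProgress += numReplicas;
                found.timeStamp = now; // a new request restarts the timeout
            }
        }
    }

    /** One copy finished; drop the entry once no copies remain. */
    void remove(String blockId) {
        synchronized (pendingReplications) {
            Info found = pendingReplications.get(blockId);
            if (found != null) {
                found.numReplicasInProgress--;
                if (found.numReplicasInProgress <= 0) {
                    pendingReplications.remove(blockId);
                }
            }
        }
    }

    int getNumReplicas(String blockId) {
        synchronized (pendingReplications) {
            Info found = pendingReplications.get(blockId);
            return found == null ? 0 : found.numReplicasInProgress;
        }
    }

    int size() {
        synchronized (pendingReplications) {
            return pendingReplications.size();
        }
    }

    public static void main(String[] args) {
        PendingMapSketch pending = new PendingMapSketch();
        pending.add("blk_1", 2);
        pending.remove("blk_1");
        System.out.println(pending.getNumReplicas("blk_1")); // prints 1
        pending.remove("blk_1");
        System.out.println(pending.size()); // prints 0
    }
}
```

Synchronizing on the map object itself means add, remove, and the monitor's scan all share one lock, so the scan never sees a half-updated entry. The price is that they serialize against each other, which is acceptable at the low call rates described above.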