Hadoop集群中,不同进程之间通信需要使用合适的协议才能够进行交互,之前对Hadoop给出的协议接口做了分析。在协议接口中约定了通信双方的特定行为,那么,在实现这些通信协议的实现类中,就能看到指定进程是如何实现协议接口中约定的行为的。这里,阅读分析org.apache.hadoop.hdfs.server.namenode.Namenode实现类。
首先,看一下Namenode类实现的接口,下面是该类声明:
可以看到,Namenode主要实现了ClientProtocol,DatanodeProtocol,NamenodeProtocol这三个用来通信的协议。其中,RefreshAuthorizationPolicyProtocol接口是定义所使用的认证策略,并能根据不同的应用场景来自动刷新其级别,以适应实际应用的需要。
Namenode是HDFS集群中的中心服务器,对于服务器的配置选项,可以通过加载配置文件来进行配置,所以该类中有如下加载配置资源的两行代码:
Namenode类中定义属性如下:
static NameNodeMetrics myMetrics; // 用来维护Namenode服务器活动状况的统计数据实体
对上面几个重要的属性简单说明一下。
FSNamesystem也是org.apache.hadoop.hdfs.server.namenode包中的类,在Namenode类中是核心的、最重要的。该类主要的功能是对Datanode结点的一些状态等进行登记,便于Namenode进程能够快速获取到指定的Datanode结点的状态等的详细信息,以便进行任务的调度与分配。它主要跟踪如下几个重要的数据表:
1)文件系统到块的映射表(有效文件系统名字-->块列表);
2)全部有效块的列表;
3)块到主机的映射表(块-->主机列表);
4)主机到块的映射表(主机-->块列表);
5)LRU缓存(已经更新心跳状态的主机都放到LRU Cache中)。
HttpServer在org.apache.hadoop.http包中,提供了将Jetty服务器内置于Namenode服务器中,以便可以通过HTTP请求的方式来获取当前Namenode提供服务的信息。
Namenode提供了一个执行初始化的方法,如下所示:
if (serviceAuthEnabled = conf.getBoolean(erviceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
PolicyProvider policyProvider =
(PolicyProvider)(ReflectionUtils.newInstance(conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, HDFSPolicyProvider.class, PolicyProvider.class), conf));
SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider)); // 设置安全策略
}
this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(), handlerCount, false, conf); // 创建RPC服务器
this.serverAddress = this.server.getListenerAddress();
FileSystem.setDefaultUri(conf, getUri(serverAddress));
LOG.info("Namenode up at: " + this.serverAddress);
myMetrics = new NameNodeMetrics(conf, this); // 初始化NameNodeMetrics
this.namesystem = new FSNamesystem(this, conf);
startHttpServer(conf);
this.server.start(); // 启动RPC服务器
startTrashEmptier(conf);
}
在Namenode类实现中,其中实现的操作基本上都是由FSNamesystem来完成的。在这里,我们先不关心Namenode具体是如何实现那些基本操作的,而只是关注Namenode的功能特性,在后面再对FSNamesystem类进行分析。这里使用方式就是,根据Namenode所实现的接口中定义的操作,来分析Namenode服务器所具备的基本功能,或者说提供的基本服务。
NamenodeProtocol实现
在Namenode类中,实现了NamenodeProtocol协议接口中定义的getBlocks方法,如下所示:
给定DatanodeInfo datanode,它是一个描述Datanode的状态的实体类对象,通过getBlocks方法,可以获取到总的块大小为size的BlocksWithLocations,即描述这些块的位置信息的BlocksWithLocations对象。
ClientProtocol实现
实现ClientProtocol协议接口中定义的方法,如下所示:
/**
* 在制定的文件系统命名空间中创建一个文件入口(entry)
*/
public void create(String src, FsPermission masked, String clientName, boolean overwrite, short replication, long blockSize) throws IOException {
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file " +src+" for "+clientName+" at "+clientMachine);
}
if (!checkPathLength(src)) {
throw new IOException("create: Pathname too long. Limit " + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.startFile(src,
new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
null, masked),
clientName, clientMachine, overwrite, replication, blockSize); // 调用:执行文件的创建操作
myMetrics.numFilesCreated.inc();
myMetrics.numCreateFileOps.inc();
}
/**
* 对指定文件执行追加写操作
*/
public LocatedBlock append(String src, String clientName) throws IOException {
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.append: file " +src+" for "+clientName+" at "+clientMachine);
}
LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine); // 调用:执行文件的追加写操作
myMetrics.numFilesAppended.inc();
return info;
}
/**
* 设置副本因子
*/
public boolean setReplication(String src, short replication) throws IOException {
return namesystem.setReplication(src, replication); // 调用:设置副本因子
}
/**
* 设置权限
*/
public void setPermission(String src, FsPermission permissions) throws IOException {
namesystem.setPermission(src, permissions); // 调用:设置权限
}
/**
* 设置文件属主
*/
public void setOwner(String src, String username, String groupname) throws IOException {
namesystem.setOwner(src, username, groupname); // 调用:设置文件属主
}
/**
* 向指定的文件中写入数据块
*/
public LocatedBlock addBlock(String src, String clientName) throws IOException {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " +src+" for "+clientName);
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName); // 调用:获取并执行写入操作
if (locatedBlock != null)
myMetrics.numAddBlockOps.inc();
return locatedBlock;
}
/**
* 客户端放弃对指定块的操作
*/
public void abandonBlock(Block b, String src, String holder) throws IOException {
stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: " +b+" of file "+src);
if (!namesystem.abandonBlock(b, src, holder)) { // 调用
throw new IOException("Cannot abandon block during write to " + src);
}
}
/**
* 客户端完成对指定文件的写操作
*/
public boolean complete(String src, String clientName) throws IOException {
stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
CompleteFileStatus returnCode = namesystem.completeFile(src, clientName); // 调用:执行写操作,写完成之后关闭该文件流
if (returnCode == CompleteFileStatus.STILL_WAITING) {
return false;
} else if (returnCode == CompleteFileStatus.COMPLETE_SUCCESS) {
return true;
} else {
throw new IOException("Could not complete write to file " + src + " by " + clientName);
}
}
/**
* 客户端向Namenode报告corrupted块的信息
*/
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
for (int i = 0; i < blocks.length; i++) {
Block blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
for (int j = 0; j < nodes.length; j++) {
DatanodeInfo dn = nodes[j];
namesystem.markBlockAsCorrupt(blk, dn); // 调用:标记属于该Datanode的corrupted状态的块
}
}
}
/**
* 重命名文件
*/
public boolean rename(String src, String dst) throws IOException {
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
if (!checkPathLength(dst)) {
throw new IOException("rename: Pathname too long. Limit " + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
boolean ret = namesystem.renameTo(src, dst); // 调用:重命名文件
if (ret) {
myMetrics.numFilesRenamed.inc();
}
return ret;
}
/**
* 删除文件
*/
public boolean delete(String src, boolean recursive) throws IOException {
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src + ", recursive=" + recursive);
}
boolean ret = namesystem.delete(src, recursive); // 调用:删除文件
if (ret)
myMetrics.numDeleteFileOps.inc();
return ret;
}
/**
* 管理客户端状态,如果Namenode在一段时间内接收不到客户端心跳状态,标记为挂掉;经过周期检测,又获取到对应客户端心跳状态,为其解锁。
*/
public void renewLease(String clientName) throws IOException {
namesystem.renewLease(clientName); // 调用
}
/**
* 获取指定目录下文件列表
*/
public FileStatus[] getListing(String src) throws IOException {
FileStatus[] files = namesystem.getListing(src); // 调用
if (files != null) {
myMetrics.numGetListingOps.inc();
}
return files;
}
/**
* 获取指定文件的状态信息
*/
public FileStatus getFileInfo(String src) throws IOException {
return namesystem.getFileInfo(src); // 调用
}
/**
* 获取文件系统的统计信息
*/
public long[] getStats() throws IOException {
return namesystem.getStats(); // 调用
}
/**
* 创建目录
*/
public boolean mkdirs(String src, FsPermission masked) throws IOException {
stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
if (!checkPathLength(src)) {
throw new IOException("mkdirs: Pathname too long. Limit " + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
return namesystem.mkdirs(src,
new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
null, masked)); // 调用
}
/**
* 设置安全模式
*/
public boolean setSafeMode(SafeModeAction action) throws IOException {
return namesystem.setSafeMode(action); // 调用
}
/**
* 保存命名空间映像
*/
public void saveNamespace() throws IOException {
namesystem.saveNamespace(); // 调用
}
/**
* 刷新Datanode结点的列表
*/
public void refreshNodes() throws IOException {
namesystem.refreshNodes(new Configuration()); // 调用
}
/**
* 完成升级操作(删除在升级期间保存的文件系统的状态,一旦执行该方法,改变将不可逆转)
*/
public void finalizeUpgrade() throws IOException {
namesystem.finalizeUpgrade(); // 调用
}
/**
* 报告升级进度
*/
public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException {
return namesystem.distributedUpgradeProgress(action); // 调用
}
/**
* 保存Namenode的状态信息,写入到指定的文件中
*/
public void metaSave(String filename) throws IOException {
namesystem.metaSave(filename); // 调用
}
/**
* 设置指定文件或目录的内容摘要信息
*/
public ContentSummary getContentSummary(String path) throws IOException {
return namesystem.getContentSummary(path); // 调用
}
/**
* 设置指定目录的配额
*/
public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {
namesystem.setQuota(path, namespaceQuota, diskspaceQuota); // 调用
}
/**
* 将指定文件的元数据写入到持久存储中
*/
public void fsync(String src, String clientName) throws IOException {
namesystem.fsync(src, clientName); // 调用
}
/**
* 设置文件修改时间
*/
public void setTimes(String src, long mtime, long atime) throws IOException {
namesystem.setTimes(src, mtime, atime); // 调用:设置文件修改时间
}
DatanodeProtocol实现
Namenode需要与Datanode进行通信,所以必须实现与DatanodeProtocol协议接口。
下面是对DatanodeProtocol协议接口中定义的基本操作的实现:
/**
* Datanode向Namenode发送心跳状态报告,显示其当前状态
*/
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
long capacity,
long dfsUsed,
long remaining,
int xmitsInProgress,
int xceiverCount) throws IOException {
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining, xceiverCount, xmitsInProgress); // 调用:处理Datanode发送的心跳信息
}
/**
* 指定Datanode向Namenode报告其上的块状态报告
*/
public DatanodeCommand blockReport(DatanodeRegistration nodeReg, long[] blocks) throws IOException {
verifyRequest(nodeReg); // 验证Datanode向Namenode请求的合法性
BlockListAsLongs blist = new BlockListAsLongs(blocks);
stateChangeLog.debug("*BLOCK* NameNode.blockReport: " +"from "+nodeReg.getName()+" "+blist.getNumberOfBlocks() +" blocks");
namesystem.processReport(nodeReg, blist); // 调用:处理块状态报告
if (getFSImage().isUpgradeFinalized())
return DatanodeCommand.FINALIZE;
return null;
}
/**
* Datanode向Namenode报告最近接收到的数据块,以及对数据块的操作信息
*/
public void blockReceived(DatanodeRegistration nodeReg, Block blocks[], String delHints[]) throws IOException {
verifyRequest(nodeReg); // 验证请求是否合法
stateChangeLog.debug("*BLOCK* NameNode.blockReceived: " +"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
for (int i = 0; i < blocks.length; i++) {
namesystem.blockReceived(nodeReg, blocks[i], delHints[i]); // 调用
}
}
/**
* 报告错误信息
*/
public void errorReport(DatanodeRegistration nodeReg, int errorCode, String msg) throws IOException {
String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName());
LOG.info("Error report from " + dnName + ": " + msg); // 将来自指定的Datanode的出错信息登录到日志文件中
if (errorCode == DatanodeProtocol.NOTIFY) {
return;
}
verifyRequest(nodeReg);
if (errorCode == DatanodeProtocol.DISK_ERROR) { // 如果是对应的Datanode出现了磁盘错误
namesystem.removeDatanode(nodeReg); // 调用:删除该Datanode的描述信息
}
}
/**
* 在分布式系统升级过程中,向Namenode发送命令
*/
public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
return namesystem.processDistributedUpgradeCommand(comm); // 调用
}
其它实现
用于Namenode与客户端通信,还定义了如下几个与Namenode管理文件系统命名空间相关的操作,主要是对EditLog和FsImage文件的操作:
/**
* 回滚EditLog文件
*/
public CheckpointSignature rollEditLog() throws IOException {
return namesystem.rollEditLog(); // 调用
}
/**
* 回滚FsImage映像文件
*/
public void rollFsImage() throws IOException {
namesystem.rollFSImage(); // 调用
}
/**
* 获取FsImage映像文件名称
*/
public File getFsImageName() throws IOException {
return getFSImage().getFsImageName();
}
/**
* 获取FsImage映像文件
*/
public FSImage getFSImage() {
return namesystem.dir.fsImage;
}
/**
* 获取到FsImage映像文件的所有检查点
*/
public File[] getFsImageNameCheckpoint() throws IOException {
return getFSImage().getFsImageNameCheckpoint();
}
在Namenode上,对文件系统名字空间进行的任何操作,Hadoop采用记录事务日志的方式来保存这些操作的记录,对应的事务日志文件就是EditLog文件,该文件存放在磁盘上。而FsImage映像文件,是Hadoop集群工作的过程中使用的,便于Namenode进程管理文件系统的命名空间。当Namenode启动的时候,会首先读取EditLog与FsImage文件,并将EditLog作用于FsImage,因为此时FsImage文件一定是最新的事务日志记录文件的映像,并将EditLog文件删除(因为它可能会在Hadoop集群工作过程中变成旧的文件,也就是不是最新的事务记录);在Namenode启动之后工作的过程中,仍然会有对文件系统命名空间执行操作的事务记录,这些记录都被保存到一个新的EditLog事务日志文件中,以便下次启动Namenode的时候,将最新的事务日志记录作用于FsImage上。
另外,Namenode类中实现了创建一个Namenode实例的方法createNameNode,该方法是static的,通过命令行参数一个Hadoop配置类实例来创建一个Namenode实例,方法实现如下所示:
switch (startOpt) { // 根据设置的参数选项,进行处理
case FORMAT:
boolean aborted = format(conf, true);
System.exit(aborted ? 1 : 0);
case FINALIZE:
aborted = finalize(conf, true);
System.exit(aborted ? 1 : 0);
default:
}
NameNode namenode = new NameNode(conf); // 构造一个Namenode实例
return namenode;
}
关于配置启动选项,可以查看org.apache.hadoop.hdfs.server.common.HdfsConstants接口中定义了枚举类StartupOption,如下所示:
private String name = null;
private StartupOption(String arg) {this.name = arg;}
public String getName() {return name;}
}
上面创建一个Namenode,在解析完命令行参数后,得到一个枚举类实例,然后根据它来设置Namenode启动选项,如下所示:
接着,在构造Namenode实例之前,根据设置的附加启动选项,做一个预处理(格式化或某次升级之后的确认动作),如果是格式化操作,具体操作为:
FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat, editDirsToFormat), conf); // 基于构造的FsImage文件实例来创建文件系统命名空间
nsys.dir.fsImage.format(); // 对FsImage文件进行格式化
return false;
}
关于FsImage与EditLog在文件系统中的所在位置的选择,在org.apache.hadoop.hdfs.server.namenode.FSNamesystem类中可以看到详细的情况。
如果是确认升级完成命令行,则实现如下所示:
关于FsImage文件的操作,可以查看org.apache.hadoop.hdfs.server.namenode.FSImage类的具体实现。在后面会专门对FSImage类进行详细阅读分析的。
下面,对Namenode类的实现做一个总结:
从Namenode实现类来看,它主要是实现了Namenode服务器进程与Datanode进程、文件系统客户端进程,以及Secondary NameNode进程进行交互过程中一些重要的基本的操作,具体表现为,Namenode实现了ClientProtocol协议来与客户端交互,实现了DatanodeProtocol协议来与Datanode进行交互,实现了NamenodeProtocol协议来与Secondary NameNode进行交互。而且,该类还给出了一个static方法,通过命令行的方式用来构造一个Namenode实例,并启动Namenode服务器进程。