DataNode启动时要进行磁盘检测,在这个检测过程中会检测目录是否存在,当前用户是否有足够的权限,能否对这个目录进行读写等,一旦有一个校验没有成功,那么DN是不能成功启动的,下面从DN启动开始,介绍下这个检测阶段。DN启动和NN的启动有很多相似的地方,也是从main函数启动,位于DataNode.java中。
首先获得数据目录的位置,在instantiateDataNode函数中
public static DataNode instantiateDataNode(String args[], Configuration conf, SecureResources resources) throws IOException { if (conf == null) //创建配置实例 conf = new Configuration(); if (!parseArguments(args, conf)) { printUsage(); return null; } if (conf.get("dfs.network.script") != null) { LOG.error("This configuration for rack identification is not supported" + " anymore. RackID resolution is handled by the NameNode."); System.exit(-1); } //获取数据目录 String[] dataDirs = conf.getStrings(DATA_DIR_KEY); //定义DN线程名 dnThreadName = "DataNode: [" + StringUtils.arrayToString(dataDirs) + "]"; //初始化Metric信息 DefaultMetricsSystem.initialize("DataNode"); //开始创建DN实例 return makeInstance(dataDirs, conf, resources); }
然后进入makeInstance函数 public static DataNode makeInstance(String[] dataDirs, Configuration conf, SecureResources resources) throws IOException { //创建UGI信息 UserGroupInformation.setConfiguration(conf); //创建本地文件系统 LocalFileSystem localFS = FileSystem.getLocal(conf); ArrayList<File> dirs = new ArrayList<File>(); FsPermission dataDirPermission = new FsPermission(conf.get(DATA_DIR_PERMISSION_KEY, DEFAULT_DATA_DIR_PERMISSION)); for (String dir : dataDirs) { try { DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission); dirs.add(new File(dir)); } catch(IOException e) { LOG.warn("Invalid directory in " + DATA_DIR_KEY + ": " + e.getMessage()); } } if (dirs.size() > 0) return new DataNode(conf, dirs, resources); LOG.error("All directories in " + DATA_DIR_KEY + " are invalid."); return null; }
创建本地文件系统稍显复杂,首先会根据fs.file.impl获得文件系统类文件,然后通过反射的方式创建该类的文件系统,再进行初始化操作,生成统计信息等。看下主要函数
private static FileSystem createFileSystem(URI uri, Configuration conf ) throws IOException { //获取参数配置fs.file.impl Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null); LOG.debug("Creating filesystem for " + uri); if (clazz == null) { throw new IOException("No FileSystem for scheme: " + uri.getScheme()); } //生成FileSystem的实例 FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); //初始化文件系统,主要是构建统计信息类 fs.initialize(uri, conf); return fs; }
下面看下生成实例的步骤,其实在前面的文章里已经介绍过了,反射、动态代理都是hadoop里的常用机制
public static <T> T newInstance(Class<T> theClass, Configuration conf) { T result; try { //获得构造函数 Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); if (meth == null) { //如果没有缓存该函数,则通过反射获得,并把其放入缓存中 meth = theClass.getDeclaredConstructor(EMPTY_ARRAY); meth.setAccessible(true); //加入缓存中,以备下次使用 CONSTRUCTOR_CACHE.put(theClass, meth); } //构建实例 result = meth.newInstance(); } catch (Exception e) { throw new RuntimeException(e); } setConf(result, conf); return result; }
创建完文件系统实例后就进行初始化,生成该文件系统的统计信息类,这些操作都在FileSystem类中
public void initialize(URI name, Configuration conf) throws IOException { statistics = getStatistics(name.getScheme(), getClass()); } public static synchronized Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) { //获得统计信息 Statistics result = statisticsTable.get(cls); if (result == null) { //如果没有则根据URI的scheme重新创建,然后放入缓存中 result = new Statistics(scheme); statisticsTable.put(cls, result); } return result; }