现在的位置: 首页 > 综合 > 正文

Hadoop DataNode启动之DiskChecker(一)

2013年03月21日 ⁄ 综合 ⁄ 共 2962字 ⁄ 字号 评论关闭

  DataNode启动时要进行磁盘检测,在这个检测过程中会检测目录是否存在,当前用户是否有足够的权限,能否对这个目录进行读写等,一旦有一个校验没有成功,那么DN是不能成功启动的,下面从DN启动开始,介绍下这个检测阶段。DN启动和NN的启动有很多相似的地方,也是从main函数启动,位于DataNode.java中。

首先获得数据目录的位置,在instantiateDataNode函数中

public static DataNode instantiateDataNode(String args[],
                                      Configuration conf, 
                                      SecureResources resources) throws IOException {
   if (conf == null)
   //创建配置实例
      conf = new Configuration();
    if (!parseArguments(args, conf)) {
      printUsage();
      return null;
    }
    if (conf.get("dfs.network.script") != null) {
      LOG.error("This configuration for rack identification is not supported" +
          " anymore. RackID resolution is handled by the NameNode.");
      System.exit(-1);
    }
    //获取数据目录
    String[] dataDirs = conf.getStrings(DATA_DIR_KEY);
    //定义DN线程名
    dnThreadName = "DataNode: [" +
                        StringUtils.arrayToString(dataDirs) + "]";
    //初始化Metric信息
    DefaultMetricsSystem.initialize("DataNode");
    //开始创建DN实例
    return makeInstance(dataDirs, conf, resources);
    }
然后进入makeInstance函数
public static DataNode makeInstance(String[] dataDirs, Configuration conf, 
      SecureResources resources) throws IOException {
    //创建UGI信息
    UserGroupInformation.setConfiguration(conf);
    //创建本地文件系统
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    ArrayList<File> dirs = new ArrayList<File>();
    FsPermission dataDirPermission = 
      new FsPermission(conf.get(DATA_DIR_PERMISSION_KEY, 
                                DEFAULT_DATA_DIR_PERMISSION));
    for (String dir : dataDirs) {
      try {
        DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);
        dirs.add(new File(dir));
      } catch(IOException e) {
        LOG.warn("Invalid directory in " + DATA_DIR_KEY +  ": " + 
                 e.getMessage());
      }
    }
    if (dirs.size() > 0) 
      return new DataNode(conf, dirs, resources);
    LOG.error("All directories in " + DATA_DIR_KEY + " are invalid.");
    return null;
  }

创建本地文件系统稍显复杂,首先会根据fs.file.impl获得文件系统类文件,然后通过反射的方式创建该类的文件系统,再进行初始化操作,生成统计信息等。看下主要函数

private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    //获取参数配置fs.file.impl
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    LOG.debug("Creating filesystem for " + uri);
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    //生成FileSystem的实例
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    //初始化文件系统,主要是构建统计信息类
    fs.initialize(uri, conf);
    return fs;
  }

下面看下生成实例的步骤,其实在前面的文章里已经介绍过了,反射、动态代理都是hadoop里的常用机制

 public static <T> T newInstance(Class<T> theClass, Configuration conf) {
    T result;
    try {
      //获得构造函数
      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
      if (meth == null) {
        //如果没有缓存该函数,则通过反射获得,并把其放入缓存中
        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
        meth.setAccessible(true);
        //加入缓存中,以备下次使用
        CONSTRUCTOR_CACHE.put(theClass, meth);
      }
      //构建实例
      result = meth.newInstance();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    setConf(result, conf);
    return result;
  }

创建完文件系统实例后就进行初始化,生成该文件系统的统计信息类,这些操作都在FileSystem类中

public void initialize(URI name, Configuration conf) throws IOException {
    statistics = getStatistics(name.getScheme(), getClass());    
  }


  public static synchronized 
  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
     //获得统计信息
    Statistics result = statisticsTable.get(cls);
    if (result == null) {
    //如果没有则根据URI的scheme重新创建,然后放入缓存中
      result = new Statistics(scheme);
      statisticsTable.put(cls, result);
    }
    return result;
  }


 

 

抱歉!评论已关闭.