现在的位置: 首页 > 综合 > 正文

nutch源码分析1——inject(续续)

2013年11月17日 ⁄ 综合 ⁄ 共 1582字 ⁄ 字号 评论关闭

再来看第二步:合并url,去重

先来看代码, mergeJob.setReducerClass(InjectReducer.class);

主要是InjectReducer这个类的reduce方法:

  public void reduce(Text key, Iterator<CrawlDatum> values,
                       OutputCollector<Text, CrawlDatum> output, Reporter reporter)
      throws IOException {
      boolean oldSet = false;
      while (values.hasNext()) {
        CrawlDatum val = values.next();
        if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
          injected.set(val);
          injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
        } else {
          old.set(val);
          oldSet = true;
        }

      }
      CrawlDatum res = null;
      if (oldSet) res = old; // don't overwrite existing value
      else res = injected;

      output.collect(key, res);
    }

去重判断很简单,通过状态的来表明这个url是否已经出现过,如果出现过那么则使用old,否则,把状态从STATUS_INJECTED改为STATUS_DB_UNFETCHED

ruduce过程结束之后,inject方法调用了

    CrawlDb.install(mergeJob, crawlDb);

将url以及元数据信息存入crawldb中,并删除旧的,更新。

install方法代码如下:

  public static void install(JobConf job, Path crawlDb) throws IOException {
    Path newCrawlDb = FileOutputFormat.getOutputPath(job);
    FileSystem fs = new JobClient(job).getFs();
    Path old = new Path(crawlDb, "old");
    Path current = new Path(crawlDb, CURRENT_NAME);             //目录crawldb/current
    if (fs.exists(current)) {                                        //如果current目录已经存在,那么把current目录改为old目录(当然如果old目录本身已经存在,那么先把old删除)
      if (fs.exists(old)) fs.delete(old, true);
      fs.rename(current, old);
    }
    fs.mkdirs(crawlDb);
    fs.rename(newCrawlDb, current);            //把新的reduce的数据的目录改为current目录,也即把数据更新了
    if (fs.exists(old)) fs.delete(old, true);         //删除old
    Path lock = new Path(crawlDb, LOCK_NAME);
    LockUtil.removeLockFile(fs, lock);
  }

抱歉!评论已关闭.