Now for the second step: merging the URLs and deduplicating.
Start from the job setup: mergeJob.setReducerClass(InjectReducer.class);
The real work happens in the reduce method of the InjectReducer class:
// "injected" and "old" are reusable CrawlDatum instance fields of InjectReducer
public void reduce(Text key, Iterator<CrawlDatum> values,
                   OutputCollector<Text, CrawlDatum> output, Reporter reporter)
    throws IOException {
  boolean oldSet = false;
  while (values.hasNext()) {
    CrawlDatum val = values.next();
    if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
      // a freshly injected URL: mark it as not yet fetched
      injected.set(val);
      injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
    } else {
      // a record that was already in the crawldb
      old.set(val);
      oldSet = true;
    }
  }
  CrawlDatum res = null;
  if (oldSet) res = old; // don't overwrite existing value
  else res = injected;
  output.collect(key, res);
}
The deduplication check is simple: the status field tells us whether this URL has been seen before. If an old record already exists for the key, it is kept as-is (old); otherwise the newly injected record is emitted with its status changed from STATUS_INJECTED to STATUS_DB_UNFETCHED.
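To make the two merge cases concrete, here is a small hypothetical driver (not part of the Nutch source; it only assumes org.apache.nutch.crawl.CrawlDatum with its no-arg constructor, setStatus, and status constants) that builds the records the reducer would see for a new URL versus an already-known one:

import org.apache.nutch.crawl.CrawlDatum;

public class MergeCases {
  public static void main(String[] args) {
    // Case 1: a brand-new URL. Only an INJECTED record reaches reduce(),
    // so it is emitted with its status rewritten to DB_UNFETCHED.
    CrawlDatum injected = new CrawlDatum();
    injected.setStatus(CrawlDatum.STATUS_INJECTED);
    injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); // what reduce() does
    System.out.println("new URL   -> " + injected);

    // Case 2: an already-known URL. An old record (e.g. DB_FETCHED) arrives
    // alongside the injected one; reduce() emits the old record unchanged,
    // so re-injecting a seed list never wipes out existing fetch history.
    CrawlDatum old = new CrawlDatum();
    old.setStatus(CrawlDatum.STATUS_DB_FETCHED);
    System.out.println("known URL -> " + old);
  }
}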
After the reduce phase finishes, the inject method calls
CrawlDb.install(mergeJob, crawlDb);
which commits the merged URLs and their metadata to the crawldb, replacing the old data with the new. The install method looks like this:
public static void install(JobConf job, Path crawlDb) throws IOException {
  Path newCrawlDb = FileOutputFormat.getOutputPath(job);
  FileSystem fs = new JobClient(job).getFs();
  Path old = new Path(crawlDb, "old");
  Path current = new Path(crawlDb, CURRENT_NAME); // the crawldb/current directory
  if (fs.exists(current)) {
    // if current already exists, rename it to old
    // (deleting any previous old directory first)
    if (fs.exists(old)) fs.delete(old, true);
    fs.rename(current, old);
  }
  fs.mkdirs(crawlDb); // make sure crawldb itself exists (first injection)
  fs.rename(newCrawlDb, current); // promote the reduce output to current, i.e. publish the new data
  if (fs.exists(old)) fs.delete(old, true); // drop old
  Path lock = new Path(crawlDb, LOCK_NAME);
  LockUtil.removeLockFile(fs, lock);
}
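Traced through, install is a three-step directory swap (newCrawlDb is the merge job's output path; CURRENT_NAME and LOCK_NAME are constants in CrawlDb, "current" and, assuming the stock source, ".locked"):

1. crawldb/current  ->  crawldb/old      (set the previous generation aside)
2. newCrawlDb       ->  crawldb/current  (the merged data goes live)
3. delete crawldb/old, then remove the lock file taken when injection started

Each step is a filesystem rename rather than a copy, so publishing the updated crawldb is cheap, and the previous data remains recoverable right up until step 3.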