现在的位置: 首页 > 综合 > 正文

Nutch抓取源码分析之Crawl类

2013年08月29日 ⁄ 综合 ⁄ 共 3174字 ⁄ 字号 评论关闭

1、初始化一个Configuration实例设置抓取工作配置;

2、设置一些默认抓取工作参数,比如进程数threads、抓取深度depth、抓取网页数量topN;

3、创建抓取工作抓取到的网页文件的存放目录(crawldb、linkdb、segments、indexes、index),用来存放原生网页,以及网页解析出的文本内容及其其它相关数据;

4、在抓取工作及其索引过程中,这通过初始化一些实现了这些操作的类的实例来完成的,例如:InjectorGeneratorFetcherParseSegment、CrawlDb、LinkDb、Indexer、DeleteDuplicates、IndexMerger。

5、最后,就开始执行相关操作了,包括初始化CrawlDb、生成抓取工作列表、抓取网页文件、更新CrawlDb、倒排Links、建立索引、复制索引文件、合并索引文件。

//初始化配置文件
    Configuration conf = NutchConfiguration.createCrawlConfiguration();
    JobConf job = new NutchJob(conf);
// urls存放目录
    Path rootUrlDir = null;
Path dir = new Path("crawl-" + getDate());
//抓取线程数目
int threads = job.getInt("fetcher.threads.fetch", 10);
//抓取工作遍历深度
    int depth = 5;
    long topN = Long.MAX_VALUE;
    String indexerName = "lucene";
    String solrUrl = null;
    
    for (int i = 0; i < args.length; i++) {
      if ("-dir".equals(args[i])) {
        dir = new Path(args[i+1]);
        i++;
      } else if ("-threads".equals(args[i])) {
        threads = Integer.parseInt(args[i+1]);
        i++;
      } else if ("-depth".equals(args[i])) {
        depth = Integer.parseInt(args[i+1]);
        i++;
      } else if ("-topN".equals(args[i])) {
          topN = Integer.parseInt(args[i+1]);
          i++;
      } else if ("-solr".equals(args[i])) {
        indexerName = "solr";
        solrUrl = StringUtils.lowerCase(args[i + 1]);
        i++;
      } else if (args[i] != null) {
        rootUrlDir = new Path(args[i]);
      }
    }
FileSystem fs = FileSystem.get(job);
//抓取工作的不同操作需要的五个目录
    Path crawlDb = new Path(dir + "/crawldb");
    Path linkDb = new Path(dir + "/linkdb");
    Path segments = new Path(dir + "/segments");
    Path indexes = new Path(dir + "/indexes");
    Path index = new Path(dir + "/index");
    Path tmpDir = job.getLocalPath("crawl"+Path.SEPARATOR+getDate());
    Injector injector = new Injector(conf);
    Generator generator = new Generator(conf);
    Fetcher fetcher = new Fetcher(conf);
    ParseSegment parseSegment = new ParseSegment(conf);
    CrawlDb crawlDbTool = new CrawlDb(conf);
    LinkDb linkDbTool = new LinkDb(conf);
    // 初始化crawlDb
    injector.inject(crawlDb, rootUrlDir);
    int i;
for (i = 0; i < depth; i++) {             // generate new segment
//生成抓取工作列表,文件名以时间挫命名
      Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
          .currentTimeMillis());
      if (segs == null) {
        LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
        break;
      }
      //抓取网页内容
      fetcher.fetch(segs[0], threads, org.apache.nutch.fetcher.Fetcher.isParsing(conf));  // fetch it
      if (!Fetcher.isParsing(job)) {
//解析网页内容
        parseSegment.parse(segs[0]);    // parse it, if needed
      }
//更新CrawlDB
      crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
}

    if (i > 0) {
      linkDbTool.invert(linkDb, segments, true, true, false); // invert links
      // index, dedup & merge
      FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs));
      if (isSolrIndex) {
        SolrIndexer indexer = new SolrIndexer(conf);
        indexer.indexSolr(solrUrl, crawlDb, linkDb, 
            Arrays.asList(HadoopFSUtil.getPaths(fstats)));
      }
      else {
        DeleteDuplicates dedup = new DeleteDuplicates(conf);        
        if(indexes != null) {
          // Delete old indexes
          if (fs.exists(indexes)) {
            LOG.info("Deleting old indexes: " + indexes);
            fs.delete(indexes, true);
          }
          // Delete old index
          if (fs.exists(index)) {
            LOG.info("Deleting old merged index: " + index);
            fs.delete(index, true);
          }
        }
        //索引过程
        Indexer indexer = new Indexer(conf);
        indexer.index(indexes, crawlDb, linkDb, 
            Arrays.asList(HadoopFSUtil.getPaths(fstats)));
        
        IndexMerger merger = new IndexMerger(conf);
        if(indexes != null) {
          //复制索引文件
          dedup.dedup(new Path[] { indexes });
          fstats = fs.listStatus(indexes, HadoopFSUtil.getPassDirectoriesFilter(fs));
          //将索引目录index 中的索引文件合并后写入到indexes 目录中
          merger.merge(HadoopFSUtil.getPaths(fstats), index, tmpDir);
        }
      }    
    } 

 

抱歉!评论已关闭.