现在的位置: 首页 > 综合 > 正文

Apache Nutch 1.3 学习笔记五(Fetcher流程)

2012年01月16日 ⁄ 综合 ⁄ 共 8447字 ⁄ 字号 评论关闭

 

1. Fetcher模块的简单介绍

Fetcher这个模块在Nutch中有单独一个包在实现,在org.apache.nutch.fetcher,其中有Fetcher.java, FetcherOutput FetcherOutputFormat来组成,看上去很简单,但其中使用到了多线程,多线程的生产者与消费者模型,MapReduce的多路径输出等方法。

下面我们来看一下Fetcher的注释,从中我们可以得到很多有用的信息。
首先,这是一种基于队列的fetcher方法,它使用了一种经典的线程模型,生产者(a-QueueFeeder)与消费者(many-FetcherThread)模型,注意,这里有多个消费者。生产者从Generate产生的fetchlists中分类得到一批FetchItemQueue,每一个FetchItmeQueue都是由一类相同hostFetchItem组成,这些FetchItem是用来描述被抓取的对象。当一个FetchItemFetchItemQueue中取出后,QueueFeeder这个生产者会不断的向队列中加入新的FetchItem,直到这个队列满了为止或者已经没有fetchlist可读取,当队列中的所有FetchItem都被抓取完成后,所有抓取线程都会退出运行。每一个FetchItemQueue都有一套自己的抓取策略,如最大的并行抓取个数,两次抓取的间隔等,如果当FetcherThread向队列申请一个FetchItem时,FetchItemQueue发现当前的FetchItem没有满足抓取策略,那这里它就会返回null,表达当前FetchItem还没有准备好被抓取。如果这些所有FetchItem都没有准备好被抓取,那这时FetchThread就会进入等待状态,直到条件满足被促发或者是等待超时,它会认为任务已经被挂起,这时FetchThread会自动退出。

2. FetcherOutputFormat的介绍

这个类是用来把FetcherOutput对象切分到不同的Map文件中的,也就是说它会根据对象的类型来判断输出到哪一个文件中,这里用到了一个多文件的输出。
FetcherOutputFormat
继承自MapReduce框架的OutputFormat模板,其输出的<key,value>类型为<Text,NutchWritable>
这里的OutputFormat定义了Map-Reduce任务的输出描述,Map-Reduce框架依赖任务的OutputFormat来做如下二件事情,一是用来验证输出源的可用性,如是否已经建立了相应的目录,数据库是否已经连上;另一件事是提供RecordWriter抽象来对数据进行写出到特定的数据源,一般输出文件定义在FileSystem里面。

FetcherOutputFormat主要是实现了getRecordWriter这个方法,用于得到相应的数据写出对象,我们来分析一下其源代码:

 

  1. public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,  
  2.                                       final JobConf job,  
  3.                                       final String name,  
  4.                                       final Progressable progress) throws IOException {  
  5.     // 定义输出目录  
  6.     Path out = FileOutputFormat.getOutputPath(job);  
  7.     // 定义抓取的输出目录  
  8.     final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);  
  9.     // 定义抓取内容的输出目录  
  10.     final Path content = new Path(new Path(out, Content.DIR_NAME), name);  
  11.     // 定义数据压缩格式  
  12.     final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);  
  13.     
  14.     
  15.     // 定义抓取的输出抽象类  
  16.     final MapFile.Writer fetchOut =  
  17.       new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,  
  18.           compType, progress);  
  19.         
  20.     // 这里使用了inner class来定义相应的RecordWriter  
  21.     return new RecordWriter<Text, NutchWritable>() {  
  22.         private MapFile.Writer contentOut;  
  23.         private RecordWriter<Text, Parse> parseOut;  
  24.     
  25.     
  26.         {  
  27.             // 这里看如果Fetcher定义了输出内容,就生成相应的Content输出抽象  
  28.           if (Fetcher.isStoringContent(job)) {  
  29.             contentOut = new MapFile.Writer(job, fs, content.toString(),  
  30.                                             Text.class, Content.class,  
  31.                                             compType, progress);  
  32.           }  
  33.             // 如果Fetcher对抓取的内容进行了解析,这里就定义相应的解析输出抽象  
  34.             // 注意这里使用了ParseOutputFormatgetReocrdWriter,主要是解析网页,抽取其外链接  
  35.           if (Fetcher.isParsing(job)) {  
  36.             parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);  
  37.           }  
  38.         }  
  39.     
  40.     
  41.         public void write(Text key, NutchWritable value)  
  42.           throws IOException {  
  43.     
  44.     
  45.           Writable w = value.get();  
  46.           // 对对象类型进行判断,调用相应的抽象输出,写到不同的文件中去  
  47.           if (w instanceof CrawlDatum)  
  48.             fetchOut.append(key, w);  
  49.           else if (w instanceof Content)  
  50.             contentOut.append(key, w);  
  51.           else if (w instanceof Parse)  
  52.             parseOut.write(key, (Parse)w);  
  53.         }  
  54.     
  55.     
  56.         public void close(Reporter reporter) throws IOException {  
  57.           fetchOut.close();  
  58.           if (contentOut != null) {  
  59.             contentOut.close();  
  60.           }  
  61.           if (parseOut != null) {  
  62.             parseOut.close(reporter);  
  63.           }  
  64.         }  
  65.     
  66.     
  67.       };  

 

3. 生产者QueueFeeder的介绍

这个类作用是用于生产被抓取的FetchItem对象,把其放入抓取队列中。下来我们来对其源代码进行分析

 

  1. // 这个类继承自Thread,是用一个单独的线程来做的  
  2. private static class QueueFeeder extends Thread {  
  3.    private RecordReader<Text, CrawlDatum> reader;  // 这里是InputFormat产生的ReocrdReader,用于读取Generate的产生的数据  
  4.    private FetchItemQueues queues;                 // 这是生产者与消费者所使用的共享队列,这个对列是分层的,分一层对应一个host  
  5.    private int size;                               // 队列的大小  
  6.    private long timelimit = -1;                 // 这是一个过滤机制的策略,用于过滤所有的FetchItem  
  7.        
  8. // 构造方法  
  9.    public QueueFeeder(RecordReader<Text, CrawlDatum> reader,  
  10.        FetchItemQueues queues, int size) {  
  11.      this.reader = reader;  
  12.      this.queues = queues;  
  13.      this.size = size;  
  14.      this.setDaemon(true);  
  15.      this.setName("QueueFeeder");  
  16.    }  
  17.        
  18.    public void setTimeLimit(long tl) {  
  19.      timelimit = tl;  
  20.    }  
  21.     
  22.     
  23. // 函数的run方法  
  24.    public void run() {  
  25.      boolean hasMore = true;    // while的循环条件  
  26.      int cnt = 0;  
  27.      int timelimitcount = 0;  
  28.      while (hasMore) {  
  29.       // 这里判断是否设置了这个过滤机制,如果设置了,判断相前时间是否大于这个timelimit,如果大于timelimit,过滤所有的FetchItem  
  30.        if (System.currentTimeMillis() >= timelimit && timelimit != -1) {  
  31.          // enough .. lets' simply  
  32.          // read all the entries from the input without processing them  
  33.          try {  
  34.           // 读出<key,value>对,过滤之  
  35.            Text url = new Text();  
  36.            CrawlDatum datum = new CrawlDatum();  
  37.            hasMore = reader.next(url, datum);  
  38.            timelimitcount++;  
  39.          } catch (IOException e) {  
  40.            LOG.fatal("QueueFeeder error reading input, record " + cnt, e);  
  41.            return;  
  42.          }  
  43.          continue; // 过滤之  
  44.        }  
  45.        int feed = size - queues.getTotalSize();  
  46.     // 判断剩余的队列空间是否为0  
  47.        if (feed <= 0) {  
  48.          // queues are full - spin-wait until they have some free space  
  49.          try {  
  50.           // 休息1秒种  
  51.            Thread.sleep(1000);  
  52.          } catch (Exception e) {};  
  53.          continue;  
  54.        } else {  
  55.          LOG.debug("-feeding " + feed + " input urls ...");  
  56.       // 如果队列还有空间(feed>0)并且recordRedder中还有数据(hasMore)  
  57.          while (feed > 0 && hasMore) {  
  58.            try {  
  59.              Text url = new Text();  
  60.              CrawlDatum datum = new CrawlDatum();  
  61.           // 读出<key,value>  
  62.              hasMore = reader.next(url, datum);  
  63.              if (hasMore) {  // 判断是否成功读出数据  
  64.                queues.addFetchItem(url, datum); // 放入对列,这个队列应该是thread-safe的,下面我们可以看到  
  65.                cnt++;    // 统计总数  
  66.                feed--;   // 剩余队列空间减1  
  67.              }  
  68.            } catch (IOException e) {  
  69.              LOG.fatal("QueueFeeder error reading input, record " + cnt, e);  
  70.              return;  
  71.            }  
  72.          }  
  73.        }  
  74.      }  
  75.      LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :"  
  76.          + timelimitcount);  
  77.    }  
  78.  }  


 
这个类主要负责向队列中放数据。

4. 下面我们来看一下这个队列是如果工作的

这里的共享对列主要如三个类组成,一个是FetchItem,存储队列中的元素;另一个是FetchItemQueue,用于存储相同hostFetchItem,最后一个是FetchItemQueues,看名字我们就知道,这是用于存储所有的FetchItemQueue的。

4.1 先让我们来看一下FetchItem的结构:

 

  1. FetchItem =>   
  2. {  
  3.     queueID:String,     // 用于存储队列的ID  
  4.     url:Text,           // 用于存储CrawlDatumurl地址  
  5.     u:URL,              //  也是存储url,但是以URL的类型来存储,不过我看了一下,这东东在判断RobotRules的时候用了一下  
  6.     datum:CrawlDatum    // 这是存储抓取对象的一些元数据信息àà  
  7. }  


下面我们来看一下它的create方法,是用来生成相应的FetchItem的,源代码如下:

 

  1. //从注释中我们可以看到,队列ID是由protocol+hotname或者是protocol+IP组成的  
  2. /** Create an item. Queue id will be created based on <code>byIP</code>  
  3.     * argument, either as a protocol + hostname pair, or protocol + IP  
  4.     * address pair.  
  5.     */  
  6.    public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {  
  7.      String queueID;  
  8.      URL u = null;  
  9.      try {  
  10.        u = new URL(url.toString());    // 得到其URL  
  11.      } catch (Exception e) {  
  12.        LOG.warn("Cannot parse url: " + url, e);  
  13.        return null;  
  14.      }  
  15.   // 得到协议号  
  16.      String proto = u.getProtocol().toLowerCase();  
  17.      String host;  
  18.      if (byIP) {  
  19.       // 如果是基于IP的,那得到其IP地址  
  20.        try {  
  21.          InetAddress addr = InetAddress.getByName(u.getHost());  
  22.          host = addr.getHostAddress();  
  23.        } catch (UnknownHostException e) {  
  24.          // unable to resolve it, so don't fall back to host name  
  25.          LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");  
  26.          return null;  
  27.        }  
  28.      } else {  
  29.       // 否则得到Hostname  
  30.        host = u.getHost();  
  31.        if (host == null) {  
  32.          LOG.warn("Unknown host for url: " + url + ", skipping.");  
  33.          return null;  
  34.        }  
  35.        hosthost = host.toLowerCase(); // 统一变小写  
  36.      }  
  37.   // 得成相应的队列ID号,放入FetchItemQueue  
  38.      queueID = proto + "://" + host;  
  39.      return new FetchItem(url, u, datum, queueID);  
  40.    }  

 

4.2 下面我们来看一下FetchQueue的组成结构

这个类主要是用于收集相同QueueIDFetchItem对象,对正在抓取的FetchItem进行跟踪,使用的是一个inProgress集合,还有计算两次请求的间隔时间,我们来看一下其结构:

 

  1. FetchQueue =>  
  2.     {  

抱歉!评论已关闭.