1. Fetcher模块的简单介绍
Fetcher这个模块在Nutch中有单独一个包在实现,在org.apache.nutch.fetcher,其中有Fetcher.java, FetcherOutput 和FetcherOutputFormat来组成,看上去很简单,但其中使用到了多线程,多线程的生产者与消费者模型,MapReduce的多路径输出等方法。
下面我们来看一下Fetcher的注释,从中我们可以得到很多有用的信息。
首先,这是一种基于队列的fetcher方法,它使用了一种经典的线程模型,生产者(a-QueueFeeder)与消费者(many-FetcherThread)模型,注意,这里有多个消费者。生产者从Generate产生的fetchlists中分类得到一批FetchItemQueue,每一个FetchItmeQueue都是由一类相同host的FetchItem组成,这些FetchItem是用来描述被抓取的对象。当一个FetchItem从FetchItemQueue中取出后,QueueFeeder这个生产者会不断的向队列中加入新的FetchItem,直到这个队列满了为止或者已经没有fetchlist可读取,当队列中的所有FetchItem都被抓取完成后,所有抓取线程都会退出运行。每一个FetchItemQueue都有一套自己的抓取策略,如最大的并行抓取个数,两次抓取的间隔等,如果当FetcherThread向队列申请一个FetchItem时,FetchItemQueue发现当前的FetchItem没有满足抓取策略,那这里它就会返回null,表达当前FetchItem还没有准备好被抓取。如果这些所有FetchItem都没有准备好被抓取,那这时FetchThread就会进入等待状态,直到条件满足被促发或者是等待超时,它会认为任务已经被挂起,这时FetchThread会自动退出。
2. FetcherOutputFormat的介绍
这个类是用来把FetcherOutput对象切分到不同的Map文件中的,也就是说它会根据对象的类型来判断输出到哪一个文件中,这里用到了一个多文件的输出。
FetcherOutputFormat继承自MapReduce框架的OutputFormat模板,其输出的<key,value>类型为<Text,NutchWritable>。
这里的OutputFormat定义了Map-Reduce任务的输出描述,Map-Reduce框架依赖任务的OutputFormat来做如下二件事情,一是用来验证输出源的可用性,如是否已经建立了相应的目录,数据库是否已经连上;另一件事是提供RecordWriter抽象来对数据进行写出到特定的数据源,一般输出文件定义在FileSystem里面。
FetcherOutputFormat主要是实现了getRecordWriter这个方法,用于得到相应的数据写出对象,我们来分析一下其源代码:
-
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
-
final JobConf job,
-
final String name,
-
final Progressable progress) throws IOException {
-
// 定义输出目录
-
Path out = FileOutputFormat.getOutputPath(job);
-
// 定义抓取的输出目录
-
final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
-
// 定义抓取内容的输出目录
-
final Path content = new Path(new Path(out, Content.DIR_NAME), name);
-
// 定义数据压缩格式
-
final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
-
-
-
// 定义抓取的输出抽象类
-
final MapFile.Writer fetchOut =
-
new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
-
compType, progress);
-
-
// 这里使用了inner class来定义相应的RecordWriter
-
return new RecordWriter<Text, NutchWritable>() {
-
private MapFile.Writer contentOut;
-
private RecordWriter<Text, Parse> parseOut;
-
-
-
{
-
// 这里看如果Fetcher定义了输出内容,就生成相应的Content输出抽象
-
if (Fetcher.isStoringContent(job)) {
-
contentOut = new MapFile.Writer(job, fs, content.toString(),
-
Text.class, Content.class,
-
compType, progress);
-
}
-
// 如果Fetcher对抓取的内容进行了解析,这里就定义相应的解析输出抽象
-
// 注意这里使用了ParseOutputFormat的getReocrdWriter,主要是解析网页,抽取其外链接
-
if (Fetcher.isParsing(job)) {
-
parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
-
}
-
}
-
-
-
public void write(Text key, NutchWritable value)
-
throws IOException {
-
-
-
Writable w = value.get();
-
// 对对象类型进行判断,调用相应的抽象输出,写到不同的文件中去
-
if (w instanceof CrawlDatum)
-
fetchOut.append(key, w);
-
else if (w instanceof Content)
-
contentOut.append(key, w);
-
else if (w instanceof Parse)
-
parseOut.write(key, (Parse)w);
-
}
-
-
-
public void close(Reporter reporter) throws IOException {
-
fetchOut.close();
-
if (contentOut != null) {
-
contentOut.close();
-
}
-
if (parseOut != null) {
-
parseOut.close(reporter);
-
}
-
}
-
-
-
};
3. 生产者QueueFeeder的介绍
这个类作用是用于生产被抓取的FetchItem对象,把其放入抓取队列中。下来我们来对其源代码进行分析
-
// 这个类继承自Thread,是用一个单独的线程来做的
-
private static class QueueFeeder extends Thread {
-
private RecordReader<Text, CrawlDatum> reader; // 这里是InputFormat产生的ReocrdReader,用于读取Generate的产生的数据
-
private FetchItemQueues queues; // 这是生产者与消费者所使用的共享队列,这个对列是分层的,分一层对应一个host
-
private int size; // 队列的大小
-
private long timelimit = -1; // 这是一个过滤机制的策略,用于过滤所有的FetchItem
-
-
// 构造方法
-
public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
-
FetchItemQueues queues, int size) {
-
this.reader = reader;
-
this.queues = queues;
-
this.size = size;
-
this.setDaemon(true);
-
this.setName("QueueFeeder");
-
}
-
-
public void setTimeLimit(long tl) {
-
timelimit = tl;
-
}
-
-
-
// 函数的run方法
-
public void run() {
-
boolean hasMore = true; // while的循环条件
-
int cnt = 0;
-
int timelimitcount = 0;
-
while (hasMore) {
-
// 这里判断是否设置了这个过滤机制,如果设置了,判断相前时间是否大于这个timelimit,如果大于timelimit,过滤所有的FetchItem
-
if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
-
// enough .. lets' simply
-
// read all the entries from the input without processing them
-
try {
-
// 读出<key,value>对,过滤之
-
Text url = new Text();
-
CrawlDatum datum = new CrawlDatum();
-
hasMore = reader.next(url, datum);
-
timelimitcount++;
-
} catch (IOException e) {
-
LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
-
return;
-
}
-
continue; // 过滤之
-
}
-
int feed = size - queues.getTotalSize();
-
// 判断剩余的队列空间是否为0
-
if (feed <= 0) {
-
// queues are full - spin-wait until they have some free space
-
try {
-
// 休息1秒种
-
Thread.sleep(1000);
-
} catch (Exception e) {};
-
continue;
-
} else {
-
LOG.debug("-feeding " + feed + " input urls ...");
-
// 如果队列还有空间(feed>0)并且recordRedder中还有数据(hasMore)
-
while (feed > 0 && hasMore) {
-
try {
-
Text url = new Text();
-
CrawlDatum datum = new CrawlDatum();
-
// 读出<key,value>
-
hasMore = reader.next(url, datum);
-
if (hasMore) { // 判断是否成功读出数据
-
queues.addFetchItem(url, datum); // 放入对列,这个队列应该是thread-safe的,下面我们可以看到
-
cnt++; // 统计总数
-
feed--; // 剩余队列空间减1
-
}
-
} catch (IOException e) {
-
LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
-
return;
-
}
-
}
-
}
-
}
-
LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :"
-
+ timelimitcount);
-
}
-
}
这个类主要负责向队列中放数据。
4. 下面我们来看一下这个队列是如果工作的
这里的共享对列主要如三个类组成,一个是FetchItem,存储队列中的元素;另一个是FetchItemQueue,用于存储相同host的FetchItem,最后一个是FetchItemQueues,看名字我们就知道,这是用于存储所有的FetchItemQueue的。
4.1 先让我们来看一下FetchItem的结构:
-
FetchItem =>
-
{
-
queueID:String, // 用于存储队列的ID号
-
url:Text, // 用于存储CrawlDatum的url地址
-
u:URL, // 也是存储url,但是以URL的类型来存储,不过我看了一下,这东东在判断RobotRules的时候用了一下
-
datum:CrawlDatum // 这是存储抓取对象的一些元数据信息àà
-
}
下面我们来看一下它的create方法,是用来生成相应的FetchItem的,源代码如下:
-
//从注释中我们可以看到,队列ID是由protocol+hotname或者是protocol+IP组成的
-
/** Create an item. Queue id will be created based on <code>byIP</code>
-
* argument, either as a protocol + hostname pair, or protocol + IP
-
* address pair.
-
*/
-
public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {
-
String queueID;
-
URL u = null;
-
try {
-
u = new URL(url.toString()); // 得到其URL
-
} catch (Exception e) {
-
LOG.warn("Cannot parse url: " + url, e);
-
return null;
-
}
-
// 得到协议号
-
String proto = u.getProtocol().toLowerCase();
-
String host;
-
if (byIP) {
-
// 如果是基于IP的,那得到其IP地址
-
try {
-
InetAddress addr = InetAddress.getByName(u.getHost());
-
host = addr.getHostAddress();
-
} catch (UnknownHostException e) {
-
// unable to resolve it, so don't fall back to host name
-
LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
-
return null;
-
}
-
} else {
-
// 否则得到Hostname
-
host = u.getHost();
-
if (host == null) {
-
LOG.warn("Unknown host for url: " + url + ", skipping.");
-
return null;
-
}
-
hosthost = host.toLowerCase(); // 统一变小写
-
}
-
// 得成相应的队列ID号,放入FetchItemQueue中
-
queueID = proto + "://" + host;
-
return new FetchItem(url, u, datum, queueID);
-
}
4.2 下面我们来看一下FetchQueue的组成结构
这个类主要是用于收集相同QueueID的FetchItem对象,对正在抓取的FetchItem进行跟踪,使用的是一个inProgress集合,还有计算两次请求的间隔时间,我们来看一下其结构:
-
FetchQueue =>
-
{