Nutch源码阅读进程4---parseSegment

前面依次看了nutch的准备工作inject和generate部分,抓取的fetch部分的代码,趁热打铁,我们下面来一睹parse即页面解析部分的代码,这块代码主要是集中在ParseSegment类里面,Let‘s go~~~

上期回顾:上回主要讲的是nutch的fetch部分的功能代码实现,主要是先将segments目录下的指定文件夹作为输入,读取里面将要爬取的url信息存入爬取队列,再根据用户输入的爬取的线程个数thread决定消费者的个数,线程安全地取出爬取队列里的url,然后在执行爬取页面,解析页面源码得出url等操作,最终在segments目录下生成content和crawl_fetch三个文件夹,下面来瞧瞧nutch的parse是个怎么回事……

1.parse部分的入口从代码 parseSegment.parse(segs[0]);开始,进入到ParseSegment类下的parse方法后,首先设置一个当前时间(方便后面比较结束时间之差来得到整个parse所需的时间)。然后就是一个mapreduce过程,初始化了一个job,具体代码如下:

JobConf job = new NutchJob(getConf());
job.setJobName("parse " + segment);

FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(ParseSegment.class);
job.setReducerClass(ParseSegment.class);

FileOutputFormat.setOutputPath(job, segment);
job.setOutputFormat(ParseOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ParseImpl.class);

JobClient.runJob(job);

可以看出设置的输入为segment文件夹下的文件,输出也是segment文件夹,当然变化的是segment下生成了新的文件夹,提交的mapper和reducer都是parsesegment类。

2.下面就来分别看看ParseSegment类的map和reducer方法。map()方法中首先是一些判定的代码,该函数的主要功能还是集中在以下代码中:

ParseResult parseResult = null;
try {
parseResult = new ParseUtil(getConf()).parse(content);
} catch (Exception e) {
LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
return;
}

for (Entry<Text, Parse> entry : parseResult) {
Text url = entry.getKey();//http://www.ahu.edu.cn/
Parse parse = entry.getValue();
ParseStatus parseStatus = parse.getData().getStatus();//success(1,0)
long start = System.currentTimeMillis();

reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[parseStatus.getMajorCode()], 1);

if (!parseStatus.isSuccess()) {
LOG.warn("Error parsing: " + key + ": " + parseStatus);
parse = parseStatus.getEmptyParse(getConf());
}

// pass segment name to parse data
parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, 
getConf().get(Nutch.SEGMENT_NAME_KEY));

// compute the new signature
byte[] signature = 
SignatureFactory.getSignature(getConf()).calculate(content, parse); 
parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, 
StringUtil.toHexString(signature));

try {
scfilters.passScoreAfterParsing(url, content, parse);
} catch (ScoringFilterException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Error passing score: "+ url +": "+e.getMessage());
}
}
long end = System.currentTimeMillis();
LOG.info("Parsed (" + Long.toString(end - start) + "ms):" + url);

output.collect(url, new ParseImpl(new ParseText(parse.getText()), 
parse.getData(), parse.isCanonical()));
}

其中parseResult 是通过new ParseUtil(getConf()).parse(content);产生的,进入ParseUtil我们可以看出该函数全貌如下:

public ParseUtil(Configuration conf) {
this.parserFactory = new ParserFactory(conf);
MAX_PARSE_TIME=conf.getInt("parser.timeout", 30);
}

而ParserFactory就是调用一个插件来解决页面解析这部分问题的,ParseFactory的代码如下:

public ParserFactory(Configuration conf) {
this.conf = conf;
ObjectCache objectCache = ObjectCache.get(conf);
this.extensionPoint = PluginRepository.get(conf).getExtensionPoint(
Parser.X_POINT_ID);
this.parsePluginList = (ParsePluginList)objectCache.getObject(ParsePluginList.class.getName());
if (this.parsePluginList == null) {
this.parsePluginList = new ParsePluginsReader().parse(conf);
objectCache.setObject(ParsePluginList.class.getName(), this.parsePluginList);
}

if (this.extensionPoint == null) {
throw new RuntimeException("x point " + Parser.X_POINT_ID + " not found.");
}
if (this.parsePluginList == null) {
throw new RuntimeException(
"Parse Plugins preferences could not be loaded.");
}
}

当然了,如何调用插件来解决这个问题作者还不是很清楚,但是隐约从代码中已经看到了PluginRepository(插件仓库)、extensionPoint (扩展点)这样的名词了。

让我们再回到map方法,通过调试我们可以看到ParseResult包含了以下信息:

Version: -1
url: http://www.ahu.edu.cn/
base: http://www.ahu.edu.cn/
contentType: application/xhtml+xml
metadata: Date=Sat, 02 Aug 2014 13:46:36 GMT nutch.crawl.score=1.0 _fst_=33 nutch.segment.name=20140802214742 Content-Type=text/html Connection=close Accept-Ranges=bytes Server=Apache/2.2.8 (Unix) mod_ssl/2.2.8 OpenSSL/0.9.8e-fips-rhel5 DAV/2 Resin/3.0.25 
Content:
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />……

随后再通过一个for循环,遍历出其中的解析的详细内容,我们可以看到 Text url = entry.getKey();就是得到当前要解析的url,紧接着执行Parse parse = entry.getValue();其中的Text属性就是解析后的网页的主体信息即过滤了一些网页标签后的结果。剩下的代码主要实现将解析的内容collect出去。

3.执行完map方法后就是reduce,reducer的代码很简洁就一行: output.collect(key, (Writable)values.next()); // collect first value,自带的注解“collect first value”大概的意思就是map中每次只针对某一个url进行处理,所以收集到的解析的<text,parse>也就是唯一一个,自己的拙见啦~~~至此整个parse的过程就执行完毕了。

4.关于segment文件夹下的crawl_parse,parse_data,parse_text三个文件夹是如何生成的,我们可以看看上面job的输出ParseOutputFormat类。进入该类的主体方法getRecordWriter(),首先是一些初始化和变量的赋值,比如url过滤器、url规格化对象的生成,时间间隔、解析的上限等变量的赋值。然后通过以下三行代码定义输出目录:

Path text = new Path(new Path(out, ParseText.DIR_NAME), name);  // parse_text

Path data = new Path(new Path(out, ParseData.DIR_NAME), name);//parse_data     Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);//crawl_parse

然后再通过以下三个方法生成这三个目录

final MapFile.Writer textOut =
new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class,
CompressionType.RECORD, progress);
final MapFile.Writer dataOut =
new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,
compType, progress);
final SequenceFile.Writer crawlOut =
SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
compType, progress);

以上就是对于parse过程的一个简单解析,相比前面的三个流程来说,parse模块的实现逻辑相对简单。。。

(备注:涉及到ParseOutputFormat部分还有一些东西没有搞懂,下面的参考博文给了详细的解释,有兴趣可以拜读下)

参考博文:http://blog.csdn.net/amuseme_lu/article/details/6727516

Nutch源码阅读进程4---parseSegment

时间: 2024-12-26 05:08:58

Nutch源码阅读进程4---parseSegment的相关文章

Nutch源码阅读进程3---fetch

走了一遍Inject和Generate,基本了解了nutch在执行爬取前的一些前期预热工作,包括url的过滤.规则化.分值计算以及其与mapreduce的联系紧密性等,自我感觉nutch的整个流程是很缜密的,起码从前面两个过程看是这样的. 前期回顾:上一期主要是讲解了nutch的第二个环节Generate,该环节主要完成获取将要抓取的url列表,并写入到segments目录下,其中一些细节的处理包括每个job提交前的输入输出以及执行的map和reducer类具体做了那些工作都可以参考上一篇.接下

Nutch源码阅读进程1---inject

最近在Ubuntu下配置好了nutch和solr的环境,也用nutch爬取了一些网页,通过solr界面呈现,也过了一把自己建立小搜索引擎的瘾,现在该静下心来好好看看nutch的源码了,先从Inject开始吧~~~ 1.从crawl.java的main函数进入,执行: Configuration conf = NutchConfiguration.create(): 再进入NutchConfiguration(NutchConfiguration负责加载管理nutch的配置文件信息,该类继承自Co

Nutch源码阅读进程2---Generate

继之前仓促走完nutch的第一个流程Inject后,再次起航,Debug模式走起,进入第二个预热阶段Generate~~~ 上期回顾:Inject主要是将爬取列表中的url转换为指定格式<Text,CrawlDatum>存在CrawlDb中,主要做了两件事,一是读取种子列表中的url,对其进行了url过滤.规范化,当然这其中用的是hadoop的mapreduce模式提交job到jobtracker,因为没有研读hadoop源码,所以这块先放放,理清nutch的大体思路后再去啃hadoop的ma

Nutch源码阅读进程5---update

看nutch的源码仿佛就是一场谍战片,而构成这精彩绝伦的谍战剧情的就是nutch的每一个从inject->generate->fetch->parse->update的环节,首先我党下派任务给优秀的地下工作者(inject),地下工作者经过一系列处理工作(告诉自己媳妇孩子要出差什么的……)以及加入自己的主观能动性(generate),随后深入敌方进行fetch侦查工作,这其中会获得敌方的大量信息,不是一般农民工能看懂的,需要工作者凭借自己渊博的知识储备和经验进行parse,去伪存真

Nutch源码阅读进程3

走了一遍Inject和Generate,基本了解了nutch在执行爬取前的一些前期预热工作,包括url的过滤.规则化.分值计算以及其与mapreduce的联系紧密性等,自我感觉nutch的整个流程是很缜密的,起码从前面两个过程看是这样的. 前期回顾:上一期主要是讲解了nutch的第二个环节Generate,该环节主要完成获取将要抓取的url列表,并写入到segments目录下,其中一些细节的处理包括每个job提交前的输入输出以及执行的map和reducer类具体做了那些工作都可以参考上一篇.接下

Nutch源码阅读进程5

看nutch的源码仿佛就是一场谍战片,而构成这精彩绝伦的谍战剧情的就是nutch的每一个从inject->generate->fetch->parse->update的环节,首先我党下派任务给优秀的地下工作者(inject),地下工作者经过一系列处理工作(告诉自己媳妇孩子要出差什么的--)以及加入自己的主观能动性(generate),随后深入敌方进行fetch侦查工作,这其中会获得敌方的大量信息,不是一般农民工能看懂的,需要工作者凭借自己渊博的知识储备和经验进行parse,去伪存真

Nutch源码阅读进程4

前面依次看了nutch的准备工作inject和generate部分,抓取的fetch部分的代码,趁热打铁,我们下面来一睹parse即页面解析部分的代码,这块代码主要是集中在ParseSegment类里面,Let's go~~~ 上期回顾:上回主要讲的是nutch的fetch部分的功能代码实现,主要是先将segments目录下的指定文件夹作为输入,读取里面将要爬取的url信息存入爬取队列,再根据用户输入的爬取的线程个数thread决定消费者的个数,线程安全地取出爬取队列里的url,然后在执行爬取页

【原】SDWebImage源码阅读(三)

[原]SDWebImage源码阅读(三) 本文转载请注明出处 —— polobymulberry-博客园 1.SDWebImageDownloader中的downloadImageWithURL 我们来到SDWebImageDownloader.m文件中,找到downloadImageWithURL函数.发现代码不是很长,那就一行行读.毕竟这个函数大概做什么我们是知道的.这个函数大概就是创建了一个SDWebImageSownloader的异步下载器,根据给定的URL下载image. 先映入眼帘的

【Java】【Fulme】Flume-NG源码阅读之SpoolDirectorySource

org.apache.flume.source.SpoolDirectorySource是flume的一个常用的source,这个源支持从磁盘中某文件夹获取文件数据.不同于其他异步源,这个源能够避免重启或者发送失败后数据丢失.flume可以监控文件夹,当出现新文件时会读取该文件并获取数据.当一个给定的文件被全部读入到通道中时,该文件会被重命名以标志已经完成.同时,该源需要一个清理进程来定期移除完成的文件. 通道可选地将一个完成路径的原始文件插入到每个事件的hearder域中.在读取文件时,sou