Storm【实践系列-如何写一个爬虫】 - ParserBolt

阅读背景: 如果您对爬虫,或则web前端不够了解,请自行google。

代码前提:您需要参阅本ID 所写的前面两篇博文:  Storm【实践系列-如何写一个爬虫】 - Fetcher

本章主题: ParserBolt 如何完成的解析,并且如何从前面的组件得到数据,并emit出去。

博文流程:  博文将整个 爬虫系列公开,其过程为:

1 : 代码实现。

2 : 对代码的细节进行解析。

3 : 对真个设计进行回顾,并作总结。

如果您在参看本ID的博文的过程之中,只存在流程 1。那么请继续等待。一旦公司业务不处于饱和阶段。

本ID将保证每日一篇。

package com.digitalpebble.storm.crawler.bolt.parser;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.tika.Tika;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.sax.BodyContentHandler;
import org.apache.tika.sax.Link;
import org.apache.tika.sax.LinkContentHandler;
import org.apache.tika.sax.TeeContentHandler;
import org.slf4j.LoggerFactory;
import org.xml.sax.ContentHandler;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.codahale.metrics.Timer;
import com.digitalpebble.storm.crawler.StormConfiguration;
import com.digitalpebble.storm.crawler.filtering.URLFilters;
import com.digitalpebble.storm.crawler.util.Configuration;
import com.digitalpebble.storm.crawler.util.HistogramMetric;
import com.digitalpebble.storm.crawler.util.MeterMetric;
import com.digitalpebble.storm.crawler.util.TimerMetric;
import com.digitalpebble.storm.crawler.util.URLUtil;

/**
 * Uses Tika to parse the output of a fetch and extract text + metadata
 ***/

@SuppressWarnings("serial")
public class ParserBolt extends BaseRichBolt {

    private Configuration config;

    private Tika tika;

    private URLFilters filters = null;

    private OutputCollector collector;

    private static final org.slf4j.Logger LOG = LoggerFactory
            .getLogger(ParserBolt.class);

    private MeterMetric eventMeters;
    private HistogramMetric eventHistograms;
    private TimerMetric eventTimers;

    private boolean ignoreOutsideHost = false;
    private boolean ignoreOutsideDomain = false;

    public void prepare(Map conf, TopologyContext context,
            OutputCollector collector) {
        config = StormConfiguration.create();

        String urlconfigfile = config.get("urlfilters.config.file",
                "urlfilters.json");
        if (urlconfigfile != null)
            try {
                filters = new URLFilters(urlconfigfile);
            } catch (IOException e) {
                LOG.error("Exception caught while loading the URLFilters");
            }

        ignoreOutsideHost = config.getBoolean(
                "parser.ignore.outlinks.outside.host", false);
        ignoreOutsideDomain = config.getBoolean(
                "parser.ignore.outlinks.outside.domain", false);

        // instanciate Tika
        long start = System.currentTimeMillis();
        tika = new Tika();
        long end = System.currentTimeMillis();

        LOG.debug("Tika loaded in " + (end - start) + " msec");

        this.collector = collector;

        this.eventMeters = context.registerMetric("parser-meter",
                new MeterMetric(), 5);
        this.eventTimers = context.registerMetric("parser-timer",
                new TimerMetric(), 5);
        this.eventHistograms = context.registerMetric("parser-histograms",
                new HistogramMetric(), 5);

    }

    public void execute(Tuple tuple) {
        eventMeters.scope("tuple_in").mark();

        byte[] content = tuple.getBinaryByField("content");
        eventHistograms.scope("content_bytes").update(content.length);

        String url = tuple.getStringByField("url");
        HashMap<String, String[]> metadata = (HashMap<String, String[]>) tuple
                .getValueByField("metadata");

        // TODO check status etc...

        Timer.Context timer = eventTimers.scope("parsing").time();

        // rely on mime-type provided by server or guess?

        ByteArrayInputStream bais = new ByteArrayInputStream(content);
        Metadata md = new Metadata();

        String text = null;

        LinkContentHandler linkHandler = new LinkContentHandler();
        ContentHandler textHandler = new BodyContentHandler();
        TeeContentHandler teeHandler = new TeeContentHandler(linkHandler,
                textHandler);
        ParseContext parseContext = new ParseContext();
        // parse
        try {
            tika.getParser().parse(bais, teeHandler, md, parseContext);
            text = textHandler.toString();
        } catch (Exception e) {
            LOG.error("Exception while parsing " + url, e.getMessage());
            eventMeters.scope(
                    "error_content_parsing_" + e.getClass().getSimpleName())
                    .mark();
            collector.fail(tuple);
            eventMeters.scope("tuple_fail").mark();
            return;
        } finally {
            try {
                bais.close();
            } catch (IOException e) {
                LOG.error("Exception while closing stream", e);
            }
        }

        long duration = timer.stop();

        LOG.info("Parsed " + url + " in " + duration + " msec");

        // get the outlinks and convert them to strings (for now)
        String fromHost;
        URL url_;
        try {
            url_ = new URL(url);
            fromHost = url_.getHost().toLowerCase();
        } catch (MalformedURLException e1) {
            // we would have known by now as previous
            // components check whether the URL is valid
            LOG.error("MalformedURLException on " + url);
            eventMeters.scope(
                    "error_outlinks_parsing_" + e1.getClass().getSimpleName())
                    .mark();
            collector.fail(tuple);
            eventMeters.scope("tuple_fail").mark();
            return;
        }

        List<Link> links = linkHandler.getLinks();
        Set<String> slinks = new HashSet<String>(links.size());
        for (Link l : links) {
            if (StringUtils.isBlank(l.getUri()))
                continue;
            String urlOL = null;
            try {
                URL tmpURL = URLUtil.resolveURL(url_, l.getUri());
                urlOL = tmpURL.toExternalForm();
            } catch (MalformedURLException e) {
                LOG.debug("MalformedURLException on " + l.getUri());
                eventMeters.scope(
                        "error_out_link_parsing_"
                                + e.getClass().getSimpleName()).mark();
                continue;
            }

            // filter the urls
            if (filters != null) {
                urlOL = filters.filter(urlOL);
                if (urlOL == null) {
                    eventMeters.scope("outlink_filtered").mark();
                    continue;
                }
            }

            if (urlOL != null && ignoreOutsideHost) {
                String toHost;
                try {
                    toHost = new URL(urlOL).getHost().toLowerCase();
                } catch (MalformedURLException e) {
                    toHost = null;
                }
                if (toHost == null || !toHost.equals(fromHost)) {
                    urlOL = null; // skip it
                    eventMeters.scope("outlink_outsideHost").mark();
                    continue;
                }
            }

            if (urlOL != null) {
                slinks.add(urlOL);
                eventMeters.scope("outlink_kept").mark();
            }
        }

        // add parse md to metadata
        for (String k : md.names()) {
            // TODO handle mutliple values
            String[] values = md.getValues(k);
            metadata.put("parse." + k, values);
        }

        collector.emit(tuple, new Values(url, content, metadata, text.trim(), slinks));
        collector.ack(tuple);
        eventMeters.scope("tuple_success").mark();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // output of this module is the list of fields to index
        // with at least the URL, text content

        declarer.declare(new Fields("url", "content", "metadata", "text",
                "outlinks"));
    }

}

Storm【实践系列-如何写一个爬虫】 - ParserBolt

时间: 2024-08-03 14:38:58

Storm【实践系列-如何写一个爬虫】 - ParserBolt的相关文章

Storm【实践系列-如何写一个爬虫- 对于Protocol进行的封装】

本章描述:对于Protocol的封装 package com.digitalpebble.storm.crawler.fetcher; import com.digitalpebble.storm.crawler.util.Configuration; public interface Protocol {     public ProtocolResponse getProtocolOutput(String url) throws Exception;          public voi

Storm【实践系列-如何写一个爬虫- Metric 系列】1

package com.digitalpebble.storm.crawler; import backtype.storm.Config; import backtype.storm.metric.MetricsConsumerBolt; import backtype.storm.metric.api.IMetricsConsumer; import backtype.storm.task.IErrorReporter; import backtype.storm.task.OutputCo

通过写一个爬虫来学习大前端(草稿)

## 为什么说写爬虫能够磨练编程技艺呢?- 我们平时很不喜欢看别人的网站,通过写爬虫我们可以快速的对别人的网站布局有一个更好的理解- 可以提高我们的正则的水平- 提高http协议的水平- 我们可以充分的利用node的异步的优势- 可以玩一下一些好玩的node中间件- 考虑编码的问题- 我们会采用angular和bootstrap作为前端框架- 使用angular-router,做一个列表页和内容页- 熟悉一些开源工具像bower+npm+git- 当然这只是一个练习,我们把数据抓取下来之后,直接

用Scrapy写一个爬虫

昨天用python谢了一个简单爬虫,抓取页面图片: 但实际用到的爬虫需要处理很多复杂的环境,也需要更加的智能,重复发明轮子的事情不能干, 再说python向来以爬虫作为其擅长的一个领域,想必有许多成熟的第三方框架,百度后选用了 Scrapy作为平台构建复杂爬虫. Scarpy的下载安装不必细说,话说当前只支持python2.x版本,很郁闷,下载安装了python2.7. 安装完后,按照<Scrapy Tutorial>和Scrapy at a glance两篇帖子作为学习范本. 概念及步骤简要

自己写一个爬虫 copider

copider 模仿scrapy的一些写法,当然我这个是单进程的,不是异步的 1.目录 copider/copider.py #coding=utf-8 ''' Created on 2015年10月8日 @author: snt1 ''' import urllib2 import lxml.html import StringIO class Spider(object): def __init__(self, url, meta=None): self.URL = url self.MET

用Python写一个最简单的网络爬虫

什么是网络爬虫?这是百度百科的解释: 网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动的抓取万维网信息的程序或者脚本.另外一些不常使用的名字还有蚂蚁,自动索引,模拟程序或者蠕虫. 爬虫可以做什么?爬虫可以帮助我们在茫茫互联网中爬取我们需要的特定数据,这个特定数据可以是任何想获得的数据. 爬虫是一个让人热血的话题,因为当你在写爬虫的时候,你会感觉到自己是在做一件很NB的事,而每当写出一个爬虫,就会在此基础上不断尝试写出更NB的爬虫,有

用Python实现一个爬虫爬取ZINC网站进行生物信息学数据分析

最近接到实验室的导师交给我的一个任务,就是他们手头有很多smile表达式,格式类似这种:C(=C(c1ccccc1)c1ccccc1)c1ccccc1(这是生物信息学中表达小分子结构的一种常用表达式),他们需要对每个smile表达式在ZINC网站(生物信息学数据网站)上进行搜索,然后找到对应的ZINC号.小分子供应商.构象预测等信息.基本步骤如下: 点击查找之后网页就会跳转到详细信息,我们需要获取它的ZINC号.小分子供应商.构象预测.CAS号等信息,如下: 这一套流程要是靠人工手动完成的话有点

筹划开始写一个系列的东西

最近晚上的时间相对来说是比较宽裕的,所以想写一个系列的东西出来,以便于巩固自己当前的知识体系. 目前我的计划如下: 1. WebAPI 系列讲解. 2. Signalr 系列讲解. 3. MySQL数据库系列讲解. 4. ABP框架系列讲解. 5. 自学DDD系列讲解. 6. 敏捷开发之我所见所闻系列讲解. 7. 自学TDD系列讲解. 8. KnockOut.JS系列讲解. 9. 自己读的和技术无关的一些书籍方面的系列. 我非常喜欢研究DDD框架,SOA框架和基于事件消息模型的中间件框架,同时对

也写一个简单的网络爬虫

引子 在cnblogs也混了许久,不过碍于平日工作太忙,一篇随笔也没有写过.最近经常感觉到自己曾经积累过的经验逐步的丢失,于是开通了博客,主要是记录一下自己在业余时间里玩的一些东西. 缘起 言归正传.某次在在某高校网站闲逛,看到了一些有趣的东西想要保存起来,但是却分散在各个页面,难以下手.使用baidu,google却有无法避免的搜索到此站点之外的内容.于是就想如果有一个爬虫,可以抓取指定域名的某些感兴趣的内容,不是很好.在网上简单搜索了一下,简单的都不满意,功能强大的又太复杂,就想自己写一个.