Storm【实践系列-如何写一个爬虫- Metric 系列】1

package com.digitalpebble.storm.crawler;

import backtype.storm.Config;
import backtype.storm.metric.MetricsConsumerBolt;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectWriter;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author Enno Shioji ([email protected])
 */
public class DebugMetricConsumer implements IMetricsConsumer {
	private static final Logger log = LoggerFactory
			.getLogger(DebugMetricConsumer.class);
	private IErrorReporter errorReporter;
	private Server server;

	// Make visible to servlet threads
	private volatile TopologyContext context;
	private volatile ConcurrentMap<String, Number> metrics;
	private volatile ConcurrentMap<String, Map<String, Object>> metrics_metadata;

	public void prepare(Map stormConf, Object registrationArgument,
			TopologyContext context, IErrorReporter errorReporter) {
		this.context = context;
		this.errorReporter = errorReporter;
		this.metrics = new ConcurrentHashMap<String, Number>();
		this.metrics_metadata = new ConcurrentHashMap<String, Map<String, Object>>();

		try {
			// TODO Config file not tested
			final String PORT_CONFIG_STRING = "topology.metrics.consumers.debug.servlet.port";
			Integer port = (Integer) stormConf.get(PORT_CONFIG_STRING);
			if (port == null) {
				log.warn("Metrics debug servlet‘s port not specified, defaulting to 7070. You can specify it via "
						+ PORT_CONFIG_STRING + " in storm.yaml");
				port = 7070;
			}
			server = startServlet(port);
		} catch (Exception e) {
			log.error("Failed to start metrics server", e);
			throw new AssertionError(e);
		}
	}

	private static final Joiner ON_COLONS = Joiner.on("::");

	public void handleDataPoints(TaskInfo taskInfo,
			Collection<DataPoint> dataPoints) {
		// In order
		String componentId = taskInfo.srcComponentId;
		Integer taskId = taskInfo.srcTaskId;
		Integer updateInterval = taskInfo.updateIntervalSecs;
		Long timestamp = taskInfo.timestamp;
		for (DataPoint point : dataPoints) {
			String metric_name = point.name;
			try {
				Map<String, Number> metric = (Map<String, Number>) point.value;
				for (Map.Entry<String, Number> entry : metric.entrySet()) {
					String metricId = ON_COLONS.join(componentId, taskId,
							metric_name, entry.getKey());
					Number val = entry.getValue();
					metrics.put(metricId, val);
					metrics_metadata.put(metricId, ImmutableMap
							.<String, Object> of("updateInterval",
									updateInterval, "lastreported", timestamp));
				}
			} catch (RuntimeException e) {
				// One can easily send something else than a Map<String,Number>
				// down the __metrics stream and make this part break.
				// If you ask me either the message should carry type
				// information or there should be different stream per message
				// type
				// This is one of the reasons why I want to write a further
				// abstraction on this facility
				errorReporter.reportError(e);
				metrics_metadata
						.putIfAbsent("ERROR_METRIC_CONSUMER_"
								+ e.getClass().getSimpleName(), ImmutableMap
								.of("offending_message_sample", point.value));
			}
		}
	}

	private static final ObjectMapper OM = new ObjectMapper();

	private Server startServlet(int serverPort) throws Exception {
		// Setup HTTP server
		Server server = new Server(serverPort);
		Context root = new Context(server, "/");
		server.start();

		HttpServlet servlet = new HttpServlet() {
			@Override
			protected void doGet(HttpServletRequest req,
					HttpServletResponse resp) throws ServletException,
					IOException {
				SortedMap<String, Number> metrics = ImmutableSortedMap
						.copyOf(DebugMetricConsumer.this.metrics);
				SortedMap<String, Map<String, Object>> metrics_metadata = ImmutableSortedMap
						.copyOf(DebugMetricConsumer.this.metrics_metadata);

				Map<String, Object> toplevel = ImmutableMap
						.of("retrieved",
								new Date(),

								// TODO this call fails with mysterious
								// exception
								// "java.lang.IllegalArgumentException: Could not find component common for __metrics"
								// Mailing list suggests it‘s a library version
								// issue but couldn‘t find anything suspicious
								// Need to eventually investigate
								// "sources",
								// context.getThisSources().toString(),

								"metrics", metrics, "metric_metadata",
								metrics_metadata);

				ObjectWriter prettyPrinter = OM
						.writerWithDefaultPrettyPrinter();
				prettyPrinter.writeValue(resp.getWriter(), toplevel);
			}
		};

		root.addServlet(new ServletHolder(servlet), "/metrics");

		log.info("Started metric server...");
		return server;

	}

	public void cleanup() {
		try {
			server.stop();
		} catch (Exception e) {
			throw new AssertionError(e);
		}
	}

}

前提说明:

storm从0.9.0开始,增加了指标统计框架,用来收集应用程序的特定指标,并将其输出到外部系统。

一般来说,您只需要去实现 LoggingMetricsConsumer,统计将指标值输出到metric.log日志文件之中。

当然,您也可以自定义一个监听的类:只需要去实现IMetricsConsumer接口就可以了。这些类可以在代码里注册(registerMetricsConsumer),也可以在 storm.yaml配置文件中注册:

Storm【实践系列-如何写一个爬虫- Metric 系列】1

时间: 2024-08-21 05:31:18

Storm【实践系列-如何写一个爬虫- Metric 系列】1的相关文章

Storm【实践系列-如何写一个爬虫- 对于Protocol进行的封装】

本章描述:对于Protocol的封装 package com.digitalpebble.storm.crawler.fetcher; import com.digitalpebble.storm.crawler.util.Configuration; public interface Protocol {     public ProtocolResponse getProtocolOutput(String url) throws Exception;          public voi

Storm【实践系列-如何写一个爬虫】 - ParserBolt

阅读背景: 如果您对爬虫,或则web前端不够了解,请自行google. 代码前提:您需要参阅本ID 所写的前面两篇博文:  Storm[实践系列-如何写一个爬虫] - Fetcher 本章主题: ParserBolt 如何完成的解析,并且如何从前面的组件得到数据,并emit出去. 博文流程:  博文将整个 爬虫系列公开,其过程为: 1 : 代码实现. 2 : 对代码的细节进行解析. 3 : 对真个设计进行回顾,并作总结. 如果您在参看本ID的博文的过程之中,只存在流程 1.那么请继续等待.一旦公

通过写一个爬虫来学习大前端(草稿)

## 为什么说写爬虫能够磨练编程技艺呢?- 我们平时很不喜欢看别人的网站,通过写爬虫我们可以快速的对别人的网站布局有一个更好的理解- 可以提高我们的正则的水平- 提高http协议的水平- 我们可以充分的利用node的异步的优势- 可以玩一下一些好玩的node中间件- 考虑编码的问题- 我们会采用angular和bootstrap作为前端框架- 使用angular-router,做一个列表页和内容页- 熟悉一些开源工具像bower+npm+git- 当然这只是一个练习,我们把数据抓取下来之后,直接

用Scrapy写一个爬虫

昨天用python谢了一个简单爬虫,抓取页面图片: 但实际用到的爬虫需要处理很多复杂的环境,也需要更加的智能,重复发明轮子的事情不能干, 再说python向来以爬虫作为其擅长的一个领域,想必有许多成熟的第三方框架,百度后选用了 Scrapy作为平台构建复杂爬虫. Scarpy的下载安装不必细说,话说当前只支持python2.x版本,很郁闷,下载安装了python2.7. 安装完后,按照<Scrapy Tutorial>和Scrapy at a glance两篇帖子作为学习范本. 概念及步骤简要

自己写一个爬虫 copider

copider 模仿scrapy的一些写法,当然我这个是单进程的,不是异步的 1.目录 copider/copider.py #coding=utf-8 ''' Created on 2015年10月8日 @author: snt1 ''' import urllib2 import lxml.html import StringIO class Spider(object): def __init__(self, url, meta=None): self.URL = url self.MET

用Python写一个最简单的网络爬虫

什么是网络爬虫?这是百度百科的解释: 网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动的抓取万维网信息的程序或者脚本.另外一些不常使用的名字还有蚂蚁,自动索引,模拟程序或者蠕虫. 爬虫可以做什么?爬虫可以帮助我们在茫茫互联网中爬取我们需要的特定数据,这个特定数据可以是任何想获得的数据. 爬虫是一个让人热血的话题,因为当你在写爬虫的时候,你会感觉到自己是在做一件很NB的事,而每当写出一个爬虫,就会在此基础上不断尝试写出更NB的爬虫,有

零基础写python爬虫之urllib2使用指南

零基础写python爬虫之urllib2使用指南 前面说到了urllib2的简单入门,下面整理了一部分urllib2的使用细节. 1.Proxy 的设置 urllib2 默认会使用环境变量 http_proxy 来设置 HTTP Proxy. 如果想在程序中明确控制 Proxy 而不受环境变量的影响,可以使用代理. 新建test14来实现一个简单的代理Demo: import urllib2   enable_proxy = True   proxy_handler = urllib2.Prox

用weexplus从0到1写一个app(2)-页面跳转和文章列表及文章详情的编写

说明 结束连续几天的加班,最近的项目终于告一段落,今天抽点时间开始继续写我这篇拖了很久的<用weexplus从0到1写一个app>系列文章.写这篇文章的时候,weexplus的作者已经把weexplus重构了一下,可以同时打包出web端和native端,我这边的ui界面和项目结构也跟着做了一点变化.这里有weexplus官方放出的一个电影APP的demo,有需要的可以去下载看看,然后顺便给weexplus一个star吧! 文章可能会很长,在此分几篇文章来写,先占个坑: 用weexplus从0到

用Python实现一个爬虫爬取ZINC网站进行生物信息学数据分析

最近接到实验室的导师交给我的一个任务,就是他们手头有很多smile表达式,格式类似这种:C(=C(c1ccccc1)c1ccccc1)c1ccccc1(这是生物信息学中表达小分子结构的一种常用表达式),他们需要对每个smile表达式在ZINC网站(生物信息学数据网站)上进行搜索,然后找到对应的ZINC号.小分子供应商.构象预测等信息.基本步骤如下: 点击查找之后网页就会跳转到详细信息,我们需要获取它的ZINC号.小分子供应商.构象预测.CAS号等信息,如下: 这一套流程要是靠人工手动完成的话有点