storm starter学习(一)

官方提供的storm starter示例中,有很多应用的例子,对storm的应用场景理解很有帮助。本文结合源码来进行功能分解,记录一下,作为记忆索引吧。

先来看一个比较简单的示例:WordCountTopology,原版代码该示例是为了说明多语言适配而做的应用场景,主要功能是随机生成一些String,将这些String划分分组,统计各单词出现数量。后来修改了一下,去掉了py调用的地方。使用java来进行词组划分。

先来看一下Topology:

public static void main(String[] args) throws Exception {
	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("spout", new RandomSentenceSpout(), 5); // 数据源
	builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); // 单词划分
	builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",new Fields("word")); // 统计单词出现个数
	Config conf = new Config();
	conf.setDebug(true);

	if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar("wordCount", conf,
					builder.createTopology());
	} else {
	    conf.setMaxTaskParallelism(3);
	    LocalCluster cluster = new LocalCluster();
	    cluster.submitTopology("word-count", conf, builder.createTopology());

	    Thread.sleep(10000);
	    cluster.shutdown();
	}
}

RandomSentenceSpout功能很简单,随机生成字符串到tuple中,key为word。

再看一下bolt,该示例中使用了两类分组方式shuffleGrouping(随机分组),fieldsGrouping(按字段分组)。使用shuffleGrouping来进行单词划分,为了保证单词统计时都在一个bolt中进行,使用fieldsGrouping来进行word划分统计,运行时会看到相同key值的tuple会分配到同一线程上。

public static class SplitSentence implements IBasicBolt {
	public void prepare(Map conf, TopologyContext context) {
	}

	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String sentence = tuple.getString(0);
		for (String word : sentence.split(" ")) { // 将Spout接收到的tuple按空格进行分解,产生单词数据流
			collector.emit(new Values(word));
		}

	}

	public void cleanup() {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word")); // 定义key值
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}

单词统计bolt:

public static class WordCount extends BaseBasicBolt {
	Map<String, Integer> counts = new HashMap<String, Integer>();

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String word = tuple.getString(0);
		Integer count = counts.get(word);
		if (count == null)
			count = 0;
		count++;
		counts.put(word, count);
		collector.emit(new Values(word, count)); // 输出结果word + word number
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count"));
	}
}

总结:

通常情况下,为了保证数据可靠性与完整性,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,相当于自动处理了prepare方法和collector.emit.ack(inputTuple);

storm starter学习(一)

时间: 2024-11-05 22:48:44

storm starter学习(一)的相关文章

storm starter学习(二) - 流聚合

SingleJoinExample示例说明了storm中流聚合的应用,将具有相同tuple属性的数据流整合成一个新的数据流.来看一下Topology.先定义两个数据源genderSpout和ageSpout,Fields分别为("id", "gender").("id", "age"),最终聚合后的数据流按id进行分组,输出为("gender", "age").具体Topology如下:

Storm入门学习随记

推荐慕课网视频:http://www.imooc.com/video/10055 ====Storm的起源. Storm是开源的.分布式.流式计算系统 什么是分布式呢?就是将一个任务拆解给多个计算机去执行,让许多机器共通完成同一个任务, 把这个多机的细节给屏蔽,对外提供同一个接口.同一个服务,这样的系统就是分布式系统. 在多年以前并没有非常范用的分布式系统,即使存在,也都是限定在指定的领域, 当然,也有人尝试从中提取出共通的部分,发明一个通用的分布式系统,但是都没有很好的结果. 后来,Googl

Flume-ng+Kafka+storm的学习笔记

Flume-ng Flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统. Flume的文档可以看http://flume.apache.org/FlumeUserGuide.html 官方的英文文档 介绍的比较全面. 不过这里写写自己的见解 这个是flume的架构图 从上图可以看到几个名词: Agent: 一个Agent包含Source.Channel.Sink和其他的组件.Flume就是一个或多个Agent构成的. Source:数据源.简单的说就是agent获取数据的入口

storm记录--5-- Storm学习的HelloWorld

Storm学习的HelloWorld 1.下载Storm-start(https://github.com/nathanmarz/storm-starter/archive/master.zip) 2.进入下载目录,对zip文件解压 3.进入解压后的文件目录,修改m2-pom.xml(将twitter4j-core和twitter4j-stream替换为下面的部分) <dependency> <groupId>org.twitter4j</groupId> <ar

flume+kafka+storm+mysql架构设计

前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考. 这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql (项目是maven项目,需要改动mysql配置,提供两种topology:读取本地文件(用来本地测试):读取服务器日志文件.) (是visio画的,图太大,放上来字看起来比较小,如果有需要的朋友留邮箱) 实时日志分析系统架构简介 系统主要分为四部分:                         负责从各节点上

storm 原理简介及单机版安装指南(转)

本文翻译自: https://github.com/nathanmarz/storm/wiki/Tutorial Storm是一个分布式的.高容错的实时计算系统. Storm对于实时计算的的意义相当于Hadoop对于批处理的意义.Hadoop为我们提供了Map和Reduce原语,使我们对数据进行批处理变的非常的简单和优美.同样,Storm也对数据的实时计算提供了简单Spout和Bolt原语. Storm适用的场景: 1.流数据处理:Storm可以用来用来处理源源不断的消息,并将处理之后的结果保存

Hadoop平台提供离线数据和Storm平台提供实时数据流

1.准备工作 2.一个Storm集群的基本组件 3.Topologies 4.Stream 5.数据模型(Data Model) 6.一个简单的Topology 7.流分组策略(Stream grouping) 8.使用别的语言来定义Bolt 9.可靠的消息处理 10.单机版安装指南 本文翻译自: https://github.com/nathanmarz/storm/wiki/Tutorial Storm是一个分布式的.高容错的实时计算系统.Storm对于实时计算的的意义相当于Hadoop对于

storm 原理简介及单机版安装指南

目录[-] 1.准备工作 2.一个Storm集群的基本组件 3.Topologies 4.Stream 5.数据模型(Data Model) 6.一个简单的Topology 7.流分组策略(Stream grouping) 8.使用别的语言来定义Bolt 9.可靠的消息处理 10.单机版安装指南 本文翻译自: https://github.com/nathanmarz/storm/wiki/Tutorial Storm是一个分布式的.高容错的实时计算系统.Storm对于实时计算的的意义相当于Ha

Storm框架基础(一)

* Storm框架基础(一) Storm简述 如果你了解过SparkStreaming,那么Storm就可以类比着入门,在此我们可以先做一个简单的比较:  在SparkStreaming中: 我们曾尝试过每秒钟的实时数据处理,或者使用Window若干时间范围内的数据统一处理结果.亦或统计所有时间范围内的数据结果. 在Storm中: 我们可以根据进来的每一条数据进行实时处理,也就是说,Storm处理数据的速度,要小于1秒,也就是毫秒级别的. 如果你疑问,1秒处理1次数据,和进来1条数据处理1次有什