Storm-WordCount示例

1、定义topology:

public class TopologyMain {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
//spout
WordCountSpout wcs = new WordCountSpout();
//bolts
WordGroupBolt wgb = new WordGroupBolt();
WordCountBolt wcb = new WordCountBolt();
ReportBolt rb = new ReportBolt();
//topology
TopologyBuilder tb = new TopologyBuilder();
tb.setSpout("word-spout", wcs);
tb.setBolt("group-bolt", wgb).shuffleGrouping("word-spout");
tb.setBolt("count-bolt", wcb).fieldsGrouping("group-bolt", new Fields("word"));
tb.setBolt("report-bolt", rb).globalGrouping("count-bolt");
//Config
Config config = new Config();
StormSubmitter.submitTopology("word-count-topology", config, tb.createTopology());
Thread.sleep(10000);
}
}

2、spout:

public class WordCountSpout extends BaseRichSpout {

private static final long serialVersionUID = 1L;
//模拟数据源,每一个发出句子作为一个元组
private String[] wordArr = {"my dog has fleas","i like cold beverages","the dog ate my homework"};
private SpoutOutputCollector collector;
private int index = 0;

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {//spout初始化时调用
this.collector = collector;
}

@Override
public void nextTuple() {//输出元组
this.collector.emit(new Values(wordArr[index]));
index ++;
if(index >= wordArr.length){
index = 0;
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));//设置每个发出的元组包含的字段,这里只包含一个word字段
}

}

3、bolt:

public class WordGroupBolt extends BaseRichBolt {

private static final long serialVersionUID = 1L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
String[] words = word.split(" ");
for(String w : words){
this.collector.emit(new Values(w));
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));//对应spout中定义的字段
}

}

public class WordCountBolt extends BaseRichBolt {

private static final long serialVersionUID = 1L;
private OutputCollector collector;
private HashMap<String, Long> countMap = null;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.countMap = new HashMap<String, Long>();
}

@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Long count = this.countMap.get(word);
if(null == count){
count = 0L;
}
count ++;
this.countMap.put(word, count);
this.collector.emit(new Values(word, count));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}

}

public class ReportBolt extends BaseRichBolt {

private static final long serialVersionUID = 1L;
private HashMap<String, Long> countMap = null;
private static final Logger logger = LoggerFactory.getLogger(ReportBolt.class);

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.countMap = new HashMap<String, Long>();
}

@Override
public void execute(Tuple input) {
logger.info("ReportBolt print result:");
String word = input.getStringByField("word");
Long count = input.getLongByField("count");
this.countMap.put(word, count);
logger.info(word + "-----------------" + count);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

@Override
public void cleanup() {
super.cleanup();
}

}

查看日志,可通过UI查询,如ip:8080可以看到提交的的topology名称,点进去,点端口号可以查看之日

也可以登录到对应工作节点的storm安装目录下logs文件夹下,每个端口号都会生成一个log文件,查看对应端口号的日志文件即可查看打印信息

时间: 2024-08-05 11:42:17

Storm-WordCount示例的相关文章

Storm入门(四)WordCount示例

Storm API文档网址如下: http://storm.apache.org/releases/current/javadocs/index.html 一.关联代码 使用maven,代码如下. pom.xml  和Storm入门(三)HelloWorld示例相同 RandomSentenceSpout.java /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor lice

Hadoop MapReduce 官方教程 -- WordCount示例

Hadoop MapReduce 官方教程 -- WordCount示例: http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html#%E4%BE%8B%E5%AD%90%EF%BC%9AWordCount+v1.0

WordCount示例深度学习MapReduce过程

转自: http://blog.csdn.net/yczws1/article/details/21794873 . 我们都安装完Hadoop之后,按照一些案例先要跑一个WourdCount程序,来测试Hadoop安装是否成功.在终端中用命令创建一个文件夹,简单的向两个文件中各写入一段话,然后运行Hadoop,WourdCount自带WourdCount程序指令,就可以输出写入的那句话各个不同单词的个数.但是这不是这篇博客主要讲的内容,主要是想通过一个简单的Wordcount程序,来认识Hado

WordCount示例深度学习MapReduce过程(1)

我们都安装完Hadoop之后,按照一些案例先要跑一个WourdCount程序,来测试Hadoop安装是否成功.在终端中用命令创建一个文件夹,简单的向两个文件中各写入一段话,然后运行Hadoop,WourdCount自带WourdCount程序指令,就可以输出写入的那句话各个不同单词的个数.但是这不是这篇博客主要讲的内容,主要是想通过一个简单的Wordcount程序,来认识Hadoop的内部机制.并通过此来深入了解MapReduce的详细过程.在Thinking in BigDate(八)大数据H

Spark教程-构建Spark集群-配置Hadoop伪分布模式并运行Wordcount示例(1)

第四步:配置Hadoop伪分布模式并运行Wordcount示例 伪分布模式主要涉及一下的配置信息: 修改Hadoop的核心配置文件core-site.xml,主要是配置HDFS的地址和端口号: 修改Hadoop中HDFS的配置文件hdfs-site.xml,主要是配置replication; 修改Hadoop的MapReduce的配置文件mapred-site.xml,主要是配置JobTracker的地址和端口: 在具体操作前我们先在Hadoop目录下创建几个文件夹: 下面开始构建具体的伪分布式

【Big Data - Hadoop - MapReduce】初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现. MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1. Hadoop示例程序WordCount详解及实例 2. hadoop 学习笔

storm wordcount实例

在storm环境部署完毕,并正确启动之后,现在就可以真正进入storm开发了,按照惯例,以wordcount作为开始.这个例子很简单,核心组件包括:一个spout,两个bolt,一个Topology.spout从一个路径读取文件,然后readLine,向bolt发射,一个文件处理完毕后,重命名,以不再重复处理.第一个bolt将从spout接收到的字符串按空格split,产生word,发射给下一个bolt.第二个bolt接收到word后,统计.计数,放到HashMap<string, intege

Erlang基础 -- 介绍 -- Wordcount示例演示

在前两个blog中,已经说了Erlang的历史.应用场景.特点,这次主要演示一个Wordcount的示例,就是给定一个文本文件,统计这个文本文件中的单词以及该单词出现的次数. 今天和群友们讨论了一个问题,突然一下子就上升到哲学角度上了,装逼装大发了. PS:图片中有错别字,%s/财务和其他9个月/财务和其他9个人/g 不过真心想说的一点是,把Erlang系统,映射到现实中,很多奇葩问题,就能迎刃而解了.所以,在下面的简要设计中,我就尽可能的代入一下现实世界吧. 环境安装 mac 的话,用brew

Storm wordcount Read from file

source code: package stormdemo; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.util.HashMap; import jav

初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现,MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1.Hadoop示例程序WordCount详解及实例 2.hadoop 学习笔记:m