storm wordcount实例

在storm环境部署完毕,并正确启动之后,现在就可以真正进入storm开发了,按照惯例,以wordcount作为开始。
这个例子很简单,核心组件包括:一个spout,两个bolt,一个Topology。
spout从一个路径读取文件,然后readLine,向bolt发射,一个文件处理完毕后,重命名,以不再重复处理。
第一个bolt将从spout接收到的字符串按空格split,产生word,发射给下一个bolt。
第二个bolt接收到word后,统计、计数,放到HashMap<string, integer="">容器中。

1,定义一个spout,作用是源源不断滴向bolt发射字符串。

点击(此处)折叠或打开

  1. import java.io.File;
  2. import java.io.IOException;
  3. import java.util.Collection;
  4. import java.util.List;
  5. import java.util.Map;
  6. import org.apache.commons.io.FileUtils;
  7. import org.apache.commons.io.filefilter.FileFilterUtils;
  8. import backtype.storm.spout.SpoutOutputCollector;
  9. import backtype.storm.task.TopologyContext;
  10. import backtype.storm.topology.OutputFieldsDeclarer;
  11. import backtype.storm.topology.base.BaseRichSpout;
  12. import backtype.storm.tuple.Fields;
  13. import backtype.storm.tuple.Values;
  14. public class WordReader extends BaseRichSpout {
  15. private static final long serialVersionUID = 2197521792014017918L;
  16. private String inputPath;
  17. private SpoutOutputCollector collector;
  18. @Override
  19. @SuppressWarnings(\"rawtypes\")
  20. public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  21. this.collector = collector;
  22. inputPath = (String) conf.get(\"INPUT_PATH\");
  23. }
  24. @Override
  25. public void nextTuple() {
  26. Collection<File> files = FileUtils.listFiles(new File(inputPath),
  27. FileFilterUtils.notFileFilter(FileFilterUtils.suffixFileFilter(\".bak\")), null);
  28. for (File f : files) {
  29. try {
  30. List<String> lines = FileUtils.readLines(f, \"UTF-8\");
  31. for (String line : lines) {
  32. collector.emit(new Values(line));
  33. }
  34. FileUtils.moveFile(f, new File(f.getPath() + System.currentTimeMillis() + \".bak\"));
  35. } catch (IOException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. }
  40. @Override
  41. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  42. declarer.declare(new Fields(\"line\"));
  43. }
  44. }

2,定义一个bolt,作用是接收spout发过来的字符串,并分割成word,发射给下一个bolt。

点击(此处)折叠或打开

  1. import org.apache.commons.lang.StringUtils;
  2. import backtype.storm.topology.BasicOutputCollector;
  3. import backtype.storm.topology.OutputFieldsDeclarer;
  4. import backtype.storm.topology.base.BaseBasicBolt;
  5. import backtype.storm.tuple.Fields;
  6. import backtype.storm.tuple.Tuple;
  7. import backtype.storm.tuple.Values;
  8. public class WordSpliter extends BaseBasicBolt {
  9. private static final long serialVersionUID = -5653803832498574866L;
  10. @Override
  11. public void execute(Tuple input, BasicOutputCollector collector) {
  12. String line = input.getString(0);
  13. String[] words = line.split(\" \");
  14. for (String word : words) {
  15. word = word.trim();
  16. if (StringUtils.isNotBlank(word)) {
  17. word = word.toLowerCase();
  18. collector.emit(new Values(word));
  19. }
  20. }
  21. }
  22. @Override
  23. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  24. declarer.declare(new Fields(\"word\"));
  25. }
  26. }

3,定义一个bolt,接收word,并统计。

点击(此处)折叠或打开

  1. import java.util.HashMap;
  2. import java.util.Map;
  3. import java.util.Map.Entry;
  4. import backtype.storm.task.TopologyContext;
  5. import backtype.storm.topology.BasicOutputCollector;
  6. import backtype.storm.topology.OutputFieldsDeclarer;
  7. import backtype.storm.topology.base.BaseBasicBolt;
  8. import backtype.storm.tuple.Tuple;
  9. public class WordCounter extends BaseBasicBolt {
  10. private static final long serialVersionUID = 5683648523524179434L;
  11. private HashMap<String, Integer> counters = new HashMap<String, Integer>();
  12. private volatile boolean edit = false;
  13. @Override
  14. @SuppressWarnings(\"rawtypes\")
  15. public void prepare(Map stormConf, TopologyContext context) {
  16. final long timeOffset = Long.parseLong(stormConf.get(\"TIME_OFFSET\").toString());
  17. new Thread(new Runnable() {
  18. @Override
  19. public void run() {
  20. while (true) {
  21. if (edit) {
  22. for (Entry<String, Integer> entry : counters.entrySet()) {
  23. System.out.println(entry.getKey() + \" : \" + entry.getValue());
  24. }
  25. System.out.println(\"WordCounter---------------------------------------\");
  26. edit = false;
  27. }
  28. try {
  29. Thread.sleep(timeOffset * 1000);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. }).start();
  36. }
  37. @Override
  38. public void execute(Tuple input, BasicOutputCollector collector) {
  39. String str = input.getString(0);
  40. if (!counters.containsKey(str)) {
  41. counters.put(str, 1);
  42. } else {
  43. Integer c = counters.get(str) + 1;
  44. counters.put(str, c);
  45. }
  46. edit = true;
  47. System.out.println(\"WordCounter+++++++++++++++++++++++++++++++++++++++++++\");
  48. }
  49. @Override
  50. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  51. }
  52. }

注意WordCounter类的prepare方法,里面定义了一个Thread,持续监控容器的变化(word个数增加或者新增word)。

4,定义一个Topology,提交作业。

点击(此处)折叠或打开

  1. public class WordCountTopo {
  2. public static void main(String[] args) {
  3. if (args.length != 2) {
  4. System.err.println(\"Usage: inputPaht timeOffset\");
  5. System.err.println(\"such as : java -jar WordCount.jar D://input/ 2\");
  6. System.exit(2);
  7. }
  8. TopologyBuilder builder = new TopologyBuilder();
  9. builder.setSpout(\"word-reader\", new WordReader());
  10. builder.setBolt(\"word-spilter\", new WordSpliter()).shuffleGrouping(\"word-reader\");
  11. builder.setBolt(\"word-counter\", new WordCounter()).shuffleGrouping(\"word-spilter\");
  12. String inputPaht = args[0];
  13. String timeOffset = args[1];
  14. Config conf = new Config();
  15. conf.put(\"INPUT_PATH\", inputPaht);
  16. conf.put(\"TIME_OFFSET\", timeOffset);
  17. conf.setDebug(false);
  18. LocalCluster cluster = new LocalCluster();
  19. cluster.submitTopology(\"WordCount\", conf, builder.createTopology());
  20. }
  21. }

5,代码完成后,导出jar(导出时不要指定Main class),然后上传至storm集群,通过命令./storm jar com.x.x.WordCountTopo /data/tianzhen/input 2来提交作业。
Topo启动,spout、bolt执行过程:

Thread监控的统计结果:

源文件处理之后被重命名为*.bak。

和Hadoop不同,在任务执行完之后,Topo不会停止,spout会一直监控数据源,不停地往bolt发射数据。
所以现在如果源数据发生变化,应该能够立马体现出来。我往path下再放一个文本文件,结果:

可见,结果立刻更新了,storm的实时性就体现在这里

时间: 2024-10-13 08:41:37

storm wordcount实例的相关文章

Hadoop3 在eclipse中访问hadoop并运行WordCount实例

前言:       毕业两年了,之前的工作一直没有接触过大数据的东西,对hadoop等比较陌生,所以最近开始学习了.对于我这样第一次学的人,过程还是充满了很多疑惑和不解的,不过我采取的策略是还是先让环境跑起来,然后在能用的基础上在多想想为什么.       通过这三个礼拜(基本上就是周六周日,其他时间都在加班啊T T)的探索,我目前主要完成的是: 1.在Linux环境中伪分布式部署hadoop(SSH免登陆),运行WordCount实例成功. http://www.cnblogs.com/Pur

wordcount实例

scala的wordcount实例 package com.wondersgroup.myscala import scala.actors.{Actor, Future} import scala.collection.mutable.ListBuffer import scala.io.Source //首先统计每个文本中出现的频率=>汇总 case class SubmitTask(f:String) case object StopTask //统计一个文本中单词出现的次数 class

Storm wordcount Read from file

source code: package stormdemo; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.util.HashMap; import jav

Storm WordCount Topology详解

1 package org.apache.storm.storm_core; 2 3 import java.util.Map; 4 5 import backtype.storm.task.OutputCollector; 6 import backtype.storm.task.TopologyContext; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.topology.bas

Storm可靠性实例解析——ack机制

对于Storm,它有一个很重要的特性:“Guarantee no data loss” ——可靠性 很显然,要做到这个特性,必须要track每个data的去向和结果.Storm是如何做到的呢——acker机制. 先概括下acker所参与的工作流程: Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪: Bolt在处理Tuple成功或失败后,也会发一个消息通知acker: acker会找到发射该Tuple的Spout,回调其ack或fail方法. 我们说RichBolt和Basi

Storm WordCount

特别注意,在本地运行的时候应该去掉<scope>provided</scope>,否则会报java.lang.ClassNotFoundException: org.apache.storm.topology.IRichSpout 集群环境中运行的时候应该加上 在这个例子中,有一个spout,两个bolt,也就是说这个任务分为两步.spout随机发送一句话到stream,而SplitBolt负责将其分隔成一个一个单词,CountBolt负责计数.运行的时候,spout的并行数是3,

Spark编程环境搭建及WordCount实例

基于Intellij IDEA搭建Spark开发环境搭建 基于Intellij IDEA搭建Spark开发环境搭——参考文档 ● 参考文档http://spark.apache.org/docs/latest/programming-guide.html ● 操作步骤 ·a)创建maven 项目 ·b)引入依赖(Spark 依赖.打包插件等等) 基于Intellij IDEA搭建Spark开发环境—maven vs sbt ● 哪个熟悉用哪个 ● Maven也可以构建scala项目 基于Inte

【Flink】Flink基础之WordCount实例(Java与Scala版本)

简述 WordCount(单词计数)作为大数据体系的标准示例,一直是入门的经典案例,下面用java和scala实现Flink的WordCount代码: 采用IDEA + Maven + Flink 环境:文末附 pom 文件和相关技术点总结: Java批处理版本 import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apa

Ubuntu14.04安装配置Hadoop2.6.0(完全分布式)与 wordcount实例运行

我的环境是:Ubuntu14.04+Hadoop2.6.0+JDK1.8.0_25 官网2.6.0的安装教程:http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-common/SingleCluster.html 为了方面配置,我在每台机器上都使用了hadoop用户来操作,这样做的确够方便. 结点信息:(分布式集群架构:master为主节点,其余为从节点) 机器名 IP 作用 master 122.205.135.254