The Road to Storm: A WordCount Example

I'm new to Storm, so corrections are welcome where I've got things wrong.

I looked at many WordCount examples online, but none were quite what I wanted.

Scenario: count the word frequencies of shengjing.txt into a map, and print the full result in one go.

● Message source: Spout
  Extend the BaseRichSpout class / implement the IRichSpout interface
    open — initialization;
    nextTuple — where data enters the topology and tuples are emitted;
    ack — called after a tuple is processed successfully;
    fail — called after a tuple fails processing;
    declareOutputFields — declares the output fields;

● Processing unit: Bolt
  Extend the BaseBasicBolt class / BaseWindowedBolt / implement the IRichBolt interface
    prepare — initialization when the worker starts;
    execute — receives a tuple / tupleWindow, runs the processing logic, and emits results;
    cleanup — called before shutdown;
    declareOutputFields — declares the output fields;

● Project structure

● pom.xml, declaring the project's jar dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.scps.storm</groupId>
    <artifactId>storm-example</artifactId>
    <version>0.0.1</version>
    <name>storm.example</name>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>

● WordTopology.java, the entry class: builds the topology, wires up the spout and bolts, and sets the configuration

package com.scps.storm.helloword;

import java.util.concurrent.TimeUnit;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
import org.apache.storm.tuple.Fields;

import com.scps.storm.helloword.bolt.SlidingWindowBolt;
import com.scps.storm.helloword.bolt.WordCountBolt;
import com.scps.storm.helloword.bolt.WordFinalBolt;
import com.scps.storm.helloword.bolt.WordSplitBolt;
import com.scps.storm.helloword.spout.WordReaderSpout;

public class WordTopology {

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        // 1 task to read the file
        builder.setSpout("word-reader", new WordReaderSpout(), 1);

        // 2 tasks to split lines into words
        builder.setBolt("word-split", new WordSplitBolt(), 2).shuffleGrouping("word-reader");

        // 2 tasks to count in parallel; fieldsGrouping routes the same word to the same task
        builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word"));

        // 1 task to aggregate: every 3 seconds, process the tuples of the last 5 seconds (sliding window)
        // builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");
        // 1 task to aggregate: process tuples in non-overlapping 5-second batches (tumbling window);
        // much longer windows produced timeout errors, since window tuples must be acked within the topology's message timeout
        builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withTumblingWindow(new Duration(5, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");

        // 1 task to print the result
        builder.setBolt("word-final", new WordFinalBolt(), 1).shuffleGrouping("sliding-window-bolt");

        Config conf = new Config();
        conf.setDebug(false);

        if (args != null && args.length > 0) {

            // Run on a cluster; build the jar first with mvn package:
            // bin/storm jar "/root/storm-example-0.0.1.jar" com.scps.storm.helloword.WordTopology "http://nimbus:8080/uploads/shengjing.txt" wordcount

            try {
                String file = args[0];
                String name = args[1];

                conf.put("file", file);
                // conf.setNumWorkers(2);

                StormSubmitter.submitTopology(name, conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }

        } else {

            // Run directly in Eclipse (local mode)
            conf.put("file", "C:\\Users\\Administrator\\Downloads\\shengjing1.txt");
            // conf.put("file", "http://192.168.100.170:8080/uploads/shengjing.txt");
            // conf.setMaxTaskParallelism(2); // cap the number of tasks
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordcount", conf, builder.createTopology());
        }
    }
}

● WordReaderSpout.java: reads the txt file and emits one tuple per line

package com.scps.storm.helloword.spout;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class WordReaderSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector outputCollector;
    private String filePath;
    private boolean completed = false;

    public void ack(Object arg0) {
    }

    public void activate() {
    }

    public void close() {
    }

    public void deactivate() {
    }

    public void fail(Object arg0) {
    }

    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        filePath = conf.get("file").toString();
        outputCollector = collector;
    }

    public void nextTuple() {

        if (!completed) {

            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            System.out.println("WordReaderSpout nextTuple, " + time);

            String line = "";
            InputStream inputStream = null;
            InputStreamReader inputStreamReader = null;
            BufferedReader reader = null;

            try {

                // filePath = "http://192.168.100.170:8080/uploads/shengjing.txt";
                // filePath = "C:\\Users\\Administrator\\Downloads\\shengjing.txt";

                if (filePath.startsWith("http://")) { // remote file
                    URL url = new URL(filePath);
                    URLConnection urlConn = url.openConnection();
                    inputStream = urlConn.getInputStream();
                } else { // local file
                    inputStream = new FileInputStream(filePath);
                }

                inputStreamReader = new InputStreamReader(inputStream, "utf-8");
                reader = new BufferedReader(inputStreamReader);
                while ((line = reader.readLine()) != null) {
                    outputCollector.emit(new Values(line));
                }

            } catch (MalformedURLException e) {
                e.printStackTrace();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                completed = true;
                try {
                    if (reader != null) {
                        reader.close();
                    }
                    if (inputStreamReader != null) {
                        inputStreamReader.close();
                    }
                    if (inputStream != null) {
                        inputStream.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        Utils.sleep(20000);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
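The finally block above closes the three streams by hand; with Java 7+ try-with-resources the same cleanup happens automatically, even when an exception is thrown. A minimal sketch of just the line-reading step, outside Storm (the class name ReadSketch is made up for illustration):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class ReadSketch {

    // Read every line of a UTF-8 text file; the reader declared in the
    // try(...) header is closed automatically when the block exits.
    public static List<String> readLines(Path file) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(file)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("demo", ".txt");
        Files.writeString(tmp, "line one\nline two\n");
        System.out.println(readLines(tmp));
    }
}
```

In the spout itself each line would be emitted via `outputCollector.emit(new Values(line))` instead of collected into a list.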

When testing on a cluster, first upload the txt file through the Nimbus UI, so that whichever supervisor is assigned the task can read the file remotely.

● WordSplitBolt.java: receives lines, splits them, and emits words

package com.scps.storm.helloword.bolt;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordSplitBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector outputCollector;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
    }

    public void execute(Tuple input) {

        String line = input.getStringByField("line");

        line = line.trim();
        line = line.replace(",", " ");
        line = line.replace(".", " ");
        line = line.replace(":", " ");
        line = line.replace(";", " ");
        line = line.replace("?", " ");
        line = line.replace("!", " ");
        line = line.replace("(", " ");
        line = line.replace(")", " ");
        line = line.replace("[", " ");
        line = line.replace("]", " ");
        line = line.trim();

        String[] words = line.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!"".equals(word)) {
                outputCollector.emit(new Values(word));
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void cleanup() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
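The chain of replace calls above can also be collapsed into a single regex split. A sketch of just the splitting step, outside Storm (SplitSketch is a made-up name, and the character class assumes the same punctuation set handled above):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {

    // Split a line on whitespace and on the punctuation the bolt strips
    // ( , . : ; ? ! ( ) [ ] ); runs of delimiters collapse via the "+".
    public static List<String> split(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split("[\\s,.:;?!()\\[\\]]+")) {
            if (!word.isEmpty()) { // drop the empty token a leading delimiter produces
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(split("In the beginning, God created..."));
    }
}
```

Inside the bolt, each element of the returned list would be emitted as `new Values(word)` exactly as in execute above.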

● WordCountBolt.java: receives words, maintains running counts, and emits (word, count) pairs

package com.scps.storm.helloword.bolt;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;
    Map<String, Integer> counter;
    private OutputCollector outputCollector;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        counter = new HashMap<String, Integer>();
        outputCollector = collector;
    }

    public void execute(Tuple input) {

        String word = input.getStringByField("word");
        int count;

        if (!counter.containsKey(word)) {
            count = 1;
        } else {
            count = counter.get(word) + 1;
        }

        counter.put(word, count);
        outputCollector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    public void cleanup() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
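The if/else counting in execute can be condensed with Map.merge (Java 8+), which inserts 1 for an absent key and otherwise adds 1 to the existing value. A sketch of just the counting step (CountSketch is a made-up name):

```java
import java.util.HashMap;
import java.util.Map;

public class CountSketch {

    private final Map<String, Integer> counter = new HashMap<>();

    // Increment the running count for a word and return the new total,
    // mirroring what WordCountBolt.execute computes before emitting.
    public int count(String word) {
        return counter.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        CountSketch c = new CountSketch();
        c.count("storm");
        System.out.println("storm -> " + c.count("storm"));
    }
}
```

In the bolt, the returned total is what gets emitted alongside the word as `new Values(word, count)`.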

● SlidingWindowBolt.java: receives (word, count) pairs, merges them into one map, and emits the map

package com.scps.storm.helloword.bolt;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

public class SlidingWindowBolt extends BaseWindowedBolt {

    private static final long serialVersionUID = 1L;
    Map<String, Integer> counter;
    private OutputCollector outputCollector;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        counter = new HashMap<String, Integer>();
        outputCollector = collector;
    }

    public void execute(TupleWindow inputWindow) {

        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.out.println("SlidingWindowBolt execute, " + time);

        for (Tuple input : inputWindow.get()) {

            String word = input.getStringByField("word");
            int count = input.getIntegerByField("count");

            counter.put(word, count);
        }

        outputCollector.emit(new Values(counter));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("counter"));
    }
}

● WordFinalBolt.java: receives the map and prints it

package com.scps.storm.helloword.bolt;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class WordFinalBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    }

    @SuppressWarnings("unchecked")
    public void execute(Tuple input) {

        Map<String, Integer> counter = (Map<String, Integer>) input.getValueByField("counter");
        List<String> keys = new ArrayList<String>();
        keys.addAll(counter.keySet());
        Collections.sort(keys);
        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.out.println("-----------------begin------------------, " + time);
        for (String key : keys) {
            System.out.println(key + " : " + counter.get(key));
        }
        System.out.println("-----------------end--------------------, " + time);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
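Copying the keySet into a list and sorting it works fine; a TreeMap keeps its keys sorted on its own, which folds the copy-and-sort step into the map. A sketch of the same printing step outside Storm (PrintSketch is a made-up name):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PrintSketch {

    // A TreeMap iterates its keys in natural (alphabetical) order,
    // replacing the explicit Collections.sort in WordFinalBolt.execute.
    public static List<String> sortedKeys(Map<String, Integer> counter) {
        return new ArrayList<>(new TreeMap<>(counter).keySet());
    }

    public static void print(Map<String, Integer> counter) {
        for (Map.Entry<String, Integer> e : new TreeMap<>(counter).entrySet()) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
    }

    public static void main(String[] args) {
        print(Map.of("the", 3, "In", 2, "beginning", 1));
    }
}
```

Note that String's natural order puts uppercase letters first, so "In" prints before "beginning"; this matches what Collections.sort produces in the bolt.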

● Full project source: https://pan.baidu.com/s/1mhZtvq4 (password: ypbc)

Date: 2024-10-12 22:38:28
