Storm学习笔记(1)Hello WordCount - 单机模式

古人云,纸上得来终觉浅,绝知此事要躬行。翻译过来,就是学东西哪有不踩坑的。

因为工作原因要折腾Storm,环境和第一个例子折腾了好久,搞完了回头看,吐血的简单。

Storm有两种模式,单机和集群。入门当然选单机。

1、安装JDK,配置Eclipse环境

2、建立一个Maven工程,在pom.xml加上这段

<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
     <version>1.1.2</version>
     <scope>compile</scope>

</dependency>

3、通过Maven建立项目和下载依赖包。

其实,所需要的storm-core-1.1.2.jar可以从官网下载的storm包里面的lib目录中找到。

Java在下不熟悉,也就不多说了。

4、参考官方或者各种教程的word-count例子编个代码。

5、在Eclipse里面run起来就可以了。

什么Storm, Zookeeper,其实在这个单机入门例子里面,都是不需要的!

就这么简单。

具体代码来说,官方提供的storm-starter例子中,WordCountTopology.java挺适合入门的。

只是里面有个坑:

官方采用了python作为句子分割blot的实现,但是如果环境不具备的话,一跑就会出错。

就是这段:

public static class SplitSentence extends ShellBolt implements IRichBolt {

public SplitSentence() {
     super("python", "splitsentence.py");
   }

// 其余部分略

可以用这个类来替代:

 1 public static class SplitSentence extends BaseBasicBolt{
 2      @Override
 3      public void execute(Tuple tuple, BasicOutputCollector collector){
 4          // 接收到一个句子
 5          String sentence = tuple.getString(0);
 6          // 把句子切割为单词
 7          StringTokenizer iter = new StringTokenizer(sentence);
 8          // 发送每一个单词
 9          while(iter.hasMoreElements()){
10              collector.emit(new Values(iter.nextToken()));
11          }
12      }
13
14      @Override
15      public void declareOutputFields(OutputFieldsDeclarer declarer){
16          // 定义一个字段
17          declarer.declare(new Fields("word"));
18      }
19
20      @Override
21      public Map<String, Object> getComponentConfiguration() {
22        return null;
23      }
24 } 

Run起来以后,在Eclipse的Console窗口里面可以看到运行的详情。

完整代码如下:

  1 package storm.blueprints;
  2
  3 import org.apache.storm.spout.SpoutOutputCollector;
  4 import org.apache.storm.task.TopologyContext;
  5 import org.apache.storm.topology.OutputFieldsDeclarer;
  6 import org.apache.storm.topology.base.BaseRichSpout;
  7 import org.apache.storm.tuple.Fields;
  8 import org.apache.storm.tuple.Values;
  9
 10 import org.apache.storm.utils.Utils;
 11 import org.slf4j.Logger;
 12 import org.slf4j.LoggerFactory;
 13
 14 import org.apache.storm.Config;
 15 import org.apache.storm.LocalCluster;
 16 import org.apache.storm.StormSubmitter;
 17 import org.apache.storm.task.ShellBolt;
 18
 19 import org.apache.storm.topology.BasicOutputCollector;
 20 import org.apache.storm.topology.IRichBolt;
 21 import org.apache.storm.topology.TopologyBuilder;
 22 import org.apache.storm.topology.base.BaseBasicBolt;
 23
 24 import org.apache.storm.tuple.Tuple;
 25 import java.util.HashMap;
 26 import java.util.Map;
 27
 28
 29 import java.util.*;
 30
 31 public class HelloWordCount
 32 {
 33      public static class RandomSentenceSpout extends BaseRichSpout {
 34            private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);
 35
 36           SpoutOutputCollector _collector;
 37            Random _rand;
 38
 39
 40            @Override
 41            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 42              _collector = collector;
 43              _rand = new Random();
 44            }
 45
 46           @Override
 47            public void nextTuple() {
 48              Utils.waitForMillis(100);//(100);
 49              String[] sentences = new String[]{
 50                      sentence("the cow jumped over the moon"),
 51                      sentence("an apple a day keeps the doctor away"),
 52                      sentence("four score and seven years ago"),
 53                      sentence("snow white and the seven dwarfs"),
 54                      sentence("i am at two with nature")};
 55              final String sentence = sentences[_rand.nextInt(sentences.length)];
 56
 57             LOG.debug("Emitting tuple: {}", sentence);
 58
 59             _collector.emit(new Values(sentence));
 60
 61              System.out.println("***" + sentence);
 62            }
 63
 64           protected String sentence(String input) {
 65              return input;
 66            }
 67
 68           @Override
 69            public void ack(Object id) {
 70            }
 71
 72           @Override
 73            public void fail(Object id) {
 74            }
 75
 76           @Override
 77            public void declareOutputFields(OutputFieldsDeclarer declarer) {
 78              declarer.declare(new Fields("sentence"));
 79            }
 80      }
 81
 82
 83      // 定义个Bolt,用于将句子切分为单词
 84      public static class SplitSentence extends BaseBasicBolt{
 85          @Override
 86          public void execute(Tuple tuple, BasicOutputCollector collector){
 87              // 接收到一个句子
 88              String sentence = tuple.getString(0);
 89              // 把句子切割为单词
 90              StringTokenizer iter = new StringTokenizer(sentence);
 91              // 发送每一个单词
 92              while(iter.hasMoreElements()){
 93                  collector.emit(new Values(iter.nextToken()));
 94              }
 95          }
 96
 97          @Override
 98          public void declareOutputFields(OutputFieldsDeclarer declarer){
 99              // 定义一个字段
100              declarer.declare(new Fields("word"));
101          }
102
103          @Override
104          public Map<String, Object> getComponentConfiguration() {
105            return null;
106          }
107      }
108
109      // 定义一个Bolt,用于单词计数
110      public static class WordCount extends BaseBasicBolt {
111          Map<String, Integer> counts = new HashMap<String, Integer>();
112
113          @Override
114          public void execute(Tuple tuple, BasicOutputCollector collector){
115              String word = tuple.getString(0);
116              Integer count = counts.get(word);
117              if (count == null)
118                count = 0;
119              count++;
120              counts.put(word, count);
121
122              System.out.println(word +"  "+count);
123          }
124
125          @Override
126          public void declareOutputFields(OutputFieldsDeclarer declarer){
127              // 定义两个字段word和count
128              declarer.declare(new Fields("word","count"));
129          }
130      }
131      public static void main(String[] args) throws Exception
132      {
133          System.out.println("main");
134          // 创建一个拓扑
135          TopologyBuilder builder = new TopologyBuilder();
136          // 设置Spout,这个Spout的名字叫做"Spout",设置并行度为5
137          builder.setSpout("Spout", new RandomSentenceSpout(), 5);
138          // 设置slot——“split”,并行度为8,它的数据来源是spout的
139          builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("Spout");
140          // 设置slot——“count”,你并行度为12,它的数据来源是split的word字段
141          builder.setBolt("count", new WordCount(), 12).globalGrouping("split");//, new Fields("word"));
142
143          Config conf = new Config();
144
145              // 本地集群
146              LocalCluster cluster = new LocalCluster();
147
148              System.out.println("LocalCluster");
149
150              // 提交拓扑(该拓扑的名字叫word-count)
151              cluster.submitTopology("word-count", conf, builder.createTopology() );
152
153              System.out.println("submitTopology");
154
155              Utils.waitForSeconds(10);
156              cluster.killTopology("word-count");
157              cluster.shutdown();
158              }
159      }
160
161      public static class Utils {
162
163         public static void waitForSeconds(int seconds) {
164              try {
165                  Thread.sleep(seconds * 1000);
166              } catch (InterruptedException e) {
167              }
168          }
169
170         public static void waitForMillis(long milliseconds) {
171              try {
172                  Thread.sleep(milliseconds);
173              } catch (InterruptedException e) {
174              }
175          }
176      }
177 }

请使用手机"扫一扫"x

原文地址:https://www.cnblogs.com/maksheiev/p/8457775.html

时间: 2024-11-05 18:39:06

Storm学习笔记(1)Hello WordCount - 单机模式的相关文章

storm学习笔记

Storm学习笔记 一.简介 本文使用的Storm版本为1.0.1 Storm是一个免费开源的分布式实时计算系统,它使得可靠地处理无限的数据流更加容易,可以实时的处理Hadoop的批量任务.Storm简单易用,且支持各种主流的程序语言. Storm有很多适用场景:实时分析.在线机器学习.连续计算.分布式RPC.分布式ETL.易扩展.支持容错,可确保你的数据得到处理,易于构建和操控. 下图是Storm"流式数据处理"的概念图,即数据像水流一样从数据源头源源不断的流出,经过每个节点,每个节

storm学习笔记完整记录(一)

storm有两种运行模式(本地模式和集群模式) 1. 首先创建一个类似于HelloWorld的简单程序,以便进入storm的大门,包结构如下: 2.从包结构可以知道,这是一个Maven Project,pom.xml的内容如下: <project xmlns="http://maven.apache.org/POM/4.0.0"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    

java/android 设计模式学习笔记(12)---组合模式

这篇我们来介绍一下组合模式(Composite Pattern),它也称为部分整体模式(Part-Whole Pattern),结构型模式之一.组合模式比较简单,它将一组相似的对象看作一个对象处理,并根据一个树状结构来组合对象,然后提供一个统一的方法去访问相应的对象,以此忽略掉对象与对象集合之间的差别.这个最典型的例子就是数据结构中的树了,如果一个节点有子节点,那么它就是枝干节点,如果没有子节点,那么它就是叶子节点,那么怎么把枝干节点和叶子节点统一当作一种对象处理呢?这就需要用到组合模式了. 转

java/android 设计模式学习笔记(9)---代理模式

这篇博客我们来介绍一下代理模式(Proxy Pattern),代理模式也成为委托模式,是一个非常重要的设计模式,不少设计模式也都会有代理模式的影子.代理在我们日常生活中也很常见,比如上网时连接的代理服务器地址,更比如我们平时租房子,将找房子的过程代理给中介等等,都是代理模式在日常生活中的使用例子. 代理模式中的代理对象能够连接任何事物:一个网络连接,一个占用很多内存的大对象,一个文件,或者是一些复制起来代价很高甚至根本不可能复制的一些资源.总之,代理是一个由客户端调用去访问幕后真正服务的包装对象

cocos2dx游戏开发——别踩白块学习笔记(二)——经典模式的实现

一.创建GameScene以及GameLayer 就是简单创建一个Scene而已,在此就不多说啦~,可以参照我的打飞机的学习笔记(2). 二.添加一个开始栏 很简单,就是调用Block中的create方法就可以啦~,只是需要传入大小和颜色等等的参数即可. void GameLayer::addStartLine() { auto block = Block::createWithArgs(Color3B::YELLOW, Size(visibleSize.width,visibleSize.he

Twitter Storm学习笔记

官方英文文档:http://storm.apache.org/documentation/Documentation.html 本文是学习笔记,转载整合加翻译,主要是为了便于学习. 一.基本概念 参考:http://storm.apache.org/documentation/Concepts.html 此段转自:http://xumingming.sinaapp.com/117/twitter-storm%E7%9A%84%E4%B8%80%E4%BA%9B%E5%85%B3%E9%94%AE

java/android 设计模式学习笔记(14)---外观模式

这篇博客来介绍外观模式(Facade Pattern),外观模式也称为门面模式,它在开发过程中运用频率非常高,尤其是第三方 SDK 基本很大概率都会使用外观模式.通过一个外观类使得整个子系统只有一个统一的高层的接口,这样能够降低用户的使用成本,也对用户屏蔽了很多实现细节.当然,在我们的开发过程中,外观模式也是我们封装 API 的常用手段,例如网络模块.ImageLoader 模块等.其实我们在开发过程中可能已经使用过很多次外观模式,只是没有从理论层面去了解它. 转载请注明出处:http://bl

java/android 设计模式学习笔记(10)---建造者模式

这篇博客我们来介绍一下建造者模式(Builder Pattern),建造者模式又被称为生成器模式,是创造性模式之一,与工厂方法模式和抽象工厂模式不同,后两者的目的是为了实现多态性,而 Builder 模式的目的则是为了将对象的构建与展示分离.Builder 模式是一步一步创建一个复杂对象的创建型模式,它允许用户在不知道内部构建细节的情况下,可以更精细地控制对象的构造流程.一个复杂的对象有大量的组成部分,比如汽车它有车轮.方向盘.发动机.以及各种各样的小零件,要将这些部件装配成一辆汽车,这个装配过

《大话设计模式》学习笔记系列--1. 简单工厂模式

简单工厂模式实现了一种"工厂"概念的面向对象设计模式,它可以在不指定对象具体类型的情况下创建对象.其实质是定义一个创建对象的接口,但让实现这个接口的类来决定实例化具体类.工厂方法让类的实例化推迟到子类中进行. 以书本上的计算器程序为例,其UML描述如下: 图中,AddOperator, SubtactOpertor继承算式基类Operator,而CreateOperator则是负责创建一个操作类,而不指明具体的子类类型. 下面,我们来看代码: 首先操作积累: /// <summa