第一个非常简单的storm topology的提交运行

配置好storm之后就可以开始在eclipse里面写topology了。

下面是我在网上搜到的一个简单的例子,我按照自己的理解注释了一下。

第一步,创建mvn工程

这是pom.xml文件

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3   <modelVersion>4.0.0</modelVersion>
 4
 5   <groupId>cn.aeths.storm</groupId>
 6   <artifactId>storm001</artifactId>
 7   <version>0.0.1-SNAPSHOT</version>
 8   <packaging>jar</packaging>
 9
10   <name>storm001</name>
11   <url>http://maven.apache.org</url>
12
13   <properties>
14     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15   </properties>
16
17   <dependencies>
18      <dependency>
19       <groupId>org.apache.storm</groupId>
20       <artifactId>storm-core</artifactId>
21       <version>0.9.2-incubating</version>
22       <scope>provided</scope>
23     </dependency>
24     <dependency>
25       <groupId>junit</groupId>
26       <artifactId>junit</artifactId>
27       <version>3.8.1</version>
28       <scope>test</scope>
29     </dependency>
30   </dependencies>
31
32   <build>
33     <plugins>
34       <plugin>
35         <artifactId>maven-assembly-plugin</artifactId>
36         <version>2.4</version>
37         <configuration>
38           <descriptorRefs>
39             <descriptorRef>jar-with-dependencies</descriptorRef>
40           </descriptorRefs>
41         </configuration>
42         <executions>
43           <execution>
44             <id>make-assembly</id>
45             <phase>package</phase>
46             <goals>
47               <goal>single</goal>
48             </goals>
49           </execution>
50         </executions>
51       </plugin>
52     </plugins>
53   </build>
54 </project>

使用的是storm-core-0.9.2-incubating.jar

我没找到这个jar包的源码导致没看到LocalCliuster的源码,也可能是添加的方式有问题

第二步,创建spout输入tuple

这是RandomSpout.java类的代码

 1 package cn.aeths.storm.storm001.spout;
 2
 3 import java.util.Map;
 4 import java.util.Random;
 5
 6 import backtype.storm.spout.SpoutOutputCollector;
 7 import backtype.storm.task.TopologyContext;
 8 import backtype.storm.topology.OutputFieldsDeclarer;
 9 import backtype.storm.topology.base.BaseRichSpout;
10 import backtype.storm.tuple.Fields;
11 import backtype.storm.tuple.Values;
12
13 //简单的输入spout
14 @SuppressWarnings("serial")
15 public class RandomSpout extends BaseRichSpout {
16     //继承了一个方法少的抽象类
17     //私有属性,一个收集器,收集来自spout的tuple,这个和OutputCollector的区别是可以打id保证都执行一次
18     private SpoutOutputCollector collector;
19     //这是个字符串数组,虚拟的输入源
20     private static String[] words = { "happy", "excited", "angry" };
21
22     //可以当做是worker的初始化的方法,传递的参数分别是配置信息,获取id的参数,处理流的参数。
23     public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
24         this.collector = arg2;
25     }
26
27     //调用这个方法就是在要求spout传递流交给收集器
28     public void nextTuple() {
29         //随机的给word赋值
30         String word = words[new Random().nextInt(words.length)];
31         //将流发射到输出,不带id的发射,所以不跟踪,没有ack和fail的调用
32         collector.emit(new Values(word));
33     }
34
35     public void declareOutputFields(OutputFieldsDeclarer arg0) {
36         //声明分发的方式
37         arg0.declare(new Fields("randomstring"));
38     }
39
40 }

第三步,创建bolt处理tuple

这是SequenceBolt.java类的代码

 1 package cn.aeths.storm.storm001.bolt;
 2
 3 import backtype.storm.topology.BasicOutputCollector;
 4 import backtype.storm.topology.OutputFieldsDeclarer;
 5 import backtype.storm.topology.base.BaseBasicBolt;
 6 import backtype.storm.tuple.Tuple;
 7 //这个是简单的bolt类
 8 @SuppressWarnings("serial")
 9 public class SenqueceBolt extends BaseBasicBolt{
10     //继承了一个BaseBasicBolt
11     //这是简单的方式,因为它也是别的类实现的BaseBasicBolt extends BaseComponent implements IBasicBolt
12     //public interface IBasicBolt extends IComponent
13     //总之继承基础的抽象类,实现了其中的两个方法
14
15     public void execute(Tuple input, BasicOutputCollector collector) {
16         //处理输入tuple或者在输入的tuple的基础上emit tuple
17         //在本例中就是获取输入,将它包装一下,打印在命令行
18          String word = (String) input.getValue(0);
19          String out = "I‘m " + word +  "!";
20          System.out.println("out=" + out);
21     }
22
23     public void declareOutputFields(OutputFieldsDeclarer declarer) {
24         //声明拓扑的流的输出约束,这里没用到
25     }
26
27 }

第四步,创建topology

这是FirstTopo.java类的代码

 1 package cn.aeths.storm.storm001.topology;
 2
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.topology.TopologyBuilder;
 7 import backtype.storm.utils.Utils;
 8 import cn.aeths.storm.storm001.bolt.SenqueceBolt;
 9 import cn.aeths.storm.storm001.spout.RandomSpout;
10 //拓扑创建的额类
11 public class FirstTopo {
12
13     public static void main(String[] args) throws Exception {
14         //声明拓扑创建的api
15         TopologyBuilder builder = new TopologyBuilder();
16         //设置输入spout,第一个参数是id可以用这个被bolt调用,第二个参数是实例化的spout
17         builder.setSpout("spout", new RandomSpout());
18         //设置一个新的bolt处理流,第一个参数是id可以被其他的消费它的bolt使用,第二个参数是实例化的bolt
19         builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
20         Config conf = new Config();      //方便的为拓扑创建配置,是继承map的,使用setter方法
21         conf.setDebug(false);//关闭调试模式
22         if (args != null && args.length > 0) {
23             //有参数的时候,集群模式
24             conf.setNumWorkers(3);
25             //把本类,本配置,本拓扑提交
26             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
27         } else {
28             //本地模式
29             LocalCluster cluster = new LocalCluster();
30             //提交拓扑,名字自定义
31             cluster.submitTopology("firstTopo", conf, builder.createTopology());
32             //休眠10s
33             Utils.sleep(10000);
34             //杀死拓扑
35             cluster.killTopology("firstTopo");
36             cluster.shutdown();
37         }
38     }
39 }

第五步,提交storm运行

命令行的结果如下:

 1 out=I‘m happy!
 2 out=I‘m excited!
 3 out=I‘m happy!
 4 out=I‘m happy!
 5 out=I‘m excited!
 6 out=I‘m angry!
 7 out=I‘m angry!
 8 out=I‘m happy!
 9 52008 [Thread-8-bolt] INFO  backtype.storm.util - Async loop interrupted!
10 52009 [main] INFO  backtype.storm.daemon.executor - Shut down executor bolt:[2 2
11 ]
12 52009 [main] INFO  backtype.storm.daemon.executor - Shutting down executor spout
13 :[3 3]
14 52010 [Thread-10-spout] INFO  backtype.storm.util - Async loop interrupted!

http://blog.csdn.net/annkie/article/details/6960934
这个文献教会我怎么给eclipse添加jre的源码

这是第一个本地虚拟的集群运行的topology,下面学习一下相对复杂的一个例子。

时间: 2024-10-07 16:01:01

第一个非常简单的storm topology的提交运行的相关文章

Storm Topology Parallelism

Understanding the Parallelism of a Storm Topology What makes a running topology: worker processes, executors and tasks 在一个Strom集群中,实际运行一个topology有三个主要的实体 Worker processes Executors (threads) Tasks 下面是一张草图简单说明他们之间的关系: A worker process executes a subse

Storm笔记整理(三):Storm集群安装部署与Topology作业提交

[TOC] Storm分布式集群安装部署 概述 Storm集群表面类似Hadoop集群.但在Hadoop上你运行的是"MapReduce jobs",在Storm上你运行的是"topologies"."Jobs"和"topologies"是大不同的,一个关键不同是一个MapReduce的Job最终会结束,而一个topology永远处理消息(或直到你kill它). Storm集群有两种节点:控制(master)节点和工作者(wo

javascript之【贪吃蛇系列】第一弹:简单的贪吃蛇实现

参考博客:http://blog.csdn.net/sunxing007/article/details/4187038 以上博客是参考,毕竟第一次做,真让自己盲人摸象做不出来. 不过我在其上做了一些改进,界面等效果看起来更好一些. 下图是在Chrome上运行的效果,但是火狐和IE会不兼容,onkeydown事件不能正确调用 这里用了一张图把贪吃蛇制作过程的思想画了出来,画的有点简陋: 下面就是把代码发上来,上边有详细的解释: <html> <head> <title>

asp.net mvc 之旅—— 第一站 从简单的razor入手

原文:asp.net mvc 之旅-- 第一站 从简单的razor入手 记得2011年mvc3刚出来的时候,我们就有幸将 mvc3 用在我们团购项目上,当时老大让我们用一个星期时间来熟悉mvc,幸好园子里面的老朋友DR 正在写mvc3系列,也恭喜这个系列文章被整理成专题供后来者学习,详见:http://www.cnblogs.com/highend/archive/2011/08/04/aspnet_mvc3_ contents.html,2013年进携程的时候,也开心的看到公司正在将webfo

Storm系列之一——Storm Topology并发

1.是什么构成一个可运行的topology? worker processes(worker进程),executors(线程)和tasks. 一台Storm集群里面的机器可能运行一个或多个worker进程,一个worker进程运行一个特定topology的executors. 一个worker进程可能运行一个或多个executors.每个executor是一个线程.一个executor运行同一个spout或者bolt的一个或多个task. 一个task完成具体的数据处理. 一个worker进程执

Caused by: java.lang.ClassNotFoundException: backtype.storm.topology.IRichSpout

1:初次运行Strom程序出现如下所示的错误,贴一下,方便脑补,也希望帮助到看到的小伙伴: 错误如下所示,主要问题是刚开始使用maven获取jar包的时候需要写<scope>provided</scope>,运行的时候需要把这行注释了即可,这是作用域的问题,开始需要在本地下载jar包,但是在虚拟机运行的时候已经存在这些jar包了,所以再写这句话就冲突了: 1 java.lang.NoClassDefFoundError: backtype/storm/topology/IRichS

storm单词计数 本地运行

import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.commons.io.FileUtils; import backtype.storm.Config; imp

第一章:C语言概述及如何上机运行C程序

第一章:C语言概述及如何上机运行C程序 让编程改变世界 Change the world by program C语言概述 C语言的发展过程 C语言是在 70 年代初问世的.一九七八年由美国电话电报公司(AT&T)贝尔实验室正式发表了C语言.同时由B.W.Kernighan和D.M.Ritchit合著了著名的“THE C PROGRAMMING LANGUAGE”一书.通常简称为<K&R>,也有人称之为<K&R>标准.但是,在<K&R>中

c# 单链表实现 简单示例(可复制直接运行)

最近学习数据结构,发现c# 其实和c 的链表的实现差不多的 下面是一段可直接运行的代码 1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using System.Threading; 5 6 namespace SingleLinkedList 7 { 8 class Program 9 { 10 static void Main(string[] args) 11 { 12 13 //实例调用