JStorm第一个程序WordCount详解

一、Strom基本知识(回顾)

1,首先明确Storm各个组件的作用,包括Nimbus,Supervisor,Spout,Bolt,Task,Worker,Tuple

  • nimbus是整个storm任务的管理者,并不实际进行工作。负责在集群中分发代码,对节点分配任务,并监视主机故障。
  • supervisor是实际进行工作的节点,负责监听工作节点上已经分配的主机作业,启动和停止Nimbus已经分配的工作进程。
  • Worker是具体处理Spout/Bolt逻辑的进程,worker数量由拓扑中的conf.setNumWorkers来定义,storm会在每个Worker上均匀分配任务,一个Worker只能执行一个topology,但是可以执行其中的多个任务线程。
  • 一个worker是一个进程,里面可以同时运行多个线程,这些线程就是task

参考:http://blog.csdn.net/cuihaolong/article/details/52652686(storm各个节点介绍和容错机制)

2,一个简单的storm程序的基本流程是:spout作为数据源(可以来自hdfs,hbase等,也可以自发产生数据,比如wordcount这个例子)传送给bolt,bolt对数据进行处理,传给其它bolt或者直接输出。他们之间传送的数据是Tuple,可以成为数据元组。

3,Storm运行模式:

  • 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试。
  • 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。

二、WordCount详解

程序描述

  • spout随机发送一个准备好的字符串数组里面的一个字符串(sentence)
  • 第一层bolt,splitBolt负责对spout发过来的数据(sentence)进行split,分解成独立的单词,并按照一定的规则发往下一层bolt处理
  • 第二层bolt,接受第一层bolt传过来的数据,并对各个单词进行数量计算

程序流程

  • spout数据源
  • bolt1进行split操作
  • bolt2进行count操作
  • Topolgy运行程序

0,WordCountTopology类:创建拓扑,运行程序

 1 package act.chenkh.study.jstormPlay;
 2
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.topology.TopologyBuilder;
 7 import backtype.storm.tuple.Fields;
 8
 9 public class WordCountTopology {
10     public static void main(String[] args) throws Exception {
11         /**第一步,设计一个Topolgy*/
12         TopologyBuilder builder = new TopologyBuilder();
13         /*
14          * 设置spout和bolt,完整参数为spout的id(即name),spout对象,并发数(此项没有默认null)
15          */
16         //setSpout
17         builder.setSpout("sentence-spout",new RandomSentenceSpout(),1);
18         //setBolt:SplitBolt的grouping策略是上层随机分发,CountBolt的grouping策略是按照上层字段分发
19         //如果想要从多个Bolt获取数据,可以继续设置grouping
20         builder.setBolt("split-bolt", new SplitBolt(),1)
21             .shuffleGrouping("sentence-spout");
22         builder.setBolt("count-bolt", new CountBolt(),1)
23             .fieldsGrouping("split-bolt", new Fields("word"));
24         /**第二步,进行基本配置*/
25         Config conf = new Config();
26         //作用和影响???????????
27         conf.setDebug(true);
28         if (args != null && args.length > 0) {
29             conf.setNumWorkers(1);
30             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
31             }
32         else {
33             /*
34              * run in local cluster, for test in eclipse.
35              */
36             conf.setMaxTaskParallelism(3);
37             LocalCluster cluster = new LocalCluster();
38             cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
39             Thread.sleep(Integer.MAX_VALUE);
40             cluster.shutdown();
41         }
42     }
43 }

1,RandomSentenceSpout类:产生数据

 1 package act.chenkh.study.jstormPlay;
 2
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.topology.TopologyBuilder;
 7 import backtype.storm.tuple.Fields;
 8
 9 public class WordCountTopology {
10     public static void main(String[] args) throws Exception {
11         /**第一步,设计一个Topolgy*/
12         TopologyBuilder builder = new TopologyBuilder();
13         /*
14          * 设置spout和bolt,完整参数为spout的id(即name),spout对象,并发数(此项没有默认null)
15          */
16         //setSpout
17         builder.setSpout("sentence-spout",new RandomSentenceSpout(),1);
18         //setBolt:SplitBolt的grouping策略是上层随机分发,CountBolt的grouping策略是按照上层字段分发
19         //如果想要从多个Bolt获取数据,可以继续设置grouping
20         builder.setBolt("split-bolt", new SplitBolt(),1)
21             .shuffleGrouping("sentence-spout");
22         builder.setBolt("count-bolt", new CountBolt(),1)
23             .fieldsGrouping("split-bolt", new Fields("word"));
24         /**第二步,进行基本配置*/
25         Config conf = new Config();
26         //作用和影响???????????
27         conf.setDebug(true);
28         if (args != null && args.length > 0) {
29             conf.setNumWorkers(1);
30             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
31             }
32         else {
33             /*
34              * run in local cluster, for test in eclipse.
35              */
36             conf.setMaxTaskParallelism(3);
37             LocalCluster cluster = new LocalCluster();
38             cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
39             Thread.sleep(Integer.MAX_VALUE);
40             cluster.shutdown();
41         }
42     }
43 }

2,SplitBolt类:接收上层tuple,进行split,分发给下一层

 1 package act.chenkh.study.jstormPlay;
 2
 3 import java.util.HashMap;
 4 import java.util.Map;
 5
 6 import org.apache.log4j.Logger;
 7
 8 import com.alibaba.jstorm.callback.AsyncLoopThread;
 9 import com.alibaba.jstorm.callback.RunnableCallback;
10
11 import backtype.storm.task.TopologyContext;
12 import backtype.storm.topology.BasicOutputCollector;
13 import backtype.storm.topology.OutputFieldsDeclarer;
14 import backtype.storm.topology.base.BaseBasicBolt;
15 import backtype.storm.tuple.Fields;
16 import backtype.storm.tuple.Tuple;
17 import clojure.inspector__init;
18
19 public class CountBolt extends BaseBasicBolt {
20     Integer id;
21     String name;
22     Map<String, Integer> counters;
23     String component;
24     private static final Logger LOG = Logger.getLogger(CountBolt.class);
25     private AsyncLoopThread statThread;
26     /**
27      * On create
28      */
29     @Override
30     public void prepare(Map stormConf, TopologyContext context) {
31         this.counters = new HashMap<String, Integer>();
32         this.name = context.getThisComponentId();
33         this.id = context.getThisTaskId();
34         this.statThread = new AsyncLoopThread(new statRunnable());
35
36         LOG.info(stormConf.get("abc")+"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
37         component = context.getThisComponentId();
38     }
39
40     public void declareOutputFields(OutputFieldsDeclarer declarer) {
41          declarer.declare(new Fields("word","count"));
42          // declarer.declareStream("coord-"+"word-counter", new Fields("epoch","ebagNum"));
43          // LOG.info("set stream coord-"+component);
44     }
45
46     //接收消息之后被调用的方法
47     public void execute(Tuple input, BasicOutputCollector collector) {
48 //        String str = input.getString(0);
49         String str = input.getStringByField("word");
50         if(!counters.containsKey(str)){
51             counters.put(str, 1);
52         }else{
53             Integer c = counters.get(str) + 1;
54             counters.put(str, c);
55         }
56     }
57     class statRunnable extends RunnableCallback {
58
59         @Override
60         public void run() {
61             while(true){
62                 try {
63                     Thread.sleep(10000);
64                 } catch (InterruptedException e) {
65
66                 }
67                 LOG.info("\n-- Word Counter ["+name+"-"+id+"] --");
68                 for(Map.Entry<String, Integer> entry : counters.entrySet()){
69                     LOG.info(entry.getKey()+": "+entry.getValue());
70                 }
71                 LOG.info("");
72             }
73
74         }
75     }
76
77 }

3,CountBolt类:接收上层tuple,进行count,展示输出

 1 package act.chenkh.study.jstormPlay;
 2
 3 import java.util.HashMap;
 4 import java.util.Map;
 5
 6 import org.apache.log4j.Logger;
 7
 8 import com.alibaba.jstorm.callback.AsyncLoopThread;
 9 import com.alibaba.jstorm.callback.RunnableCallback;
10
11 import backtype.storm.task.TopologyContext;
12 import backtype.storm.topology.BasicOutputCollector;
13 import backtype.storm.topology.OutputFieldsDeclarer;
14 import backtype.storm.topology.base.BaseBasicBolt;
15 import backtype.storm.tuple.Fields;
16 import backtype.storm.tuple.Tuple;
17 import clojure.inspector__init;
18
19 public class CountBolt extends BaseBasicBolt {
20     Integer id;
21     String name;
22     Map<String, Integer> counters;
23     String component;
24     private static final Logger LOG = Logger.getLogger(CountBolt.class);
25     private AsyncLoopThread statThread;
26     /**
27      * On create
28      */
29     @Override
30     public void prepare(Map stormConf, TopologyContext context) {
31         this.counters = new HashMap<String, Integer>();
32         this.name = context.getThisComponentId();
33         this.id = context.getThisTaskId();
34         this.statThread = new AsyncLoopThread(new statRunnable());
35
36         LOG.info(stormConf.get("abc")+"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
37         component = context.getThisComponentId();
38     }
39
40     public void declareOutputFields(OutputFieldsDeclarer declarer) {
41          declarer.declare(new Fields("word","count"));
42          // declarer.declareStream("coord-"+"word-counter", new Fields("epoch","ebagNum"));
43          // LOG.info("set stream coord-"+component);
44     }
45
46     //接收消息之后被调用的方法
47     public void execute(Tuple input, BasicOutputCollector collector) {
48 //        String str = input.getString(0);
49         String str = input.getStringByField("word");
50         if(!counters.containsKey(str)){
51             counters.put(str, 1);
52         }else{
53             Integer c = counters.get(str) + 1;
54             counters.put(str, c);
55         }
56     }
57     class statRunnable extends RunnableCallback {
58
59         @Override
60         public void run() {
61             while(true){
62                 try {
63                     Thread.sleep(10000);
64                 } catch (InterruptedException e) {
65
66                 }
67                 LOG.info("\n-- Word Counter ["+name+"-"+id+"] --");
68                 for(Map.Entry<String, Integer> entry : counters.entrySet()){
69                     LOG.info(entry.getKey()+": "+entry.getValue());
70                 }
71                 LOG.info("");
72             }
73
74         }
75     }
76
77 }

参考:http://fireinwind.iteye.com/blog/2153699(第一个Storm应用)

三、Grouping的几种方式

四、Bolt的声明周期

1、在定义Topology实例过程中,定义好Spout实例和Bolt实例
2、在提交Topology实例给Nimbus的过程中,会调用TopologyBuilder实例的createTopology()方法,以获取定义的Topology实例。在运行createTopology()方法的过程中,会去调用Spout和Bolt实例上的declareOutputFields()方法和getComponentConfiguration()方法,declareOutputFields()方法配置Spout和Bolt实例的输出,getComponentConfiguration()方法输出特定于Spout和Bolt实例的配置参数值对。Storm会将以上过程中得到的实例,输出配置和配置参数值对等数据序列化,然后传递给Nimbus。
3、在Worker Node上运行的thread,从Nimbus上复制序列化后得到的字节码文件,从中反序列化得到Spout和Bolt实例,实例的输出配置和实例的配置参数值对等数据,在thread中Spout和Bolt实例的declareOutputFields()和getComponentConfiguration()不会再运行。
4、在thread中,反序列化得到一个Bolt实例后,它会先运行Bolt实例的prepare()方法,在这个方法调用中,需要传入一个OutputCollector实例,后面使用该OutputCollector实例输出Tuple
5、接下来在该thread中按照配置数量建立task集合,然后在每个task中就会循环调用thread所持有Bolt实例的execute()方法
6、在关闭一个thread时,thread所持有的Bolt实例会调用cleanup()方法
不过如果是强制关闭,这个cleanup()方法有可能不会被调用到

五、Stream里面的Tuple

1,Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。

storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。比如: 你可以把一个tweets流传输到热门话题的流。

storm提供的最基本的处理stream的原语是spout和bolt。你可以实现Spout和Bolt对应的接口以处理你的应用的逻辑。

spout的流的源头。比如一个spout可能从Kestrel队列里面读取消息并且把这些消息发射成一个流。又比如一个spout可以调用twitter的一个api并且把返回的tweets发射成一个流。

通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,之后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。

bolt可以接收任意多个输入stream, 作一些处理, 有些bolt可能还会发射一些新的stream。一些复杂的流转换, 比如从一些tweet里面计算出热门话题, 需要多个步骤, 从而也就需要多个bolt。 Bolt可以做任何事情: 运行函数, 过滤tuple, 做一些聚合, 做一些合并以及访问数据库等等。

Bolt处理输入的Stream,并产生新的输出Stream。Bolt可以执行过滤、函数操作、Join、操作数据库等任何操作。Bolt是一个被动的 角色,其接口中有一个execute(Tuple input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑。

spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象(类似 Job), 你可以把topology提交给storm的集群来运行。topology的结构在Topology那一段已经说过了,这里就不再赘述了。

参考:http://www.cnblogs.com/wuxiang/p/5629138.html(Storm入门原理介绍)

2,Tuple: 消息传递的基本单位。

在spout发送的时候,函数原型

1 public List<Integer> emit(List<Object> tuple, Object messageId) {
2         return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
3     }

这里的tuple, 实际上是List<Object> 对象,返回的是 List<Integer> 是要发送的tast的IdsList

在bolt接收的时候, 变成一个Tuple对象,  结构应该也是一个list, List<Field1, value1, Field2, value2..>这样的一个结构, FieldList ValueList, 我们根据对应的fieldname就可以取出对应的getIntegerByField方法

时间: 2024-08-23 04:55:03

JStorm第一个程序WordCount详解的相关文章

(转载)Hadoop示例程序WordCount详解

最近在学习云计算,研究Haddop框架,费了一整天时间将Hadoop在Linux下完全运行起来,看到官方的map-reduce的demo程序WordCount,仔细研究了一下,算做入门了. 其实WordCount并不难,只是一下子接触到了很多的API,有一些陌生,还有就是很传统的开发相比,map-reduce确实是一种新的编程理念,为了让各位新手少走弯路,我将WordCount中的很多API都做了注释,其实这些方法搞明白了以后程序就很简单了,无非就是将一句话分词,先用map处理再用reduce处

Hadoop示例程序WordCount详解及实例(转)

1.图解MapReduce 2.简历过程: Input: Hello World Bye World Hello Hadoop Bye Hadoop Bye Hadoop Hello Hadoop Map: <Hello,1> <World,1> <Bye,1> <World,1> <Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1> <Bye,1> <Hadoop,

Hadoop集群WordCount详解

Hadoop集群WordCount详解 MapReduce理论介绍 MapReduce处理过程 MapReduce代码 1.MapReduce 理论介绍 1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单地说,MapReduce就是"任务的分解与结果的汇总". 在Hadoop中,用于执行MapReduce任务的机器角色有两

Hadoop WordCount详解(二)

Hadoop集群WordCount详解(二) 源代码程序 WordCount处理过程 具体代码讲解 1.源代码程序 package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.WordCount.Token

入木三分学网络第一篇--VRRP协议详解第一篇(转)

因为keepalived使用了VRRP协议,所有有必要熟悉一下. 虚拟路由冗余协议(Virtual Router Redundancy Protocol,简称VRRP)是解决局域网中配置静态网关时,静态网关出现单点失效现象的路由协议. VRRP广泛应用在边缘网络中,它的设计目标是支持特定情况下IP数据流量失败转移不会引起混乱,允许主机使用单路由器(位于一个虚拟路由器组中, 在该组中,只有一台路由器--master路由器工作,转发数据包,其它路由器是backup路由器,不参与转发数据包),以及在实

嵌入式Linux应用程序开发详解------(创建守护进程)

嵌入式Linux应用程序开发详解 华清远见 本文只是阅读文摘. 创建一个守护进程的步骤: 1.创建一个子进程,然后退出父进程: 2.在子进程中使用创建新会话---setsid(): 3.改变当前工作目录---chdir(): 4.重新设置文件权限掩码---umask(): 5.关闭所有的文件描述符---close(fdx): 6.设置daemon程序的任务---此例主要在while循环中体现. 下面是一个例子程序: /* daemon * how to create a daemon proce

程序编译详解

程序编译详解 编译程序读取源程序(字符流),对之进行词法和语法的分析,将高级语言指令转换为功能等效的汇编代码,再由汇编程序转换为机器语言,并且按照操作系统对可执行文件格式的要求链接生成可执行程序.总过程如下: C源程序->编译预处理(对源文件进行词法分析和语法分析,确认符合语法规则)->编译(将其翻译成中间代码或汇编代码)->优化程序(对中间代码等进行优化)->汇编程序(把汇编代码翻译成目标机器指令)->链接程序(将相关的目标文件彼此相连接,形成统一的整体)->可执行文

Java中InvocationHandler接口中第一个参数proxy详解

java动态代理机制中有两个重要的类和接口InvocationHandler(接口)和Proxy(类),这一个类Proxy和接口InvocationHandler是我们实现动态代理的核心: 1.InvocationHandler接口是proxy代理实例的调用处理程序实现的一个接口,每一个proxy代理实例都有一个关联的调用处理程序:在代理实例调用方法时,方法调用被编码分派到调用处理程序的invoke方法. 看下官方文档对InvocationHandler接口的描述: {@code Invocat

JPA学习---第一节:JPA详解

一.详解 JPA JPA(Java Persistence API)是Sun官方提出的Java持久化规范.它为Java开发人员提供了一种对象/关系映射工具来管理Java应用中的关系数据.他的出现主要是为了简化现有的持久化开发工作和整合ORM技术,结束现在Hibernate.TopLink等ORM框架各自为营的局面.值得注意的是,JPA是在充分吸收了现有Hibernate.TopLink等ORM框架的基础上发展而来的,具有易于使用.伸缩性强等优点.从目前的开发社区的反应上看,JPA受到了极大的支持