3. Storm编程框架

实例分析lifeCycle:

RandomWordSpout

  1.  1 package cn.itcast.storm.spout;
     2 import java.util.Map;
     3 import java.util.Random;
     4 import org.apache.commons.logging.Log;
     5 import org.apache.commons.logging.LogFactory;
     6 import backtype.storm.spout.SpoutOutputCollector;
     7 import backtype.storm.task.TopologyContext;
     8 import backtype.storm.topology.OutputFieldsDeclarer;
     9 import backtype.storm.topology.base.BaseRichSpout;
    10 import backtype.storm.tuple.Fields;
    11 import backtype.storm.tuple.Values;
    12 import backtype.storm.utils.Utils;
    13 public class RandomWordSpout extends BaseRichSpout {
    14     private static final long serialVersionUID = -4287209449750623371L;
    15
    16     private static final Log log = LogFactory.getLog(RandomWordSpout.class);
    17     private SpoutOutputCollector collector;
    18
    19     private String[] words = new String[]{"storm", "hadoop", "hive", "flume"};
    20
    21     private Random random = new Random();
    22
    23     public RandomWordSpout() {
    24         log.warn("RandomWordSpout constructor method invoked");
    25     }
    26     @Override
    27     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    28         log.warn("RandomWordSpout open() method invoked");
    29         this.collector = collector;
    30     }
    31     @Override
    32     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    33         log.warn("RandomWordSpout declareOutputFields() method invoked");
    34         declarer.declare(new Fields("str"));
    35     }
    36     @Override
    37     public void nextTuple() {
    38         log.warn("RandomWordSpout nextTuple() method invoked");
    39         Utils.sleep(500);
    40         String str = words[random.nextInt(words.length)];
    41         collector.emit(new Values(str));
    42     }
    43     @Override
    44     public void activate() {
    45         log.warn("RandomWordSpout activate() method invoked");
    46     }
    47     @Override
    48     public void deactivate() {
    49         log.warn("RandomWordSpout deactivate() method invoked");
    50     }
    51 }

TransferBolt

  1.  1 package cn.itcast.storm.bolt;
     2 import java.util.Map;
     3 import org.apache.commons.logging.Log;
     4 import org.apache.commons.logging.LogFactory;
     5 import backtype.storm.task.TopologyContext;
     6 import backtype.storm.topology.BasicOutputCollector;
     7 import backtype.storm.topology.OutputFieldsDeclarer;
     8 import backtype.storm.topology.base.BaseBasicBolt;
     9 import backtype.storm.tuple.Fields;
    10 import backtype.storm.tuple.Tuple;
    11 import backtype.storm.tuple.Values;
    12 public class TransferBolt extends BaseBasicBolt {
    13     private static final long serialVersionUID = 4223708336037089125L;
    14     private static final Log log = LogFactory.getLog(TransferBolt.class);
    15
    16     public TransferBolt() {
    17         log.warn("TransferBolt constructor method invoked");
    18     }
    19
    20     @Override
    21     public void prepare(Map stormConf, TopologyContext context) {
    22         log.warn("TransferBolt prepare() method invoked");
    23     }
    24     @Override
    25     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    26         log.warn("TransferBolt declareOutputFields() method invoked");
    27         declarer.declare(new Fields("word"));
    28     }
    29     @Override
    30     public void execute(Tuple input, BasicOutputCollector collector) {
    31         log.warn("TransferBolt execute() method invoked");
    32         String word = input.getStringByField("str");
    33         collector.emit(new Values(word));
    34     }
    35 }


WriterBolt

  1.  1 package cn.itcast.storm.bolt;
     2 import java.io.FileWriter;
     3 import java.io.IOException;
     4 import java.util.Map;
     5 import org.apache.commons.logging.Log;
     6 import org.apache.commons.logging.LogFactory;
     7 import backtype.storm.task.TopologyContext;
     8 import backtype.storm.topology.BasicOutputCollector;
     9 import backtype.storm.topology.OutputFieldsDeclarer;
    10 import backtype.storm.topology.base.BaseBasicBolt;
    11 import backtype.storm.tuple.Tuple;
    12 public class WriterBolt extends BaseBasicBolt {
    13     private static final long serialVersionUID = -6586283337287975719L;
    14
    15     private static final Log log = LogFactory.getLog(WriterBolt.class);
    16
    17     private FileWriter writer = null;
    18
    19     public WriterBolt() {
    20         log.warn("WriterBolt constructor method invoked");
    21     }
    22     @Override
    23     public void prepare(Map stormConf, TopologyContext context) {
    24         log.warn("WriterBolt prepare() method invoked");
    25         try {
    26             writer = new FileWriter("/home/" + this);
    27         } catch (IOException e) {
    28             log.error(e);
    29             throw new RuntimeException(e);
    30         }
    31     }
    32     @Override
    33     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    34         log.warn("WriterBolt declareOutputFields() method invoked");
    35     }
    36
    37     @Override
    38     public void execute(Tuple input, BasicOutputCollector collector) {
    39         log.warn("WriterBolt execute() method invoked");
    40         String s = input.getString(0);
    41         try {
    42             writer.write(s);
    43             writer.write("\n");
    44             writer.flush();
    45         } catch (IOException e) {
    46             log.error(e);
    47             throw new RuntimeException(e);
    48         }
    49     }
    50 }

TopoMain

  1. package cn.itcast.storm.topology;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import cn.itcast.storm.bolt.TransferBolt;
    import cn.itcast.storm.bolt.WriterBolt;
    import cn.itcast.storm.spout.RandomWordSpout;
    public class TopoMain {
        private static final Log log = LogFactory.getLog(TopoMain.class);
    
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("random", new RandomWordSpout(), 2);
            builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");
            builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));
            Config conf = new Config();
            conf.setNumWorkers(2);
            conf.setDebug(true);
            log.warn("submitting topology...");
            StormSubmitter.submitTopology("life-cycle", conf, builder.createTopology());
            log.warn("topology submitted !");
        }
    }


方法执行顺序:

  • Spout方法调用顺势
  1. declareOutputFields()(调用一次)
  2. open() (调用一次)
  3. activate() (调用一次)
  4. nextTuple()    (循环调用 )
  5. deactivate() (手动调用)
  • Bolt方法调用顺序
  1. declareOutputFields() (调用一次)
  2. prepare() (调用一次)
  3. execute()     (循环执行)

执行日志:

  1. [[email protected] work]# storm jar lifeCycle1.jar cn.itcast.storm.topology.TopoMain
  2. Running:/usr/local/jdk/bin/java -client -Dstorm.options=-Dstorm.home=/usr/local/apache-storm-0.9.4-Dstorm.log.dir=/usr/local/apache-storm-0.9.4/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file=-cp /usr/local/apache-storm-0.9.4/lib/clj-time-0.4.1.jar:....-Dstorm.jar=lifeCycle1.jar cn.itcast.storm.topology.TopoMain
  3. 464[main] WARN cn.itcast.storm.spout.RandomWordSpout-RandomWordSpout constructor method invoked #初始化对象,执行构造方法
  4. 490[main] WARN cn.itcast.storm.bolt.TransferBolt-TransferBolt constructor method invoked
  5. 505[main] WARN cn.itcast.storm.bolt.WriterBolt-WriterBolt constructor method invoked
  6. 515[main] WARN cn.itcast.storm.topology.TopoMain- submitting topology...
  7. 516[main] WARN cn.itcast.storm.bolt.TransferBolt-TransferBolt declareOutputFields()method invoked
  8. 906[main] WARN cn.itcast.storm.bolt.WriterBolt-WriterBolt declareOutputFields() method invoked
  9. 909[main] WARN cn.itcast.storm.spout.RandomWordSpout-RandomWordSpout declareOutputFields() method invoked
  10. 1106[main] INFO backtype.storm.StormSubmitter-Jar not uploaded to master yet.Submitting jar...
  11. 1117[main] INFO backtype.storm.StormSubmitter-Uploading topology jar lifeCycle1.jar to assigned location:/tmp/storm/nimbus/inbox/stormjar-13252904-45c2-41e8-8703-957feae2bf27.jar
  12. 1361[main] INFO backtype.storm.StormSubmitter-Successfully uploaded topology jar to assigned location:/tmp/storm/nimbus/inbox/stormjar-13252904-45c2-41e8-8703-957feae2bf27.jar
  13. 1362[main] INFO backtype.storm.StormSubmitter-Submitting topology life-cycle in distributed mode with conf {"topology.workers":2,"topology.debug":true}
  14. 1568[main] INFO backtype.storm.StormSubmitter-Finished submitting topology: life-cycle
  15. 1568[main] WARN cn.itcast.storm.topology.TopoMain- topology submitted !

worker日志

  1. 2015-05-16T17:57:18.295+0800 b.s.d.worker [INFO]Worker6ae03c97-dac4-4ef3-9f10-227de1219b16for storm life-cycle-4-1431770222 on 1360b011-2e64-4964
  2. -9f6c-d849db954ff2:6703 has finished loading
  3. 2015-05-16T17:57:18.797+0800 b.s.d.executor [INFO]Preparing bolt transfer:(5)
  4. 2015-05-16T17:57:18.798+0800 b.s.d.executor [INFO]Preparing bolt writer:(11)
  5. 2015-05-16T17:57:18.812+0800 c.i.s.b.WriterBolt[WARN]WriterBolt prepare() method invoked
  6. 2015-05-16T17:57:18.813+0800 b.s.d.executor [INFO]Prepared bolt writer:(11)
  7. 2015-05-16T17:57:18.820+0800 c.i.s.b.TransferBolt[WARN]TransferBolt prepare() method invoked
  8. 2015-05-16T17:57:18.821+0800 b.s.d.executor [INFO]Prepared bolt transfer:(5)
  9. 2015-05-16T17:57:18.834+0800 b.s.d.executor [INFO]Preparing bolt __system:(-1)
  10. 2015-05-16T17:57:18.834+0800 b.s.d.executor [INFO]Preparing bolt transfer:(7)
  11. 2015-05-16T17:57:18.839+0800 c.i.s.b.TransferBolt[WARN]TransferBolt prepare() method invoked
  12. 2015-05-16T17:57:18.839+0800 b.s.d.executor [INFO]Prepared bolt transfer:(7)
  13. 2015-05-16T17:57:18.840+0800 b.s.d.executor [INFO]Preparing bolt __acker:(1)
  14. 2015-05-16T17:57:18.841+0800 b.s.d.executor [INFO]Opening spout random:(3)
  15. 2015-05-16T17:57:18.841+0800 b.s.d.executor [INFO]Preparing bolt writer:(9)
  16. 2015-05-16T17:57:18.842+0800 c.i.s.b.WriterBolt[WARN]WriterBolt prepare() method invoked
  17. 2015-05-16T17:57:18.842+0800 b.s.d.executor [INFO]Prepared bolt writer:(9)
  18. 2015-05-16T17:57:18.846+0800 b.s.d.executor [INFO]Prepared bolt __acker:(1)
  19. 2015-05-16T17:57:18.848+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout open() method invoked
  20. 2015-05-16T17:57:18.854+0800 b.s.d.executor [INFO]Opened spout random:(3)
  21. 2015-05-16T17:57:18.867+0800 b.s.d.executor [INFO]Prepared bolt __system:(-1)
  22. 2015-05-16T17:57:18.873+0800 b.s.d.executor [INFO]Activating spout random:(3)
  23. 2015-05-16T17:57:18.873+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout activate() method invoked
  24. 2015-05-16T17:57:18.873+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout nextTuple() method invoked
  25. 2015-05-16T17:57:19.159+0800 b.s.d.executor [INFO]Processing received message source: random:4, stream: default, id:{},[hadoop]
  26. 2015-05-16T17:57:19.160+0800 c.i.s.b.TransferBolt[WARN]TransferBolt execute() method invoked
  27. 2015-05-16T17:57:19.161+0800 b.s.d.task [INFO]Emitting: transfer default [hadoop]
  28. 2015-05-16T17:57:19.162+0800 b.s.d.executor [INFO]Processing received message source: transfer:7, stream: default, id:{},[hadoop]
  29. 2015-05-16T17:57:19.162+0800 c.i.s.b.WriterBolt[WARN]WriterBolt execute() method invoked
时间: 2024-10-11 19:13:47

3. Storm编程框架的相关文章

storm入门(一):storm编程框架与举例

基础 http://os.51cto.com/art/201308/408739.htm 模型 http://www.cnblogs.com/linjiqin/archive/2013/05/28/3104016.html 一.Storm基本概念 在运行一个Storm任务之前,需要了解一些概念: Topologies Streams Spouts Bolts Stream groupings Reliability Tasks Workers Configuration Storm集群和Hado

WDF编程框架

微软的wdk开发包里自带了一些sample,这是些质量不错并且权威的学习资料,最好的学习驱动的方法就是阅读和修改这些代码.其中Ramdisk实现了一个虚拟磁盘,可以作为WDF编程的经典代码材料,<寒江独钓-Windows内核安全编程>第5章"磁盘的虚拟"便以此为例,这篇博客是一篇学习总结. 驱动的入口函数很简洁: NTSTATUS DriverEntry(     IN PDRIVER_OBJECT DriverObject,     IN PUNICODE_STRING 

iOS端JSON转Model链式编程框架SuperKVC使用方法与原理

背景 在client编程中.字典转模型是一个极为常见的问题,苹果提供了KVC来实现NSDictionary到Model的注入,可是KVC仅仅能进行单层浅注入.且无法处理类型转换.key与属性名不正确应.深度注入等问题,笔者从Masonry得到启示,开发了一个通过链式配置注入器实现深度注入.类型转换.key-属性名映射等功能的轻量级注入框架SuperKVC.眼下已经开源到GitHub,点击这里前往.欢迎Star和Fork.欢迎和我一起完好这个框架! 本文将从应用和原理两个角度介绍SuperKVC

Storm集群上的开发 ,Topology任务的编写 之 WordCountTopology数据流分析(storm编程模型)(一张图说明问题)(四)

WordCountTopology数据流分析(storm编程模型) 上一章的example的单词统计在storm的数据流动到底是怎么进行的呢,这一章节开始介绍:

Linux模块编程框架

Linux模块编程框架 Linux是单内核系统,可通用计算平台的外围设备是频繁变化的,不可能将所有的(包括将来即将出现的)设备的驱动程序都一次性编译进内核,为了解决这个问题,Linux提出了可加载内核模块(Loadable Kernel Module,LKM)的概念,允许一个设备驱动通过模块加载的方式,在内核运行起来之后"融入"内核,加载进内核的模块和本身就编译进内核的模块一模一样.一个程序在编译的地址的相对关系就已经确定了,运行的时候只是进行简单的偏移,为了使模块加载进内核后能够被放

跨平台网络通信与server编程框架库(acl库)介绍

一.描写叙述 acl project是一个跨平台(支持LINUX,WIN32,Solaris,MacOS,FreeBSD)的网络通信库及server编程框架,同一时候提供很多其它的有用功能库.通过该库,用户能够很easy地编写支持多种模式(多线程.多进程.非堵塞.触发器.UDP方式)的server程序,WEB 应用程序,数据库应用程序.此外,该库还提供了常见应用的client通信库(如:HTTP.SMTP.ICMP.memcache.beanstalk),常见流式编解码库:XML/JSON/MI

React Native是一套使用 React 构建 Native app 的编程框架

React Native at first sight what is React Native? 跟据官方的描述, React Native是一套使用 React 构建 Native app 的编程框架. 推出不久便引发了广泛关注, 这也得益于 JavaScript 开放而活跃的技术社区和 React Native 完备的技术体系支持. 本文试图概括的介绍 React Native. React Native 主要是一套 Runtime, 包括一套 React.js 库使得开发可以用 Reac

acl 是一个跨平台的网络通信库及服务器编程框架

acl 工程是一个跨平台(支持LINUX,WIN32,Solaris,MacOS,FreeBSD)的网络通信库及服务器编程框架,同时提供更多的实用功能库.通过该库,用户可以非常容易地编写支持多种模式(多线程.多进程.非阻塞.触发器.UDP方式.协程方式)的服务器程序,WEB 应用程序,数据库应用程序.此外,该库还提供了常见应用的客户端通信库(如:HTTP.SMTP.ICMP.redis.memcache.beanstalk.handler socket),常见流式编解码库:XML/JSON/MI

高大上函数响应式编程框架ReactiveCocoa学习笔记1 简介

原创文章,转载请声明出处哈. ReactiveCocoa函数响应式编程 一.简介 ReactiveCocoa(其简称为RAC)是函数响应式编程框架.RAC具有函数式编程和响应式编程的特性.它主要吸取了.Net的 Reactive Extensions的设计和实现. 函数式编程 (Functional Programming) 函数式编程也可以写N篇,它是完全不同于OO的编程模式,这里主要讲一下这个框架使用到的函数式思想. 1) 高阶函数:在函数式编程中,把函数当参数来回传递,而这个,说成术语,我