Storm详解二、写第一个Storm应用

在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm。

Storm运行模式:

  1. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试。
  2. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。

写一个HelloWord Storm

     我们现在创建这么一个应用,统计文本文件中的单词个数,详细学习过Hadoop的朋友都应该写过。那么我们需要具体创建这样一个Topology,用一个spout负责读取文本文件,用第一个bolt来解析成单词,用第二个bolt来对解析出的单词计数,整体结构如图所示:

可以从这里下载源码:https://github.com/storm-book/examples-ch02-getting_started/zipball/master

     写一个可运行的Demo很简单,我们只需要三步:

  1. 创建一个Spout读取数据
  2. 创建bolt处理数据
  3. 创建一个Topology提交到集群

下面我们就写一下,以下代码拷贝到eclipse(依赖的jar包到官网下载即可)即可运行。

1.创建一个Spout作为数据源

Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。

[java] view plaincopy

  1. package storm.demo.spout;
  2. import java.io.BufferedReader;
  3. import java.io.FileNotFoundException;
  4. import java.io.FileReader;
  5. import java.util.Map;
  6. import backtype.storm.spout.SpoutOutputCollector;
  7. import backtype.storm.task.TopologyContext;
  8. import backtype.storm.topology.IRichSpout;
  9. import backtype.storm.topology.OutputFieldsDeclarer;
  10. import backtype.storm.tuple.Fields;
  11. import backtype.storm.tuple.Values;
  12. public class WordReader implements IRichSpout {
  13. private static final long serialVersionUID = 1L;
  14. private SpoutOutputCollector collector;
  15. private FileReader fileReader;
  16. private boolean completed = false;
  17. public boolean isDistributed() {
  18. return false;
  19. }
  20. /**
  21. * 这是第一个方法,里面接收了三个参数,第一个是创建Topology时的配置,
  22. * 第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt
  23. * **/
  24. @Override
  25. public void open(Map conf, TopologyContext context,
  26. SpoutOutputCollector collector) {
  27. try {
  28. //获取创建Topology时指定的要读取的文件路径
  29. this.fileReader = new FileReader(conf.get("wordsFile").toString());
  30. } catch (FileNotFoundException e) {
  31. throw new RuntimeException("Error reading file ["
  32. + conf.get("wordFile") + "]");
  33. }
  34. //初始化发射器
  35. this.collector = collector;
  36. }
  37. /**
  38. * 这是Spout最主要的方法,在这里我们读取文本文件,并把它的每一行发射出去(给bolt)
  39. * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下
  40. * **/
  41. @Override
  42. public void nextTuple() {
  43. if (completed) {
  44. try {
  45. Thread.sleep(1000);
  46. } catch (InterruptedException e) {
  47. // Do nothing
  48. }
  49. return;
  50. }
  51. String str;
  52. // Open the reader
  53. BufferedReader reader = new BufferedReader(fileReader);
  54. try {
  55. // Read all lines
  56. while ((str = reader.readLine()) != null) {
  57. /**
  58. * 发射每一行,Values是一个ArrayList的实现
  59. */
  60. this.collector.emit(new Values(str), str);
  61. }
  62. } catch (Exception e) {
  63. throw new RuntimeException("Error reading tuple", e);
  64. } finally {
  65. completed = true;
  66. }
  67. }
  68. @Override
  69. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  70. declarer.declare(new Fields("line"));
  71. }
  72. @Override
  73. public void close() {
  74. // TODO Auto-generated method stub
  75. }
  76. @Override
  77. public void activate() {
  78. // TODO Auto-generated method stub
  79. }
  80. @Override
  81. public void deactivate() {
  82. // TODO Auto-generated method stub
  83. }
  84. @Override
  85. public void ack(Object msgId) {
  86. System.out.println("OK:" + msgId);
  87. }
  88. @Override
  89. public void fail(Object msgId) {
  90. System.out.println("FAIL:" + msgId);
  91. }
  92. @Override
  93. public Map<String, Object> getComponentConfiguration() {
  94. // TODO Auto-generated method stub
  95. return null;
  96. }
  97. }

2.创建两个bolt来处理Spout发射出的数据

Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。

Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用。

第一个bolt:WordNormalizer

[java] view plaincopy

  1. package storm.demo.bolt;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.Map;
  5. import backtype.storm.task.OutputCollector;
  6. import backtype.storm.task.TopologyContext;
  7. import backtype.storm.topology.IRichBolt;
  8. import backtype.storm.topology.OutputFieldsDeclarer;
  9. import backtype.storm.tuple.Fields;
  10. import backtype.storm.tuple.Tuple;
  11. import backtype.storm.tuple.Values;
  12. public class WordNormalizer implements IRichBolt {
  13. private OutputCollector collector;
  14. @Override
  15. public void prepare(Map stormConf, TopologyContext context,
  16. OutputCollector collector) {
  17. this.collector = collector;
  18. }
  19. /**这是bolt中最重要的方法,每当接收到一个tuple时,此方法便被调用
  20. * 这个方法的作用就是把文本文件中的每一行切分成一个个单词,并把这些单词发射出去(给下一个bolt处理)
  21. * **/
  22. @Override
  23. public void execute(Tuple input) {
  24. String sentence = input.getString(0);
  25. String[] words = sentence.split(" ");
  26. for (String word : words) {
  27. word = word.trim();
  28. if (!word.isEmpty()) {
  29. word = word.toLowerCase();
  30. // Emit the word
  31. List a = new ArrayList();
  32. a.add(input);
  33. collector.emit(a, new Values(word));
  34. }
  35. }
  36. //确认成功处理一个tuple
  37. collector.ack(input);
  38. }
  39. @Override
  40. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  41. declarer.declare(new Fields("word"));
  42. }
  43. @Override
  44. public void cleanup() {
  45. // TODO Auto-generated method stub
  46. }
  47. @Override
  48. public Map<String, Object> getComponentConfiguration() {
  49. // TODO Auto-generated method stub
  50. return null;
  51. }
  52. }

第二个bolt:WordCounter

[java] view plaincopy

  1. package storm.demo.bolt;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import backtype.storm.task.OutputCollector;
  5. import backtype.storm.task.TopologyContext;
  6. import backtype.storm.topology.IRichBolt;
  7. import backtype.storm.topology.OutputFieldsDeclarer;
  8. import backtype.storm.tuple.Tuple;
  9. public class WordCounter implements IRichBolt {
  10. Integer id;
  11. String name;
  12. Map<String, Integer> counters;
  13. private OutputCollector collector;
  14. @Override
  15. public void prepare(Map stormConf, TopologyContext context,
  16. OutputCollector collector) {
  17. this.counters = new HashMap<String, Integer>();
  18. this.collector = collector;
  19. this.name = context.getThisComponentId();
  20. this.id = context.getThisTaskId();
  21. }
  22. @Override
  23. public void execute(Tuple input) {
  24. String str = input.getString(0);
  25. if (!counters.containsKey(str)) {
  26. counters.put(str, 1);
  27. } else {
  28. Integer c = counters.get(str) + 1;
  29. counters.put(str, c);
  30. }
  31. // 确认成功处理一个tuple
  32. collector.ack(input);
  33. }
  34. /**
  35. * Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里
  36. * 因为这只是个Demo,我们用它来打印我们的计数器
  37. * */
  38. @Override
  39. public void cleanup() {
  40. System.out.println("-- Word Counter [" + name + "-" + id + "] --");
  41. for (Map.Entry<String, Integer> entry : counters.entrySet()) {
  42. System.out.println(entry.getKey() + ": " + entry.getValue());
  43. }
  44. counters.clear();
  45. }
  46. @Override
  47. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  48. // TODO Auto-generated method stub
  49. }
  50. @Override
  51. public Map<String, Object> getComponentConfiguration() {
  52. // TODO Auto-generated method stub
  53. return null;
  54. }
  55. }

3.在main函数中创建一个Topology

在这里我们要创建一个Topology和一个LocalCluster对象,还有一个Config对象做一些配置。

[java] view plaincopy

  1. package storm.demo;
  2. import storm.demo.bolt.WordCounter;
  3. import storm.demo.bolt.WordNormalizer;
  4. import storm.demo.spout.WordReader;
  5. import backtype.storm.Config;
  6. import backtype.storm.LocalCluster;
  7. import backtype.storm.topology.TopologyBuilder;
  8. import backtype.storm.tuple.Fields;
  9. public class WordCountTopologyMain {
  10. public static void main(String[] args) throws InterruptedException {
  11. //定义一个Topology
  12. TopologyBuilder builder = new TopologyBuilder();
  13. builder.setSpout("word-reader",new WordReader());
  14. builder.setBolt("word-normalizer", new WordNormalizer())
  15. .shuffleGrouping("word-reader");
  16. builder.setBolt("word-counter", new WordCounter(),2)
  17. .fieldsGrouping("word-normalizer", new Fields("word"));
  18. //配置
  19. Config conf = new Config();
  20. conf.put("wordsFile", "d:/text.txt");
  21. conf.setDebug(false);
  22. //提交Topology
  23. conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
  24. //创建一个本地模式cluster
  25. LocalCluster cluster = new LocalCluster();
  26. cluster.submitTopology("Getting-Started-Toplogie", conf,
  27. builder.createTopology());
  28. Thread.sleep(1000);
  29. cluster.shutdown();
  30. }
  31. }

运行这个函数我们即可看到后台打印出来的单词个数。

(ps:因为是Local模式,运行开始可能会打印很多错误log,这个先不用管)

时间: 2024-12-15 11:44:11

Storm详解二、写第一个Storm应用的相关文章

Storm详解一、Storm 概述

一.Storm概述 Storm是一个分布式的.可靠的.零失误的流式数据处理系统.它的工作就是委派各种组件分别独立的处理一些简单任务.在Storm集群中处理输入流的是Spout组件,而Spout又把读取的数据传递给叫Bolt的组件.Bolt组件会对收到的数据元组进行处理,也有可能传递给下一个Bolt.我们可以把Storm集群想象成一个由bolt组件组成的链条集合,数据在这些链条上传输,而bolt作为链条上的节点来对数据进行处理. Storm和Hadoop集群表面看上去很类似,但是Hadoop上面运

LinearLayout详解二:从其父类View说起

这个View类说来就话长了,但我们又不得不说,要说呢,就得说的彻底,要让大家看得一清二楚,明明白白.所以我们就从源代码角度来看一个view是如何被加载的吧. 如果大家不知道怎么下载android的源代码,或者说懒得去下载(因为源代码确实比较大,大概有10G)的话,教大家几个取巧的办法: 1.直接在google中输入"android view.java"即可.这种方法成功率非常高,一般android的比较重要的类都能搜到. 2.给大家提供一个人家用于放源码的的git:[email pro

cocos2dx 启动过程详解二:内存管理和回调

在上一篇的第二部分中,我们有一句代码待解释的: // Draw the Scene void CCDirector::drawScene(void) { -- //tick before glClear: issue #533 if (! m_bPaused) //暂停 { m_pScheduler->update(m_fDeltaTime);   //待会会解释这里的内容 } -- } 这里是一个update函数,经常会写像this->schedule(schedule_selector(X

PopUpWindow使用详解(二)——进阶及答疑

相关文章:1.<PopUpWindow使用详解(一)——基本使用>2.<PopUpWindow使用详解(二)——进阶及答疑> 上篇为大家基本讲述了有关PopupWindow的基本使用,但还有几个相关函数还没有讲述,我们这篇将着重看看这几个函数的用法并结合源码来讲讲具体原因,最后是有关PopupWindow在使用时的疑问,给大家讲解一下. 一.常用函数讲解 这段将会给大家讲下下面几个函数的意义及用法,使用上篇那个带背景的例子为基础. [java] view plain copy pu

HTTPS详解二:SSL / TLS 工作原理和详细握手过程

HTTPS 详解一:附带最精美详尽的 HTTPS 原理图 HTTPS详解二:SSL / TLS 工作原理和详细握手过程 在上篇文章HTTPS详解一中,我已经为大家介绍了 HTTPS 的详细原理和通信流程,但总感觉少了点什么,应该是少了对安全层的针对性介绍,那么这篇文章就算是对HTTPS 详解一的补充吧.还记得这张图吧. HTTPS 和 HTTP的区别 显然,HTTPS 相比 HTTP最大的不同就是多了一层 SSL (Secure Sockets Layer 安全套接层)或 TLS (Transp

UINavigationController详解二(转)页面切换和SegmentedController

原文出自:http://blog.csdn.net/totogo2010/article/details/7682433,非常感谢. 1.RootView 跳到SecondView 首先我们需要新一个View.新建SecondView,按住Command键然后按N,弹出新建页面,我们新建SecondView 2.为Button 添加点击事件,实现跳转 在RootViewController.xib中和RootViewController.h文件建立连接 在RootViewController.m

Android 布局学习之——Layout(布局)详解二(常见布局和布局参数)

[Android布局学习系列]   1.Android 布局学习之——Layout(布局)详解一   2.Android 布局学习之——Layout(布局)详解二(常见布局和布局参数)   3.Android 布局学习之——LinearLayout的layout_weight属性   4.Android 布局学习之——LinearLayout属性baselineAligned的作用及baseline    Layout Parameters(布局参数): 在XML文件中,我们经常看到类似与lay

CSS3中的弹性流体盒模型技术详解(二)

在上一篇文章<CSS3中的弹性流体盒模型技术详解(一)>里,我给大家列出了,从css1到css3各版本中盒子模型的基本元素.本篇我会把余下的属性进行详细讲解. box-pack 作用:用来规定子元素在盒子内的水平空间分配方式 box-pack 语法:box-pack: start | end | center | justify; start 对于正常方向的框,首个子元素的左边缘吸附在盒子的左边框显示 对于相反方向的框,最后子元素的右边缘吸附在盒子的右边框显示 end 对于正常方向的框,最后子

php学习之道:WSDL详解(二)

3.定义服务使用的逻辑消息 当服务的操作被调用时,服务被定义为消息交换.在wsdl文档中,这些消息被定义message元素.这些消息由称之为part元素的部分组成. 一个服务的操作,通过指定逻辑消息的方式来定义.当操作被调用时,逻辑消息被交换.(也就是说,逻辑消息代表了服务的操作)这些逻辑消息,将在网络上传输的数据定义为xml文档.他包含了所有的参数,这些参数是方法调用的一部分.(也就是说,逻辑消息里的参数,是操作对应方法的参数集合) 消息和参数列表:每一个被服务暴露的操作能且仅能有一个输入消息