图解Storm

问题导读:
1.你认为什么图形可以显示hadoop与storm的区别?(电梯)
2.本文是如何形象讲解hadoop与storm的?(离线批量处理、实时流式处理)
3.hadoop map/reduce对应storm那两个概念?(spout/bolt)
4.storm流由谁来组成?(Tuples)
5.tuple具体是什么形式?



什么是Storm?
Storm是:

  • 快速且可扩展伸缩
  • 容错
  • 确保消息能够被处理
  • 易于设置和操作
  • 开源的分布式实时计算系统
  • 最初由Nathan Marz开发
  • 使用Java 和 Clojure 编写

区别:
我们知道hadoop是批处理,storm是流式处理,那么是什么是批处理,什么流式处理?
Storm和Hadoop主要区别是实时和批处理的区别:
 
Storm概念组成:Spout和Bolt组成Topology。

Tuple是Storm的数据模型,如[‘jdon‘,12346]
多个Tuple组成事件流:

Spout是读取需要分析处理的数据源,然后转为Tuples,这些数据源可以是Web日志、 API调用、数据库等等。Spout相当于事件流的生产者。
Bolt 处理Tuples然后再创建新的Tuples流,Bolt相当于事件流的消费者。

Bolt 作为真正业务处理者,主要实现大数据处理的核心功能,比如转换数据,应用相应过滤器,计算和聚合数据(比如统计总和等等) 。
以Twitter的某个Tweet为案例,看看Storm如何处理:

这些tweett贴内容是:“No Small Cell Lung #Cancer(没有小细胞肺癌#癌症)” "An #OnCology Consult...."
这些贴被Spout读取以后,产生Tuple,字段名是tweet,内容是"No Small Cell Lung #Cancer",格式类似:[‘No Small Cell Lung #Cancer‘,133221]。
然后进入被流 消费者Bolt进行处理,第一个Bolt是SplitSentence,将tuple内容进行分离,结果成为:一个个单词:"No" "Small" "Cell" "Lung" "#Cancer" ;然后经过第二个Bolt进行过滤HashTagFilter处理,Hash标签是单词中用#标注的,也就是Cancer;再经过HasTagCount计数,可以本地内存缓存这个计数结果,最后通过PrinterBolt打印出标签单词统计结果 。

我们使用Stom所要做的就是编制Spout和Bolt代码:

 1 public class RandomSentenceSpout extends BaseRichSpout {
 2   SpoutOutputCollector collector;
 3   Random random;
 4   //读入外部数据
 5   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 6     this.collector = collector;
 7     random = new Random();
 8   }
 9   //产生Tuple
10    public void nextTuple() {
11     String[] sentences = new String[] {
12       "No Small Cell Lung #Cancer",
13       "An #OnCology Consultant apple a day keeps the doctor away",
14       "four score and seven years ago",
15       "snow white and the seven dwarfs",
16       "i am at two with nature"
17     };
18     String tweet = sentences[random.nextInt(sentences.length)];
19     //定义字段名"tweet" 的值
20     collector.emit(new Values(tweet));
21   }
22   // 定义字段名"tweet"
23   public void declareOutputFields(OutputFieldsDeclarer declarer) {
24     declarer.declare(new Fields("tweet"));
25   }
26   @Override
27   public void ack(Object msgId) {}
28   @Override
29   public void fail(Object msgId) {}
30 }

下面是Bolt的代码编写:

 1 public class SplitSentenceBolt extends BaseRichBolt {
 2   OutputCollector collector;
 3   @Override
 4   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
 5     this.collector = collector;
 6   }
 7   @Override 消费者激活主要方法:分离成单个单词
 8   public void execute(Tuple input) {
 9     for (String s : input.getString(0).split("\\s")) {
10       collector.emit(new Values(s));
11     }
12   }
13   @Override 定义新的字段名
14   public void declareOutputFields(OutputFieldsDeclarer declarer) {
15     declarer.declare(new Fields("word"));
16   }

最后是装配运行Spout和Bolt的客户端调用代码:

 1 public class WordCountTopology {
 2   public static void main(String[] args) throws Exception {
 3     TopologyBuilder builder = new TopologyBuilder();
 4     builder.setSpout("tweet", new RandomSentenceSpout(), 2);
 5     builder.setBolt("split", new SplitSentenceBolt(), 4)
 6       .shuffleGrouping("tweet")
 7       .setNumTasks(8);
 8     builder.setBolt("count", new WordCountBolt(), 6)
 9       .fieldsGrouping("split", new Fields("word"));
10     ..设置多个Bolt
11     Config config = new Config();
12     config.setNumWorkers(4);
13     
14     StormSubmitter.submitTopology("wordcount", config, builder.createTopology());
15 //Local testing
16 //LocalCluster cluster = new LocalCluster();
17 //cluster.submitTopology("wordcount", config, builder.createTopology());
18 //Thread.sleep(10000);
19 //cluster.shutdown();
20 }
21 }

在这个代码中定义了一些参数比如Works的数目是4,其含义在后面详细分析。

下面我们要将上面这段代码发布部署到Storm中,首先了解Storm物理架构图

Nimbus是一个主后台处理器,主要负责:
  1.发布分发代码
  2.分配任务
  3.监控失败。
Supervisor是负责当前这个节点的后台工作处理器的监听。
Work类似Java的线程,采取JDK的Executor 。

下面开始将我们的代码部署到这个网络拓扑中:
将代码Jar包上传到Nimbus的inbox,包括所有的依赖包,然后提交。
Nimbus将保存在本地文件系统,然后开始配置网络拓扑,分配开始拓扑。
见下图:
Nimbus服务器将拓扑Jar 配置和结构下载到 Supervisor,负载平衡ZooKeeper分配某个特定的Supervisor服务器,而Supervisor开始基于配置分配Work,Work调用JDK的Executor启动线程,开始任务处理。
下面是我们代码对拓扑分配的参数示意图:

Executor启动的线程数目是12个,组件的实例是16个,那么如何在实际服务器中分配呢?如下图:
图中RsSpout代表我们的代码中RandomSentenceSpout;SplitSentenceBolt简写为SSbolt。

时间: 2024-10-03 23:15:46

图解Storm的相关文章

Apache Storm 1.1.0 中文文档 | ApacheCN

前言  Apache Storm 是一个免费的,开源的,分布式的实时计算系统. 官方文档: http://storm.apache.org 中文文档: http://storm.apachecn.org ApacheCN 最近组织了翻译 Storm 1.1.0 中文文档 的活动,整体 翻译进度 为 96%. 感谢大家参与到该活动中来 感谢无私奉献的 贡献者,才有了这份 Storm 1.1.0 中文文档 感谢一路有你的陪伴,我们才可以做的更好,走的更快,走的更远,我们一直在努力 ... 网页地址:

Storm入门(四)WordCount示例

Storm API文档网址如下: http://storm.apache.org/releases/current/javadocs/index.html 一.关联代码 使用maven,代码如下. pom.xml  和Storm入门(三)HelloWorld示例相同 RandomSentenceSpout.java /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor lice

storm的可靠性

消息确认机制: 在数据发送的过程中可能会数据丢失导致没能接收到,spout有个超时时间(默认是30S),如果30S过去了还是没有接收到数据,也认为是处理失败. 运行结果都是处理成功 参考代码StormTopologyAcker.java package yehua.storm; import java.util.Map; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.

《C#图解教程》读书笔记之三:方法

本篇已收录至<C#图解教程>读书笔记目录贴,点击访问该目录可获取更多内容. 一.方法那些事儿 (1)方法的结构:方法头-指定方法的特征,方法体-可执行代码的语句序列: (2)方法的调用:参数.值参数.引用参数.输出参数.参数数组: ①参数: 形参-本地变量,声明在参数列表中:形参的值在代码开始之前被初始化: 实参-实参的值用于初始化形参: ②值参数: 为形参在栈上分配内存,将实参的值复制到形参: ③引用参数: 不为形参在栈上分配内存,形参的参数名作为实参变量的别名指向同一位置,必须使用ref关

《C#图解教程》读书笔记之五:委托和事件

本篇已收录至<C#图解教程>读书笔记目录贴,点击访问该目录可获取更多内容. 一.委托初窥:一个拥有方法的对象 (1)本质:持有一个或多个方法的对象:委托和典型的对象不同,执行委托实际上是执行它所"持有"的方法.如果从C++的角度来理解委托,可以将其理解为一个类型安全的.面向对象的函数指针. (2)如何使用委托? ①声明委托类型(delegate关键字) ②使用该委托类型声明一个委托变量 ③为委托类型增加方法 ④调用委托执行方法 (3)委托的恒定性: 组合委托.为委托+=增加

Storm介绍及核心组件和编程模型

离线计算 离线计算:批量获取数据.批量传输数据.周期性批量计算数据.数据展示 代表技术:Sqoop批量导入数据.HDFS批量存储数据.MapReduce批量计算数据.Hive批量计算数据.azkaban/oozie任务调度 流式计算 流式计算:数据实时产生.数据实时传输.数据实时计算.实时展示 代表技术:Flume实时获取数据.Kafka/metaq实时数据存储.Storm/JStorm实时数据计算.Redis实时结果缓存.持久化存储(mysql). 一句话总结:将源源不断产生的数据实时收集并实

ESXI6.5 最新版尝鲜安装图解

ESXI6.5安装图解

storm单词计数 本地执行

import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.commons.io.FileUtils; import backtype.storm.Config; imp

Storm实验 -- 单词计数4

在上一次单词计数的基础上做如下改动: 使用 自定义  分组策略,将首字母相同的单词发送给同一个task计数 自定义 CustomStreamGrouping package com.zhch.v4; import backtype.storm.generated.GlobalStreamId; import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.task.WorkerTopologyContext;