Storm 第三章 Storm编程案例及Stream Grouping详解

1 功能说明

  设计一个topology,来实现对文档里面的单词出现的频率进行统计。整个topology分为三个部分:

  SentenceSpout:数据源,在已知的英文句子中,随机发送一条句子出去。

  SplitBolt:负责将单行文本记录(句子)切分成单词

  CountBolt:负责对单词的频率进行累加

2 代码实现

 1 package com.ntjr.bigdata;
 2
 3 import org.apache.storm.Config;
 4 import org.apache.storm.LocalCluster;
 5 import org.apache.storm.StormSubmitter;
 6 import org.apache.storm.generated.AlreadyAliveException;
 7 import org.apache.storm.generated.AuthorizationException;
 8 import org.apache.storm.generated.InvalidTopologyException;
 9 import org.apache.storm.topology.TopologyBuilder;
10 import org.apache.storm.tuple.Fields;
11
12 public class WrodCountTopolog {
13     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
14         //使用TopologyBuilder 构建一个topology
15         TopologyBuilder topologyBuilder = new TopologyBuilder();
16         //发送英文句子
17         topologyBuilder.setSpout("sentenceSpout", new SentenceSpout(), 2);
18         //将一行行的文本切分成单词
19         topologyBuilder.setBolt("splitBolt", new SplitBolt(), 2).shuffleGrouping("sentenceSpout");
20         //将单词的频率进行累加
21         topologyBuilder.setBolt("countBolt", new CountBolt(), 2).fieldsGrouping("splitBolt", new Fields("word"));
22         //启动topology的配置信息
23         Config config = new Config();
24         //定义集群分配多少个工作进程来执行这个topology
25         config.setNumWorkers(3);
26
27         //本地模式提交topology
28         LocalCluster localCluster = new LocalCluster();
29         localCluster.submitTopology("mywordCount", config, topologyBuilder.createTopology());
30
31         //集群模式提交topology
32         StormSubmitter.submitTopologyWithProgressBar("mywordCount", config, topologyBuilder.createTopology());
33
34     }
35
36 }

WrodCountTopolog.java

 1 package com.ntjr.bigdata;
 2
 3 import java.util.Map;
 4
 5 import org.apache.storm.spout.SpoutOutputCollector;
 6 import org.apache.storm.task.TopologyContext;
 7 import org.apache.storm.topology.OutputFieldsDeclarer;
 8 import org.apache.storm.topology.base.BaseRichSpout;
 9 import org.apache.storm.tuple.Fields;
10 import org.apache.storm.tuple.Values;
11
12 public class SentenceSpout extends BaseRichSpout {
13
14     private static final long serialVersionUID = 1L;
15     // 用来收集Spout输出的tuple
16     private SpoutOutputCollector collector;
17
18     @Override
19     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
20         this.collector = collector;
21
22     }
23
24     // 该方法会循环调用
25     @Override
26     public void nextTuple() {
27         collector.emit(new Values("i am lilei love hanmeimei"));
28     }
29
30     // 消息源可以发送多条消息流,该方法定义输出的消息类型的字段
31     @Override
32     public void declareOutputFields(OutputFieldsDeclarer declarer) {
33         declarer.declare(new Fields("love"));
34
35     }
36
37 }

SentenceSpout.java

 1 package com.ntjr.bigdata;
 2
 3 import java.util.Map;
 4
 5 import org.apache.storm.task.OutputCollector;
 6 import org.apache.storm.task.TopologyContext;
 7 import org.apache.storm.topology.OutputFieldsDeclarer;
 8 import org.apache.storm.topology.base.BaseRichBolt;
 9 import org.apache.storm.tuple.Fields;
10 import org.apache.storm.tuple.Tuple;
11 import org.apache.storm.tuple.Values;
12
13 public class SplitBolt extends BaseRichBolt {
14
15     private static final long serialVersionUID = 1L;
16
17     private OutputCollector collector;
18
19     // 该方法只会调用一次用来执行初始化
20     @Override
21     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
22         this.collector = collector;
23
24     }
25
26     // 接收的参数时spout发出来的句子,一个句子就是一个tuple
27     @Override
28     public void execute(Tuple input) {
29         String line = input.getString(0);
30         String[] words = line.split(" ");
31         for (String word : words) {
32             collector.emit(new Values(word, 1));
33         }
34
35     }
36
37     // 定义输出类型,输出类型为单词和单词的数目和collector.emit(new Values(word, 1));对应
38     @Override
39     public void declareOutputFields(OutputFieldsDeclarer declarer) {
40         declarer.declare(new Fields("word", "num"));
41
42     }
43
44 }

SplitBolt.java

 1 package com.ntjr.bigdata;
 2
 3 import java.util.HashMap;
 4 import java.util.Map;
 5
 6 import org.apache.storm.task.OutputCollector;
 7 import org.apache.storm.task.TopologyContext;
 8 import org.apache.storm.topology.OutputFieldsDeclarer;
 9 import org.apache.storm.topology.base.BaseRichBolt;
10 import org.apache.storm.tuple.Tuple;
11
12 public class CountBolt extends BaseRichBolt {
13
14     private static final long serialVersionUID = 1L;
15     private OutputCollector collector;
16     // 用来保存最后的计算结果 key:单词,value:单词的个数
17     Map<String, Integer> map = new HashMap<String, Integer>();
18
19     // 该方法调用一次用来执行初始化
20     @Override
21     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
22         this.collector = collector;
23
24     }
25
26     @Override
27     public void execute(Tuple input) {
28         String word = input.getString(0);
29         Integer num = input.getInteger(1);
30
31         if (map.containsKey(word)) {
32             Integer count = map.get(word);
33             map.put(word, count + num);
34         } else {
35             map.put(word, num);
36         }
37         System.out.println("count:" + map);
38     }
39
40     @Override
41     public void declareOutputFields(OutputFieldsDeclarer declarer) {
42
43     }
44
45 }

CountBolt.java

3 执行流程图

3 Stream Grouping详解

  3.1 Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。

  3.2 Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。

  3.3 All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。

  3.4 Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

  3.5 Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。

  3.6 Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。

            消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。

  3.7 Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。

原文地址:https://www.cnblogs.com/zhaobingqing/p/8341913.html

时间: 2025-01-07 17:24:41

Storm 第三章 Storm编程案例及Stream Grouping详解的相关文章

Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障 在上一篇 Storm系列二: Storm拓扑设计 中我们已经设计了一个稍微复杂一点的拓扑. 而本篇就是在上一篇的基础上再做出一定的调整. 在这里先大概提一下上一篇的业务逻辑, 我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信息, 我们需要做的事情是当接收到消息之后,根据SessionId判断是否属于同一消息, 如果是的话将内容拼接, 如果结束标识为 true, 表示会话已结束,则存

C#函数式编程中的部分应用详解

何谓函数式编程 相信大家在实际的开发中,很多情况下完成一个功能都需要借助多个类,那么我们这里的基本单元就是类.而函数式编程则更加细化,致使我们解决一个功能的基本单元是函数,而不是类,每个功能都是由多个函数构成,并且函数之间没有直接的关系.如果简单的文字描述还不足以让你理解,下面我们就配以图来演示. 如下图所示,图左是我们设计好的三个函数,而右边则是我们需要实现的功能.而我们需要做的就是利用这三个函数去完成对应的三个功能,笔者在这里只是进行简单而又形象的表述,实际的开发过程可能需要更多的函数,并且

“全栈2019”Java多线程第三十章:尝试获取锁tryLock()方法详解

难度 初级 学习时间 10分钟 适合人群 零基础 开发语言 Java 开发环境 JDK v11 IntelliJ IDEA v2018.3 文章原文链接 "全栈2019"Java多线程第三十章:尝试获取锁tryLock()方法详解 下一章 "全栈2019"Java多线程第三十一章:中断正在等待显式锁的线程 学习小组 加入同步学习小组,共同交流与进步. 方式一:关注头条号Gorhaf,私信"Java学习小组". 方式二:关注公众号Gorhaf,回复

Linux内核模块编程与内核模块LICENSE -《详解(第3版)》预读

Linux内核模块简介 Linux内核的整体结构已经非常庞大,而其包含的组件也非常多.我们怎样把需要的部分都包含在内核中呢?一种方法是把所有需要的功能都编译到Linux内核.这会导致两个问题,一是生成的内核会很大,二是如果我们要在现有的内核中新增或删除功能,将不得不重新编译内核. 有没有一种机制使得编译出的内核本身并不需要包含所有功能,而在这些功能需要被使用的时候,其对应的代码被动态地加载到内核中呢?Linux提供了这样的一种机制,这种机制被称为模块(Module).模块具有这样的特点. 模块本

Java多线程编程中Future模式的详解&lt;转&gt;

Java多线程编程中,常用的多线程设计模式包括:Future模式.Master-Worker模式.Guarded Suspeionsion模式.不变模式和生产者-消费者模式等.这篇文章主要讲述Future模式,关于其他多线程设计模式的地址如下:关于其他多线程设计模式的地址如下:关于Master-Worker模式的详解: Java多线程编程中Master-Worker模式的详解关于Guarded Suspeionsion模式的详解: Java多线程编程中Guarded Suspeionsion模式

“全栈2019”Java第七十一章:外部类访问静态内部类成员详解

难度 初级 学习时间 10分钟 适合人群 零基础 开发语言 Java 开发环境 JDK v11 IntelliJ IDEA v2018.3 文章原文链接 "全栈2019"Java第七十一章:外部类访问静态内部类成员详解 下一章 "全栈2019"Java第七十二章:静态内部类访问外部类成员 学习小组 加入同步学习小组,共同交流与进步. 方式一:关注头条号Gorhaf,私信"Java学习小组". 方式二:关注公众号Gorhaf,回复"Jav

“全栈2019”Java第一百零四章:匿名内部类与外部成员互访详解

难度 初级 学习时间 10分钟 适合人群 零基础 开发语言 Java 开发环境 JDK v11 IntelliJ IDEA v2018.3 文章原文链接 "全栈2019"Java第一百零四章:匿名内部类与外部成员互访详解 下一章 "全栈2019"Java第一百零五章:匿名内部类覆盖作用域成员详解 学习小组 加入同步学习小组,共同交流与进步. 方式一:关注头条号Gorhaf,私信"Java学习小组". 方式二:关注公众号Gorhaf,回复"

Linux网络编程——进程池实现过程详解(1)

目录 进程池 父进程的实现流程 子进程的实现流程 进程池 父进程的实现流程 1.定义数据结构pChild,申请子进程数目的结构体空间 2.通过循环,socketpair创建全双工管道,创建子进程,将子进程pid,管道对端,是否忙碌等信息存储 3.socket,bind,listen,对应的端口处于监听状态 netstat 4.epoll_create创建epfd,监控socketFd和所有子进程的管道对端 5.while(1)循环 epoll_wait等待客户端的请求及子进程是否有通知 如果so

Storm入门教程 第三章 Storm安装部署步骤[转]

本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以“注意事项”的形式给出. 3.1 Storm集群组件 Storm集群中包含两类节点:主控节点(Master Node)和工作节点(Work Node).其分别对应的角色如下: 1. 主控节点(Master Node)上运行一个被称为Nimbus的后台程序,它负责在Storm集群内分发代码,分配任务给工作机器,并且负责监控集群运行状态.Nimbus的作用类似