Storm WordCount Topology详解



 1 package org.apache.storm.storm_core;
 2
 3 import java.util.Map;
 4
 5 import backtype.storm.task.OutputCollector;
 6 import backtype.storm.task.TopologyContext;
 7 import backtype.storm.topology.OutputFieldsDeclarer;
 8 import backtype.storm.topology.base.BaseRichBolt;
 9 import backtype.storm.tuple.Fields;
10 import backtype.storm.tuple.Tuple;
11 import backtype.storm.tuple.Values;
12
13 public class SplitSentenceBolt extends BaseRichBolt {
14     /**
15      *
16      */
17     private static final long serialVersionUID = -2107029392155190729L;
18     private OutputCollector collector;// 用来向其他Spout发射tuple的发射器
19
20     /*
21      * (non-Javadoc) prepare方法类似于open方法,prepare在bolt初始化时被调用
22      */
23     public void prepare(Map stormConf, TopologyContext context,
24             OutputCollector collector) {
25         // TODO Auto-generated method stub
26         this.collector = collector;// 发射器初始化
27
28     }
29
30     public void execute(Tuple input) {
31         // TODO Auto-generated method stub
32         // 接收从SentenceSpout的发射器发射过来的tuple,因为SentenceSpout中声明的tuple字段为sentence,故getStringByField方法的参数为sentence
33         String sentence = input.getStringByField("sentence");// 该tuple是一个包含
34                                                                 // 键为sentence
35                                                                 // 值为字符串
36                                                                 // 的列表List<Map<sentence,String>>
37         String[] words = sentence.split(" ");// 将字符串分解成一个个的单词
38         for (String word : words)
39             this.collector.emit(new Values(word));// 将每个单词构造成tuple并发送给下一个Spout
40     }
41
42     public void declareOutputFields(OutputFieldsDeclarer declarer) {
43         // TODO Auto-generated method stub
44         declarer.declare(new Fields("word"));// 定义SplitSentenceBolt发送的tuple的字段("键值")为 word
45     }
46 }

 1 package org.apache.storm.storm_core;
 2
 3 import java.util.Map;
 4
 5 import backtype.storm.spout.SpoutOutputCollector;
 6 import backtype.storm.task.TopologyContext;
 7 import backtype.storm.topology.OutputFieldsDeclarer;
 8 import backtype.storm.topology.base.BaseRichSpout;
 9 import backtype.storm.tuple.Fields;
10 import backtype.storm.tuple.Values;
11 import backtype.storm.utils.Utils;
12
13 public class SentenceSpout extends BaseRichSpout {
14     /**
15      *
16      */
17     private static final long serialVersionUID = 3444934973982660864L;
18     private SpoutOutputCollector collector;// 用来向其他Spout发射tuple
19     private String[] sentences = { "my dog has fleas", "i like cold beverages",
20             "the dog ate my homework", "don‘t have a cow man",
21             "i don‘t think i like fleas" };
22
23     private int index = 0;
24
25     /*
26      * open() 方法在所有的Spout组件初始化时被调用
27      *
28      * @param Map conf storm 配置信息
29      *
30      * @context TopologyContext topology 组件信息
31      */
32     public void open(@SuppressWarnings("rawtypes") Map conf,
33             TopologyContext context, SpoutOutputCollector collector) {
34         // TODO Auto-generated method stub
35         this.collector = collector;
36     }
37
38     /*
39      * Values.java extends ArrayList Storm 调用该方法向输出的collector发射tuple
40      */
41     public void nextTuple() {
42         // TODO Auto-generated method stub
43         // 以字符串数组sentences 中的每个字符串 作为参数 构造tuple
44         this.collector.emit(new Values(sentences[index]));// 通过emit方法将构造好的tuple发送出去
45         index++;
46         if (index >= sentences.length) {
47             index = 0;
48         }
49         Utils.sleep(100);
50     }
51
52     /*
53      * SentenceSpout 发送的tuple它是一个包含键值对的List,该方法声明了List中包含的键值对的键为 sentence
54      */
55     public void declareOutputFields(OutputFieldsDeclarer declarer) {
56         // TODO Auto-generated method stub
57         declarer.declare(new Fields("sentence"));// 标记SentenceSpout发送的tuple的键为
58                                                     // sentence
59     }
60 }
 1 package org.apache.storm.storm_core;
 2
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.topology.TopologyBuilder;
 6 import backtype.storm.tuple.Fields;
 7 import backtype.storm.utils.Utils;
 8
 9 public class WordCountTopology {
10     private static final String SENTENCE_SPOUT_ID = "sentence-spout";
11     private static final String SPLIT_BOLT_ID = "split-bolt";
12     private static final String COUNT_BOLT_ID = "count-bolt";
13     private static final String REPORT_BOLT_ID = "report-bolt";
14     private static final String TOPOLOGY_NAME = "word-count-topology";
15
16     public static void main(String[] args) throws Exception{
17         SentenceSpout spout = new SentenceSpout();
18         SplitSentenceBolt splitBolt = new SplitSentenceBolt();
19         WordCountBolt countBolt = new WordCountBolt();
20         ReportBolt reportBolt = new ReportBolt();
21
22         TopologyBuilder builder = new TopologyBuilder();
23         builder.setSpout(SENTENCE_SPOUT_ID, spout);
24         builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
25         builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
26         builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
27
28         Config config = new Config();
29         LocalCluster cluster = new LocalCluster();
30
31         cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
32         Utils.sleep(1000);
33         cluster.killTopology(TOPOLOGY_NAME);
34         cluster.shutdown();
35
36     }
37 }
package org.apache.storm.storm_core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt{
/**
     *
     */
    private static final long serialVersionUID = 4921144902730095910L;
    //    private OutputCollector collector; ReportBolt不需要发射tuple了
    private HashMap<String, Long> counts = null;

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        // TODO Auto-generated method stub
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        this.counts.put(word, count);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        //不需要发出任何数据流
    }

    //Topology在storm集群中运行时,cleanup方法是不可靠的,并不能保证它一定会执行
    public void cleanup(){
        System.out.println("------ print counts ------");
        List<String> keys = new ArrayList<String>();
        keys.addAll(counts.keySet());//将HashMap中所有的键都添加到一个集合中
        Collections.sort(keys);//对键(单词)进行排序
        for(String key : keys)//输出排好序的每个单词的出现次数
            System.out.println(key + " : " + this.counts.get(key));
        System.out.println("--------bye----------");
    }
}
package storm.starter;

import java.util.HashMap;
import java.util.Map;

import storm.starter.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology demonstrates Storm‘s stream groupings and multilang
 * capabilities.
 */
public class WordCountTopology {
    public static class SplitSentence extends BaseBasicBolt {

        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                System.out.println(msg + "-------------------");
                if (msg != null) {
                    String[] s = msg.split(" ");
                    for (String string : s) {
                        collector.emit(new Values(string));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            /*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程
            如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了
            一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交
            但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。
           */
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf,
                    builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            //指定为本地模式运行
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());

            Thread.sleep(10000);

            cluster.shutdown();
        }
    }
}
时间: 2024-10-13 07:32:33

Storm WordCount Topology详解的相关文章

Hadoop下面WordCount运行详解

单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",该程序的完整代码可以在Hadoop安装包的"src/examples"目录下找到.单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数,如下图所示. 现在我们以"hadoop"用户登录"Master.Hadoop"服务器. 1. 创建本地的示例数据文件: 依次进入[Home]-[hadoop]-[ha

Storm 系列(二)—— Storm 核心概念详解

一.Storm核心概念 1.1 Topologies(拓扑) 一个完整的 Storm 流处理程序被称为 Storm topology(拓扑).它是一个是由 Spouts 和 Bolts 通过 Stream 连接起来的有向无环图,Storm 会保持每个提交到集群的 topology 持续地运行,从而处理源源不断的数据流,直到你将主动其杀死 (kill) 为止. 1.2 Streams(流) Stream 是 Storm 中的核心概念.一个 Stream 是一个无界的.以分布式方式并行创建和处理的

Java笔记---Hadoop 2.7.1下WordCount程序详解

一.前言 在之前我们已经在 CenOS6.5 下搭建好了 Hadoop2.x 的开发环境.既然环境已经搭建好了,那么现在我们就应该来干点正事嘛!比如来一个Hadoop世界的HelloWorld,也就是WordCount程序(一个简单的单词计数程序) 二.WordCount 官方案例的运行 2.1 程序简介 WordCount程序是hadoop自带的案例,我们可以在 hadoop 解压目录下找到包含这个程序的 jar 文件(hadoop-mapreduce-examples-2.7.1.jar),

Storm框架使用详解

开篇:实时计算是针对海量数据计算,主要是弥补hadoop等框架只能进行离线批处理的不足.实时计算不一定要精确到秒级,个人理解是相对于离线的一种范称吧.主要应用场景有: 1)  数据源是不断产生的,服务端要不断处理接收的数据,同时回馈给客户端. Storm是基于流的处理框架.以将发送的tuple序列化,进行分发到相应处理端中.数据流在时间和数量上是无限的,这种数据时不断产生的,比如用户的访问历史,点击历史,搜索信息等等. 2)  处理器是循环等待消息的,消息一来即处理数据,进而得出结果.当上传to

Storm文档详解

1.Storm基础概念 1.1.什么是storm? Apache Storm is a free and open source distributed realtime computation system. Storm是免费开源的分布式实时计算系统 实时和离线的区别: 1 离线计算:批量获取数据.批量传输数据.周期性批量计算数据.数据展示 代表技术:Sqoop批量导入数据.HDFS批量存储数据.MapReduce批量计算数据.Hive批量计算数据.***任务调度 2 流式计算:数据实时产生.

Storm 系列(五)—— Storm 编程模型详解

一.简介 下图为 Strom 的运行流程图,在开发 Storm 流处理程序时,我们需要采用内置或自定义实现 spout(数据源) 和 bolt(处理单元),并通过 TopologyBuilder 将它们之间进行关联,形成 Topology. 二.IComponent接口 IComponent 接口定义了 Topology 中所有组件 (spout/bolt) 的公共方法,自定义的 spout 或 bolt 必须直接或间接实现这个接口. public interface IComponent ex

Storm的并行度详解

Storm的并行度是非常重要的,通过提高并行度可以提高storm程序的计算能力. 那strom是如何提高并行度的呢? Strom程序的执行是由多个supervisor共同执行的.supervisor运行的是topology中的spout/bolt task task  是storm中进行计算的最小的运行单位,表示是spout或者bolt的运行实例. 程序执行的最大粒度的运行单位是进程,刚才说的task也是需要有进程来运行它的,在supervisor中,运行task的进程称为worker, Sup

Storm笔记整理(五):可靠性分析、定时任务与Storm UI参数详解

[TOC] 特别说明:前面的四篇Storm笔记中,关于计算总和的例子中的spout,使用了死循环的逻辑,实际上这样做是不正确的,原因很简单,Storm提供给我们的API中,nextTuple方法就是循环执行了,这相当于是做了双层循环.因为后面在做可靠性acker案例分析时发现,加入死循环逻辑后,该nextTuple所属于的那个task根本就没有办法跳出这个nextTuple方法,也就没有办法执行后面的ack或者是fail方法,这点尤其需要注意. Storm可靠性分析 基本原理 worker进程死

Storm 系列(七)—— Storm 集成 Redis 详解

一.简介 Storm-Redis 提供了 Storm 与 Redis 的集成支持,你只需要引入对应的依赖即可使用: <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> <type>jar</type> <