Storm Part 6: API

(I) An Example

This example uses Storm to run the classic word-count program. The topology looks like this:

sentence-spout -> split-bolt -> count-bolt -> report-bolt

These components, in order, generate sentences, split them into words, count the words, and report the final counts.

The complete code is available at https://github.com/jinhong-lu/stormdemo

Below is an analysis of the key code.

1. Create the spout

// Imports, assuming a pre-1.0 Storm release (backtype.storm.*);
// in Storm 1.x and later the same classes live under org.apache.storm.*
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = { "when i was young i'd listen to the radio",
            "waiting for my favorite songs", "when they played i'd sing along",
            "it make me smile",
            "those were such happy times and not so long ago",
            "how i wondered where they'd gone",
            "but they're back again just like a long lost friend",
            "all the songs i love so well", "every shalala every wo'wo",
            "still shines.", "every shing-a-ling-a-ling",
            "that they're starting", "to sing so fine"};

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        // store the collector; all emitting goes through it
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void nextTuple() {
        // emit the next sentence, cycling through the array forever
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        try {
            // brief pause so the spout does not flood the topology
            Thread.sleep(1);
        } catch (InterruptedException e) {
            //e.printStackTrace();
        }
    }
}

This class emits the contents of the string array, one sentence at a time. Its main methods are:

(1) open() performs the spout's initialization work, much like a bolt's prepare() method.

(2) declareOutputFields() declares the names and number of the fields to be emitted; bolts have a method with the same name.

(3) nextTuple() is called by Storm again and again to produce data, so it plays a role similar to a bolt's execute() method. It is the heart of the processing logic; emit() is what sends data on to the next node in the topology.

2. Create the split-bolt

// Imports omitted: java.util.Map plus OutputCollector, TopologyContext,
// OutputFieldsDeclarer, BaseRichBolt, Tuple, Fields and Values from the
// same Storm packages as above.
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void execute(Tuple input) {
        // split each incoming sentence on spaces and emit one tuple per word
        String sentence = input.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }
}

The three methods mean the same as in the spout. This class splits each received sentence on spaces and emits the resulting words one at a time.

input.getStringByField("sentence") fetches a value by the field name that the upstream node declared.
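
Incidentally, Tuple also supports positional access. The sketch below (illustrative, not from the original project) reads the same field both ways:

public void execute(Tuple input) {
    // by declared field name, as used above
    String byName = input.getStringByField("sentence");
    // by position: "sentence" is the first declared field, index 0
    String byIndex = input.getString(0);
    // byName and byIndex refer to the same value
}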

3. Create the wordcount-bolt

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    // in-memory running totals, one entry per word
    private Map<String, Long> counts = null;

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    public void execute(Tuple input) {
        // increment this word's count and emit the updated total downstream
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }
}

This class keeps a running count of each word it receives and emits the updated result.

This bolt emits two fields, and the order of the values passed to emit() must match the order declared in declareOutputFields():

declarer.declare(new Fields("word","count"));

this.collector.emit(new Values(word,count));

4. Create the report-bolt

public class ReportBolt extends BaseRichBolt {
    private Map<String, Long> counts;

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: it emits nothing, so there is nothing to declare
    }

    public void execute(Tuple input) {
        // remember the latest count reported for each word
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        counts.put(word, count);
    }

    public void cleanup() {
        // print the final totals when the topology is shut down
        System.out.println("Final output");
        for (Entry<String, Long> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
        super.cleanup();
    }
}

This class prints the data received from the wordcount-bolt.

The results are first collected in a map; when the topology is killed, cleanup() is invoked and the map's contents are printed. Note that Storm only guarantees cleanup() runs when the topology is shut down in local mode; on a real cluster the worker process may be killed without cleanup() ever being called, so it should not be relied on in production.

5. Create the topology

public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();

        // wire the components together: sentences are shuffled across
        // split-bolt tasks, words are partitioned by value so the same word
        // always reaches the same count-bolt task, and all counts converge
        // on a single report-bolt task
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(
                SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID,
                new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(
                COUNT_BOLT_ID);

        Config conf = new Config();

        if (args.length == 0) {
            // no arguments: run in an in-process local cluster for 10 seconds
            LocalCluster cluster = new LocalCluster();

            cluster.submitTopology(TOPOLOGY_NAME, conf,
                    builder.createTopology());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            // an argument was given: submit to a real cluster under that name
            try {
                StormSubmitter.submitTopology(args[0], conf,
                        builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }
}

The key steps are:

(1) Create a TopologyBuilder and register the spout and bolts with it:

builder.setSpout(SENTENCE_SPOUT_ID, spout);
builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

Note the three groupings: shuffleGrouping spreads sentences randomly across the split-bolt's tasks; fieldsGrouping guarantees that tuples with the same "word" value always go to the same count-bolt task, which is what keeps the counts correct when that bolt runs with parallelism greater than one; globalGrouping sends every count to a single report-bolt task.

(2) Create the Config object:

Config conf = new Config();

This object carries topology-level settings such as parallelism and the nimbus address.
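
For example, a few common settings (a hedged sketch; these particular values are illustrative and not part of the original project):

Config conf = new Config();
conf.setNumWorkers(2);         // number of worker processes for the topology
conf.setDebug(true);           // log every emitted tuple; useful only for testing
conf.setMaxSpoutPending(1000); // cap on pending (un-acked) tuples per spout task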

(3) Create and run the topology. Two modes are used here.

First, when no arguments are given, a LocalCluster is created and the topology runs directly on the local machine; after 10 seconds the topology is killed and the cluster shut down:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
Thread.sleep(10000);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();

Second, when an argument is given, the topology is submitted to a real cluster:

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

The first parameter is the name of the topology.

6. Run locally

Just run the class in Eclipse; the results appear in the console.

7. Run on a cluster

(1) Compile and package:

mvn clean package

(The storm dependency is typically declared with provided scope so the jar does not bundle Storm itself; the cluster supplies it at runtime.)

(2) Upload the packaged jar to the nimbus machine, then submit the topology to the cluster:

storm jar target/stormdemo-<version>.jar com.ljh.storm.wordcount.WordCountTopology topology_name

The first argument to storm jar is the path of the packaged jar (the exact file name depends on the project's build settings), followed by the main class and then the topology name, which arrives as args[0].
