Understanding Storm's Stream Concept by Example

Originally published on my personal blog: http://zqhxuyuan.github.io/2016/06/30/Hello-Storm/

If you repost this, please credit the source. Thanks!

Background

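This started while I was reading flowmix, a CEP engine built on Storm.

In the FlowmixBuilder code, every Bolt is given a long list of groupings, and declareStream declares an equally long list of stream-ids.

For someone who had only ever written WordCountTopology, this was bewildering. I had never seen Storm used this way, and my first instinct was to back away: with that many groupings on every Bolt, what on earth does the topology graph look like?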

  public TopologyBuilder create() {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(EVENT, (IRichSpout) eventsComponent, eventLoaderParallelism == -1 ? parallelismHint : eventLoaderParallelism);
    builder.setSpout(FLOW_LOADER_STREAM, (IRichSpout) flowLoaderSpout, 1);
    builder.setSpout("tick", new TickSpout(1000), 1);
    builder.setBolt(INITIALIZER, new FlowInitializerBolt(), parallelismHint)  // kicks off a flow determining where to start
              .localOrShuffleGrouping(EVENT)
              .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM);

    declarebolt(builder, FILTER, new FilterBolt(), parallelismHint, true);
    declarebolt(builder, SELECT, new SelectorBolt(), parallelismHint, true);
    declarebolt(builder, PARTITION, new PartitionBolt(), parallelismHint, true);
    declarebolt(builder, SWITCH, new SwitchBolt(), parallelismHint, true);
    declarebolt(builder, AGGREGATE, new AggregatorBolt(), parallelismHint, true);
    declarebolt(builder, JOIN, new JoinBolt(), parallelismHint, true);
    declarebolt(builder, EACH, new EachBolt(), parallelismHint, true);
    declarebolt(builder, SORT, new SortBolt(), parallelismHint, true);
    declarebolt(builder, SPLIT, new SplitBolt(), parallelismHint, true);
    declarebolt(builder, OUTPUT, outputBolt, parallelismHint, false);

    return builder;
  }
  private static void declarebolt(TopologyBuilder builder, String boltName, IRichBolt bolt, int parallelism, boolean control) {
      BoltDeclarer declarer = builder.setBolt(boltName, bolt, parallelism)
          .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM)
          .allGrouping("tick", "tick")
          .localOrShuffleGrouping(INITIALIZER, boltName)
          .localOrShuffleGrouping(FILTER, boltName)
          .fieldsGrouping(PARTITION, boltName, new Fields(FLOW_ID, PARTITION))    // guaranteed partitions will always group the same flow for flows that have joins with default partitions.
          .localOrShuffleGrouping(AGGREGATE, boltName)
          .localOrShuffleGrouping(SELECT, boltName)
          .localOrShuffleGrouping(EACH, boltName)
          .localOrShuffleGrouping(SORT, boltName)
          .localOrShuffleGrouping(SWITCH, boltName)
          .localOrShuffleGrouping(SPLIT, boltName)
          .localOrShuffleGrouping(JOIN, boltName);
    }
  public static void declareOutputStreams(OutputFieldsDeclarer declarer, Fields fields) {
      declarer.declareStream(PARTITION, fields);
      declarer.declareStream(FILTER, fields);
      declarer.declareStream(SELECT, fields);
      declarer.declareStream(AGGREGATE, fields);
      declarer.declareStream(SWITCH, fields);
      declarer.declareStream(SORT, fields);
      declarer.declareStream(JOIN, fields);
      declarer.declareStream(SPLIT, fields);
      declarer.declareStream(EACH, fields);
      declarer.declareStream(OUTPUT, fields);
  }

Let's first review the classic WordCountTopology.

WordCountTopology Default Stream

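// Imports assume a pre-1.0 Storm release (backtype.storm packages),
// which is what the log output later in this post suggests.
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class WordCountTopologySimple {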

    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector collector;
        Random rand;
        String[] sentences = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            String sentence = sentences[rand.nextInt(sentences.length)];
            System.out.println("\n" + sentence);
            this.collector.emit(new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
        public void ack(Object id) {}
        public void fail(Object id) {}
    }

    public static class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
            this.collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class WordCountBolt extends BaseBasicBolt {
        // BaseBasicBolt hands a BasicOutputCollector to execute() and acks automatically,
        // so no OutputCollector field or three-argument prepare() override is needed
        // (that signature does not exist on BaseBasicBolt, so @Override would not compile).
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class PrinterBolt extends BaseBasicBolt {
        // No prepare() override: BaseBasicBolt passes the collector to execute().

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String first = tuple.getString(0);
            int second = tuple.getInteger(1);
            System.out.println(first + "," + second);
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {}
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count");

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}

SingleStream

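By default, every stream-id is default: the stream a Spout emits to its downstream Bolts, the stream a Bolt emits to its downstream Bolts, and the stream a Bolt receives from an upstream Spout or Bolt.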
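As a minimal plain-Java sketch of that default (a toy model, not Storm code; the StreamSubscription class is invented here for illustration): a subscription is keyed by an upstream component-id plus a stream-id, and when no stream-id is given it falls back to "default". That is what the one-argument grouping calls such as shuffleGrouping("spout") do in Storm.

```java
import java.util.Objects;

// Toy model (not Storm code): a subscription is identified by the upstream
// component-id plus a stream-id, and the stream-id falls back to "default".
final class StreamSubscription {
    static final String DEFAULT_STREAM_ID = "default";

    final String componentId;
    final String streamId;

    StreamSubscription(String componentId, String streamId) {
        this.componentId = Objects.requireNonNull(componentId);
        this.streamId = Objects.requireNonNull(streamId);
    }

    // Mirrors the one-argument grouping form: no stream-id means "default".
    static StreamSubscription of(String componentId) {
        return new StreamSubscription(componentId, DEFAULT_STREAM_ID);
    }

    // Mirrors the two-argument grouping form with an explicit stream-id.
    static StreamSubscription of(String componentId, String streamId) {
        return new StreamSubscription(componentId, streamId);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StreamSubscription)) return false;
        StreamSubscription other = (StreamSubscription) o;
        return componentId.equals(other.componentId) && streamId.equals(other.streamId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(componentId, streamId);
    }

    @Override
    public String toString() {
        return componentId + "/" + streamId;
    }

    public static void main(String[] args) {
        System.out.println(StreamSubscription.of("spout"));                 // spout/default
        System.out.println(StreamSubscription.of("spout", "split-stream")); // spout/split-stream
    }
}
```

The point of the model is that of("spout") and of("spout", "default") are the same subscription, while of("spout", "split-stream") is a different one.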

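A Spout or Bolt can name a custom stream-id when it emits a tuple, but it must then declare that same stream-id when declaring its output fields.

About the code: each component emits to one stream-id and declares one stream-id. When the topology sets up a Bolt, the grouping names the upstream component's stream-id in addition to its component-id.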

class RandomSentenceSpout {
    public void nextTuple() {
        Utils.sleep(1000);
        String sentence = sentences[rand.nextInt(sentences.length)];
        System.out.println("\n" + sentence);
        this.collector.emit("split-stream", new Values(sentence));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit("count-stream", new Values(word));
        }
        this.collector.ack(tuple);
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("count-stream", new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        count++;
        counts.put(word, count);
        collector.emit("print-stream", new Values(word, count));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("print-stream", new Fields("word", "count"));
    }
}
class Topology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");
    }
}

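Using a custom stream-id comes down to two steps: the emitting component names the stream-id both in emit and in declareStream, and the downstream Bolt's grouping names that stream-id together with the upstream component-id.

The figure below shows in detail how the components in the topology work together: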

MultiStream

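A Spout or Bolt can emit to several stream-ids, and again every stream-id used in an emit must be declared along with the output fields.

A given tuple does not necessarily go to every stream. In the example below, one tuple is emitted to stream1 and stream3 and another to stream2 and stream3; stream1 and stream2 are mutually exclusive, so no tuple is ever sent to both.

Even so, all of the stream-ids have to be declared with declareStream.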

    public void execute(Tuple input) {
        String word = input.getString(0);
        // words that sort before "j" go to stream1; words that sort after "j" go to stream2
        if(word.compareTo("j") < 0){
            collector.emit("stream1", new Values(word));
        }else if(word.compareTo("j") > 0){
            collector.emit("stream2", new Values(word));
        }
        // every word goes to stream3, whichever branch was taken
        collector.emit("stream3", new Values(word));
    }
    public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("stream1", new Fields("word"));
        outputFieldsDeclarer.declareStream("stream2", new Fields("word"));
        outputFieldsDeclarer.declareStream("stream3", new Fields("word"));
    }
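To make the routing rule concrete, here is the same branching restated as a small plain-Java method with no Storm dependency (the class and method names are made up for this sketch). Note the edge case: a word that is exactly "j" satisfies neither comparison, so it only reaches stream3.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java restatement of the routing rule in execute(); not Storm code.
final class MultiStreamRouting {

    // Returns the stream-ids a word would be emitted to, in emit order.
    static List<String> streamsFor(String word) {
        List<String> streams = new ArrayList<>();
        int cmp = word.compareTo("j");
        if (cmp < 0) {
            streams.add("stream1");
        } else if (cmp > 0) {
            streams.add("stream2");
        }
        // cmp == 0 (the word is exactly "j") reaches neither stream1 nor stream2.
        streams.add("stream3");
        return streams;
    }

    public static void main(String[] args) {
        for (String word : new String[]{"apple", "moon", "j"}) {
            System.out.println(word + " -> " + streamsFor(word));
        }
    }
}
```

Running main prints apple -> [stream1, stream3], moon -> [stream2, stream3] and j -> [stream3], which matches the two message paths described above plus the edge case.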

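Programmers love flow charts, so here you go: the top-left figure is the flow chart, the top-right figure is the corresponding Storm Topology, and the two figures below trace two tuples through Storm's streams.

Following the example above, give every emit call in WordCountTopology's Spout and Bolts an output stream-id, and declare several output stream-ids in declareOutputFields.

Although each Spout/Bolt now declares several output stream-ids, each emit still targets only one of them.

So this is essentially the same as the SingleStream version, and the Topology runs without any change.

About the code: each component emits to one stream-id but declares several. When the topology sets up a Bolt, the grouping names the upstream component's stream-id in addition to its component-id.

emit is unchanged, and so is the topology.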

class RandomSentenceSpout {
    public void nextTuple() {
        this.collector.emit("split-stream", new Values(sentence));              // <-- stream actually used
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("sentence"));         // <-- stream actually used
        declarer.declareStream("count-stream", new Fields("sentence"));
        declarer.declareStream("print-stream", new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        for (String word : words) {
            this.collector.emit("count-stream", new Values(word));              // <-- stream actually used
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("word"));
        declarer.declareStream("count-stream", new Fields("word"));             // <-- stream actually used
        declarer.declareStream("print-stream", new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        collector.emit("print-stream", new Values(word, count));                // <-- stream actually used
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("word", "count"));
        declarer.declareStream("count-stream", new Fields("word", "count"));
        declarer.declareStream("print-stream", new Fields("word", "count"));    // <-- stream actually used
    }
}
class Topology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");
    }
}

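So why declare multiple output streams in each Spout/Bolt at all? Look at this part of the code: the stream-ids are the same in every component, and only the Fields differ.

So the declarer.declareStream calls of every Spout/Bolt can be pulled out into one helper: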

    public static void declareStream(OutputFieldsDeclarer declarer,
            Fields fields){
        declarer.declareStream("split-stream", fields);
        declarer.declareStream("count-stream", fields);
        declarer.declareStream("print-stream", fields);
    }

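Each Spout/Bolt then calls declareStream from declareOutputFields to declare all the stream-ids at once, passing only its own Fields.

About the code: when declaring multiple streams, every component declares the same set of stream-ids and passes different Fields.

emit is unchanged, and so is the topology.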

class RandomSentenceSpout {
    public void nextTuple() {
        this.collector.emit("split-stream", new Values(sentence));              // <-- stream actually used
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declareStream(declarer, new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        for (String word : words) {
            this.collector.emit("count-stream", new Values(word));              // <-- stream actually used
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declareStream(declarer, new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        collector.emit("print-stream", new Values(word, count));                // <-- stream actually used
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declareStream(declarer, new Fields("word", "count"));
    }
}
class Topology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2)
            .shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2)
            .fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");
    }
}

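The benefit is that if all stream-ids are known in advance, you define declareStream once and every component calls this shared method.

This approach is actually very useful for building dynamic topologies.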
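One way to picture why this helps, sketched in plain Java under loud assumptions: the flow lists, the nextStream helper and the class name are all invented for illustration and are not taken from flowmix or Storm. If streams are named after the Bolt that consumes them (as in the flowmix declarebolt code at the top, where each Bolt subscribes to the stream carrying its own name), a component can decide at run time where a tuple goes next simply by choosing the stream-id. That only works because every component has already declared every stream-id.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these names come from Storm or flowmix.
final class DynamicStreamChoice {
    // The stream-ids that every component declares up front via the shared helper.
    static final List<String> DECLARED_STREAMS =
            Arrays.asList("split-stream", "count-stream", "print-stream");

    // A "flow" is an ordered list of stream-ids. Given how many steps are already
    // done (a non-negative count), return the stream-id the next emit should
    // target, or null once the flow is finished.
    static String nextStream(List<String> flow, int stepsDone) {
        if (stepsDone >= flow.size()) {
            return null;
        }
        String stream = flow.get(stepsDone);
        if (!DECLARED_STREAMS.contains(stream)) {
            // Emitting to an undeclared stream would be an error, so reject it here.
            throw new IllegalArgumentException("undeclared stream: " + stream);
        }
        return stream;
    }

    public static void main(String[] args) {
        List<String> fullFlow = Arrays.asList("split-stream", "count-stream", "print-stream");
        List<String> shortFlow = Arrays.asList("split-stream", "print-stream"); // skips counting
        System.out.println(nextStream(fullFlow, 1));  // count-stream
        System.out.println(nextStream(shortFlow, 1)); // print-stream
    }
}
```

Two flows with different steps can then share one statically wired topology, because the route is picked per tuple rather than fixed by the wiring.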

MultiGroup

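So far we have wrapped all the stream-ids in one method, while each emit still names only one stream-id.

Every component emits to a single stream-id but declares the same set of stream-ids.

In other words, although a Spout/Bolt declares several stream-ids, any one tuple goes to just one of them.

Can the groupings be treated the same way? The goal is to pull the setBolt logic out into a shared method as well.

The version below is definitely wrong. For a start it cannot be extracted, because each Bolt uses a different grouping strategy.

Wrong as it is, notice that the first and last components are not given multiple groupings. Why?

1. A Spout has no grouping, because it is the source. The component named in a grouping is the component that the current component receives its data from.

2. The last Bolt is left alone for now; there is a pitfall here...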

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2)
                .shuffleGrouping("spout", "split-stream")                      // <-- carries data
                .shuffleGrouping("split", "split-stream")
                .shuffleGrouping("count", "split-stream")
        ;
        builder.setBolt("count", new WordCountBolt(), 2)
                .fieldsGrouping("spout", "count-stream", new Fields("word"))
                .fieldsGrouping("split", "count-stream", new Fields("word"))   // <-- carries data
                .fieldsGrouping("count", "count-stream", new Fields("word"))
        ;
        builder.setBolt("print", new PrinterBolt(), 1)
                .shuffleGrouping("count", "print-stream");
    }       

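Nor can this topology even be built. Take WordCountBolt subscribing to component "spout": in the topology that component is RandomSentenceSpout, whose output field is named "sentence"; it has no word field at all.

The error below confirms this: Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"}

The count component (WordCountBolt) subscribes to the count-stream output stream of the spout component (RandomSentenceSpout), but spout has no word field.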

6972 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:
    Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"})>
7002 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

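The correct way to use multiple stream-ids is to use fieldsGrouping only against [split], whose streams carry the word field, and shuffleGrouping against every other source: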

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2)
                .shuffleGrouping("spout", "split-stream")                      // <-- carries data
                .fieldsGrouping("split", "split-stream", new Fields("word"))
                .shuffleGrouping("count", "split-stream")
        ;
        builder.setBolt("count", new WordCountBolt(), 2)
                .shuffleGrouping("spout", "count-stream")
                .fieldsGrouping("split", "count-stream", new Fields("word"))   // <-- carries data
                .shuffleGrouping("count", "count-stream")
        ;
        builder.setBolt("print", new PrinterBolt(), 1)
                .shuffleGrouping("count", "print-stream");
    }

Now every Bolt uses the same groupings against the same component-ids; only the trailing stream-id differs.

Good, so setBolt can be extracted just like declareStream:

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout",new RandomSentenceSpout(),1);

        setBolt(builder, new SplitSentenceBolt(), "split");
        setBolt(builder, new WordCountBolt(), "count");
        builder.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");
    }
    public static void setBolt(TopologyBuilder builder,IRichBolt bolt,String name){
        builder.setBolt(name, bolt, 2)
                .shuffleGrouping("spout", name + "-stream")
                .fieldsGrouping("split", name + "-stream", new Fields("word"))
                .shuffleGrouping("count", name + "-stream")
        ;
    }

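Every Bolt now has several groupings, and the first argument of a grouping, the component, says where the data comes from.

SplitSentenceBolt and WordCountBolt each define three groupings.

Does that mean the data sources of [split] are [spout], [split] and [count], and the data sources of [count] are likewise [spout], [split] and [count]? That would be nothing like the actual Topology structure.

In the figure below, the topology has several more edges than the original WordCountTopology (and a component can even point at itself, which floored me).

However, although every Bolt has several input sources, a source does not necessarily emit to the subscribed stream-id.

For example, split has three sources, [spout], [split] and [count], but of these only [spout] emits to stream-id "split-stream", so the other two subscriptions never carry any data.

Likewise [count] has the three sources [spout], [split] and [count], but only [split] emits to "count-stream".

So the effective topology is still the original [spout]->[split]->[count]->[print], without the extra edges or self-loops.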
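The reasoning above can be checked with a toy model in plain Java (no Storm dependency; the class and method names are invented for this sketch). It lists the stream each component actually emits to and the sources each Bolt subscribes to, then keeps only the subscriptions whose source really emits to the subscribed stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Storm code: which subscriptions actually carry data?
final class EffectiveEdges {

    // The single stream each component actually emits to in this post.
    static Map<String, String> emits() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("spout", "split-stream");
        m.put("split", "count-stream");
        m.put("count", "print-stream");
        return m;
    }

    // subscriber -> upstream components named in its groupings. Each subscriber
    // listens on the stream "<subscriber>-stream", as in the setBolt helper.
    static Map<String, List<String>> subscriptions() {
        Map<String, List<String>> m = new LinkedHashMap<>();
        m.put("split", Arrays.asList("spout", "split", "count"));
        m.put("count", Arrays.asList("spout", "split", "count"));
        m.put("print", Arrays.asList("count"));
        return m;
    }

    // Keeps only the subscriptions whose source really emits to the subscribed stream.
    static List<String> effectiveEdges(Map<String, String> emits,
                                       Map<String, List<String>> subscriptions) {
        List<String> edges = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : subscriptions.entrySet()) {
            String subscriber = e.getKey();
            String stream = subscriber + "-stream";
            for (String source : e.getValue()) {
                if (stream.equals(emits.get(source))) {
                    edges.add(source + "->" + subscriber);
                }
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        // Prints [spout->split, split->count, count->print]
        System.out.println(effectiveEdges(emits(), subscriptions()));
    }
}
```

Seven subscriptions go in and three edges come out, which is exactly the original linear word-count pipeline.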

The last Bolt

Can the last Bolt, PrinterBolt, also be added to every Bolt's groupings?

        builder.setBolt("split", new SplitSentenceBolt(), 2)
                .shuffleGrouping("spout", "split-stream")                      // <-- carries data
                .fieldsGrouping("split", "split-stream", new Fields("word"))
                .shuffleGrouping("count", "split-stream")
                .shuffleGrouping("print", "split-stream")
        ;
        builder.setBolt("count", new WordCountBolt(), 2)
                .shuffleGrouping("spout", "count-stream")
                .fieldsGrouping("split", "count-stream", new Fields("word"))   // <-- carries data
                .shuffleGrouping("count", "count-stream")
                .shuffleGrouping("print", "count-stream")
        ;
        builder.setBolt("print", new PrinterBolt(), 1)
                .shuffleGrouping("spout", "print-stream")
                .fieldsGrouping("split", "print-stream", new Fields("word"))
                .shuffleGrouping("count", "print-stream")                      // <-- carries data
                .shuffleGrouping("print", "print-stream")
        ;

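The topology looks like this; dashed lines are edges that do not really exist (because the source never emits to those streams).

Oops... the error says that the [count] component subscribes to a [count-stream] that does not exist on the [print] component: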

9510 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:Component:
    [count] subscribes from non-existent stream: [count-stream] of component [print])>
9552 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

Next, change the print-related groupings in the different Bolts. It only works once every print subscription is commented out:

  1. Nothing commented out: [count] subscribes from non-existent stream: [count-stream] of component [print]
  2. ① commented out: [split] subscribes from non-existent stream: [split-stream] of component [print]
  3. ① and ② commented out: [print] subscribes from non-existent stream: [print-stream] of component [print]
  4. ①, ② and ③ commented out: SUCCESS!
        builder.setBolt("split", new SplitSentenceBolt(), 2)
                .shuffleGrouping("spout", "split-stream")                      // <-- carries data
                .fieldsGrouping("split", "split-stream", new Fields("word"))
                .shuffleGrouping("count", "split-stream")
                //.shuffleGrouping("print", "split-stream")  //②
        ;
        builder.setBolt("count", new WordCountBolt(), 2)
                .shuffleGrouping("spout", "count-stream")
                .fieldsGrouping("split", "count-stream", new Fields("word"))   // <-- carries data
                .shuffleGrouping("count", "count-stream")
                //.shuffleGrouping("print", "count-stream")  //①
        ;
        builder.setBolt("print", new PrinterBolt(), 1)
                .shuffleGrouping("spout", "print-stream")
                .fieldsGrouping("split", "print-stream", new Fields("word"))
                .shuffleGrouping("count", "print-stream")                      // <-- carries data
                //.shuffleGrouping("print", "print-stream")  //③
        ;

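What happened? Why is a non-existent stream a problem? Earlier, with SplitSentenceBolt as the example, split and count never sent anything to split-stream either, so why was there no error then?

The reason is that PrinterBolt only prints and does nothing else: its declareOutputFields is empty, so it declares no output stream at all, and it never emits anything to any stream. Split and count, by contrast, do declare split-stream through the shared declareStream even though they never emit to it. So the edges leaving PrinterBolt in the figure simply do not exist!

The fix is simple: give PrinterBolt an emit with a stream-id, and declare that output stream in declareOutputFields.

As long as PrinterBolt has the output streams that others subscribe to, the error goes away. In other words, make sure every Bolt sends tuples downstream on declared streams.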
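The two error messages seen so far can be reproduced with a simplified model of the checks, written in plain Java. This is a sketch under assumptions, not Storm's implementation: the SubscriptionCheck class and its methods are invented here, and it folds "unknown component" into "non-existent stream". It assumes, as the messages suggest, that the check looks only at what the source component has declared, not at what it emits.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the two submission-time checks behind this post's errors.
// It is NOT Storm's implementation; all names are invented for illustration.
final class SubscriptionCheck {
    // component-id -> (stream-id -> declared field names)
    private final Map<String, Map<String, List<String>>> declared = new HashMap<>();

    // Registers a component that declares no output stream (like the original PrinterBolt).
    void component(String component) {
        declared.computeIfAbsent(component, c -> new HashMap<>());
    }

    void declareStream(String component, String stream, String... fields) {
        declared.computeIfAbsent(component, c -> new HashMap<>())
                .put(stream, Arrays.asList(fields));
    }

    // Returns null if the subscription passes, otherwise a message modeled on the logs.
    // Pass no groupingFields for a shuffle grouping.
    String check(String subscriber, String source, String stream, String... groupingFields) {
        Map<String, List<String>> streams = declared.get(source);
        if (streams == null || !streams.containsKey(stream)) {
            return "[" + subscriber + "] subscribes from non-existent stream: ["
                    + stream + "] of component [" + source + "]";
        }
        for (String field : groupingFields) {
            if (!streams.get(stream).contains(field)) {
                return "[" + subscriber + "] subscribes from stream: [" + stream
                        + "] of component [" + source + "] with non-existent fields: " + field;
            }
        }
        return null;
    }

    // Declarations as in this post before PrinterBolt is fixed.
    static SubscriptionCheck wordCount() {
        SubscriptionCheck c = new SubscriptionCheck();
        for (String stream : new String[]{"split-stream", "count-stream", "print-stream"}) {
            c.declareStream("spout", stream, "sentence");
            c.declareStream("split", stream, "word");
            c.declareStream("count", stream, "word", "count");
        }
        c.component("print"); // empty declareOutputFields
        return c;
    }

    public static void main(String[] args) {
        SubscriptionCheck c = wordCount();
        System.out.println(c.check("split", "count", "split-stream"));         // declared but never emitted: passes
        System.out.println(c.check("count", "spout", "count-stream", "word")); // field not declared on that stream
        System.out.println(c.check("count", "print", "count-stream"));         // stream not declared at all
    }
}
```

In this model a stream that is declared but never used passes the check, while a stream that was never declared fails it, which is the difference between split/count and the original PrinterBolt.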

The final, complete code:

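// Imports assume a pre-1.0 Storm release (backtype.storm packages),
// which is what the log output in this post suggests.
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class WordCountTopologyStream3 {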

    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector collector;
        Random rand;
        String[] sentences = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            String sentence = sentences[rand.nextInt(sentences.length)];
            System.out.println("\n" + sentence);
            this.collector.emit("split-stream", new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("sentence"));
        }
        public void ack(Object id) {}
        public void fail(Object id) {}
    }

    public static class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit("count-stream", new Values(word));
            }
            this.collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("word"));
        }
    }

    public static class WordCountBolt extends BaseRichBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        private OutputCollector collector;
        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit("print-stream", new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("word", "count"));
        }
    }

    public static class PrinterBolt extends BaseRichBolt {
        private OutputCollector collector;
        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        @Override
        public void execute(Tuple tuple) {
            String first = tuple.getString(0);
            int second = tuple.getInteger(1);
            System.out.println(first + "," + second);
            collector.emit("whatever-stream", new Values(first + ":" + second));  // <-- added
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("word:count"));  // <-- added
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        setBolt(builder, new SplitSentenceBolt(), "split");
        setBolt(builder, new WordCountBolt(), "count");
        setBolt(builder, new PrinterBolt(), "print");

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }

    public static void declareStream(OutputFieldsDeclarer declarer, Fields fields){
        declarer.declareStream("split-stream", fields);
        declarer.declareStream("count-stream", fields);
        declarer.declareStream("print-stream", fields);
        declarer.declareStream("whatever-stream", fields);      // <-- added
    }

    public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name){
        builder.setBolt(name, bolt, 2)
                .shuffleGrouping("spout", name + "-stream")
                .fieldsGrouping("split", name + "-stream", new Fields("word"))
                .shuffleGrouping("count", name + "-stream")
                .shuffleGrouping("print", name + "-stream")     // <-- added
        ;
    }
}

Think that's the end of it? What if PrinterBolt drops its output stream-id and uses the default stream instead:

    public static class PrinterBolt extends BaseRichBolt {
        @Override
        public void execute(Tuple tuple) {
            String first = tuple.getString(0);
            int second = tuple.getInteger(1);
            System.out.println(first + "," + second);
            //collector.emit("whatever-stream", new Values(first + ":" + second));
            collector.emit(new Values(first + ":" + second));
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //declareStream(declarer, new Fields("word:count"));
            declarer.declare(new Fields("word:count"));
        }
    }

    public static void declareStream(OutputFieldsDeclarer declarer, Fields fields){
        declarer.declareStream("split-stream", fields);
        declarer.declareStream("count-stream", fields);
        declarer.declareStream("print-stream", fields);
        //declarer.declareStream("whatever-stream", fields);      // <-- commented out
    }

    public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name){
        builder.setBolt(name, bolt, 2)
                .shuffleGrouping("spout", name + "-stream")
                .fieldsGrouping("split", name + "-stream", new Fields("word"))
                .shuffleGrouping("count", name + "-stream")
                .shuffleGrouping("print", name + "-stream")
        ;
    }

Still an error: the [count] component subscribes to a [count-stream] that does not exist on the [print] component

Component: [count] subscribes from non-existent stream: [count-stream] of component [print]

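So it seems that once the upstream components use custom stream-ids, the last component has to use custom stream-ids too, even if they look meaningless! More precisely, setBolt still subscribes every Bolt to [print]'s split-stream, count-stream and print-stream, and with the declareStream call commented out PrinterBolt declares only the default stream.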

EOF.

