官方提供的storm starter示例中,有很多应用的例子,对storm的应用场景理解很有帮助。本文结合源码来进行功能分解,记录一下,作为记忆索引吧。
先来看一个比较简单的示例:WordCountTopology,原版代码该示例是为了说明多语言适配而做的应用场景,主要功能是随机生成一些String,将这些String划分分组,统计各单词出现数量。后来修改了一下,去掉了py调用的地方。使用java来进行词组划分。
先来看一下Topology:
public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); // 数据源 builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); // 单词划分 builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",new Fields("word")); // 统计单词出现个数 Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar("wordCount", conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
RandomSentenceSpout功能很简单,随机生成字符串到tuple中,key为word。
再看一下bolt,该示例中使用了两类分组方式shuffleGrouping(随机分组),fieldsGrouping(按字段分组)。使用shuffleGrouping来进行单词划分,为了保证单词统计时都在一个bolt中进行,使用fieldsGrouping来进行word划分统计,运行时会看到相同key值的tuple会分配到同一线程上。
public static class SplitSentence implements IBasicBolt { public void prepare(Map conf, TopologyContext context) { } public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for (String word : sentence.split(" ")) { // 将Spout接收到的tuple按空格进行分解,产生单词数据流 collector.emit(new Values(word)); } } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); // 定义key值 } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
单词统计bolt:
public static class WordCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); // 输出结果word + word number } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
总结:
通常情况下,为了保证数据可靠性与完整性,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,相当于自动处理了prepare方法和collector.emit.ack(inputTuple);
storm starter学习(一)
时间: 2024-11-05 22:48:44