1 package org.apache.storm.storm_core; 2 3 import java.util.Map; 4 5 import backtype.storm.task.OutputCollector; 6 import backtype.storm.task.TopologyContext; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.topology.base.BaseRichBolt; 9 import backtype.storm.tuple.Fields; 10 import backtype.storm.tuple.Tuple; 11 import backtype.storm.tuple.Values; 12 13 public class SplitSentenceBolt extends BaseRichBolt { 14 /** 15 * 16 */ 17 private static final long serialVersionUID = -2107029392155190729L; 18 private OutputCollector collector;// 用来向其他Spout发射tuple的发射器 19 20 /* 21 * (non-Javadoc) prepare方法类似于open方法,prepare在bolt初始化时被调用 22 */ 23 public void prepare(Map stormConf, TopologyContext context, 24 OutputCollector collector) { 25 // TODO Auto-generated method stub 26 this.collector = collector;// 发射器初始化 27 28 } 29 30 public void execute(Tuple input) { 31 // TODO Auto-generated method stub 32 // 接收从SentenceSpout的发射器发射过来的tuple,因为SentenceSpout中声明的tuple字段为sentence,故getStringByField方法的参数为sentence 33 String sentence = input.getStringByField("sentence");// 该tuple是一个包含 34 // 键为sentence 35 // 值为字符串 36 // 的列表List<Map<sentence,String>> 37 String[] words = sentence.split(" ");// 将字符串分解成一个个的单词 38 for (String word : words) 39 this.collector.emit(new Values(word));// 将每个单词构造成tuple并发送给下一个Spout 40 } 41 42 public void declareOutputFields(OutputFieldsDeclarer declarer) { 43 // TODO Auto-generated method stub 44 declarer.declare(new Fields("word"));// 定义SplitSentenceBolt发送的tuple的字段("键值")为 word 45 } 46 }
1 package org.apache.storm.storm_core; 2 3 import java.util.Map; 4 5 import backtype.storm.spout.SpoutOutputCollector; 6 import backtype.storm.task.TopologyContext; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.topology.base.BaseRichSpout; 9 import backtype.storm.tuple.Fields; 10 import backtype.storm.tuple.Values; 11 import backtype.storm.utils.Utils; 12 13 public class SentenceSpout extends BaseRichSpout { 14 /** 15 * 16 */ 17 private static final long serialVersionUID = 3444934973982660864L; 18 private SpoutOutputCollector collector;// 用来向其他Spout发射tuple 19 private String[] sentences = { "my dog has fleas", "i like cold beverages", 20 "the dog ate my homework", "don‘t have a cow man", 21 "i don‘t think i like fleas" }; 22 23 private int index = 0; 24 25 /* 26 * open() 方法在所有的Spout组件初始化时被调用 27 * 28 * @param Map conf storm 配置信息 29 * 30 * @context TopologyContext topology 组件信息 31 */ 32 public void open(@SuppressWarnings("rawtypes") Map conf, 33 TopologyContext context, SpoutOutputCollector collector) { 34 // TODO Auto-generated method stub 35 this.collector = collector; 36 } 37 38 /* 39 * Values.java extends ArrayList Storm 调用该方法向输出的collector发射tuple 40 */ 41 public void nextTuple() { 42 // TODO Auto-generated method stub 43 // 以字符串数组sentences 中的每个字符串 作为参数 构造tuple 44 this.collector.emit(new Values(sentences[index]));// 通过emit方法将构造好的tuple发送出去 45 index++; 46 if (index >= sentences.length) { 47 index = 0; 48 } 49 Utils.sleep(100); 50 } 51 52 /* 53 * SentenceSpout 发送的tuple它是一个包含键值对的List,该方法声明了List中包含的键值对的键为 sentence 54 */ 55 public void declareOutputFields(OutputFieldsDeclarer declarer) { 56 // TODO Auto-generated method stub 57 declarer.declare(new Fields("sentence"));// 标记SentenceSpout发送的tuple的键为 58 // sentence 59 } 60 }
1 package org.apache.storm.storm_core; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.topology.TopologyBuilder; 6 import backtype.storm.tuple.Fields; 7 import backtype.storm.utils.Utils; 8 9 public class WordCountTopology { 10 private static final String SENTENCE_SPOUT_ID = "sentence-spout"; 11 private static final String SPLIT_BOLT_ID = "split-bolt"; 12 private static final String COUNT_BOLT_ID = "count-bolt"; 13 private static final String REPORT_BOLT_ID = "report-bolt"; 14 private static final String TOPOLOGY_NAME = "word-count-topology"; 15 16 public static void main(String[] args) throws Exception{ 17 SentenceSpout spout = new SentenceSpout(); 18 SplitSentenceBolt splitBolt = new SplitSentenceBolt(); 19 WordCountBolt countBolt = new WordCountBolt(); 20 ReportBolt reportBolt = new ReportBolt(); 21 22 TopologyBuilder builder = new TopologyBuilder(); 23 builder.setSpout(SENTENCE_SPOUT_ID, spout); 24 builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID); 25 builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); 26 builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID); 27 28 Config config = new Config(); 29 LocalCluster cluster = new LocalCluster(); 30 31 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); 32 Utils.sleep(1000); 33 cluster.killTopology(TOPOLOGY_NAME); 34 cluster.shutdown(); 35 36 } 37 }
package org.apache.storm.storm_core; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; public class ReportBolt extends BaseRichBolt{ /** * */ private static final long serialVersionUID = 4921144902730095910L; // private OutputCollector collector; ReportBolt不需要发射tuple了 private HashMap<String, Long> counts = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub this.counts = new HashMap<String, Long>(); } public void execute(Tuple input) { // TODO Auto-generated method stub String word = input.getStringByField("word"); Long count = input.getLongByField("count"); this.counts.put(word, count); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub //不需要发出任何数据流 } //Topology在storm集群中运行时,cleanup方法是不可靠的,并不能保证它一定会执行 public void cleanup(){ System.out.println("------ print counts ------"); List<String> keys = new ArrayList<String>(); keys.addAll(counts.keySet());//将HashMap中所有的键都添加到一个集合中 Collections.sort(keys);//对键(单词)进行排序 for(String key : keys)//输出排好序的每个单词的出现次数 System.out.println(key + " : " + this.counts.get(key)); System.out.println("--------bye----------"); } }
package storm.starter; import java.util.HashMap; import java.util.Map; import storm.starter.RandomSentenceSpout; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * This topology demonstrates Storm‘s stream groupings and multilang * capabilities. */ public class WordCountTopology { public static class SplitSentence extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); System.out.println(msg + "-------------------"); if (msg != null) { String[] s = msg.split(" "); for (String string : s) { collector.emit(new Values(string)); } } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } public static class WordCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",new Fields("word")); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { /*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程 如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了 一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交 但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。 */ conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); //指定为本地模式运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } } }
时间: 2024-10-13 07:32:33