新建Maven项目
<dependencies>
<!-- Storm core API (the backtype.storm.* packages used by the Java code
     below). 0.9.2-incubating predates the org.apache.storm package rename.
     NOTE(review): for cluster deployment this is normally declared with
     <scope>provided</scope> because the Storm runtime supplies the jar;
     the default compile scope shown here is what local-mode (LocalCluster)
     runs from the IDE require. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
</dependency>
</dependencies>
Java 代码
package mystorm;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.FileUtils;

import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Local-mode word-count topology for Storm 0.9.x (backtype.storm API).
 *
 * <p>Pipeline: {@link MySpout} re-reads a text file and emits one tuple per
 * line; {@link SplitLineBolt} splits each line on tabs into words;
 * {@link WordCountBolt} accumulates per-word counts and prints them when the
 * topology is shut down.
 *
 * <p>Fixes versus the original tutorial code: {@code StormSubmitter} was
 * instantiated and its static {@code submitTopology} called through the
 * instance — and that submits to a real cluster, contradicting the stated
 * "local mode" intent. This version runs in an in-process
 * {@link LocalCluster}, as the tutorial's own "本地运行" correction shows.
 * Unused imports ({@code clojure.main}, {@code clojure.lang.MapEntry},
 * unused Storm exception types) were removed.
 */
public class WordCountApp {

    public static void main(String[] args) throws Exception {
        final TopologyBuilder topologyBuilder = new TopologyBuilder();

        // Wire the pipeline: spout "1" -> split bolt "2" -> count bolt "3".
        // shuffleGrouping distributes tuples randomly among downstream tasks.
        topologyBuilder.setSpout("1", new MySpout());
        topologyBuilder.setBolt("2", new SplitLineBolt()).shuffleGrouping("1");
        topologyBuilder.setBolt("3", new WordCountBolt()).shuffleGrouping("2");

        // Storm expects a Map<String, Object> configuration; empty = defaults.
        final Map<String, Object> conf = new HashMap<String, Object>();

        // Run inside the current JVM — no nimbus/supervisor processes needed.
        // For cluster deployment use the static call
        // StormSubmitter.submitTopology(name, conf, topology) instead.
        final LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(WordCountApp.class.getSimpleName(), conf,
                topologyBuilder.createTopology());
    }
}

/**
 * Spout that repeatedly reads a whole text file and emits one tuple per line
 * on the single output field {@code "line"}.
 */
class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    /** Input file; re-read on every nextTuple() pass (tutorial simplification). */
    private static final String INPUT_PATH = "/root/Downloads/hello";

    private SpoutOutputCollector collector = null;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called in a loop by Storm. Reads the file and emits each line as a
     * one-element tuple, then sleeps briefly so the demo output stays readable.
     */
    @Override
    public void nextTuple() {
        try {
            // Explicit charset: the no-charset overload is deprecated and
            // silently depends on the platform default encoding.
            final List<String> lines = FileUtils.readLines(new File(INPUT_PATH), "UTF-8");
            for (String line : lines) {
                // Each line becomes one tuple handed to the downstream bolt.
                this.collector.emit(new Values(line));
            }
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so Storm's worker can shut down.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            // Best-effort demo: log and retry on the next nextTuple() call.
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

/**
 * Bolt that splits each incoming line on tab characters and emits one tuple
 * per word on the output field {@code "word"}.
 */
class SplitLineBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector = null;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        final String line = tuple.getString(0);
        // The tutorial's input is tab-separated; keep the original delimiter.
        for (String word : line.split("\t")) {
            this.collector.emit(new Values(word));
        }
        try {
            // Demo pacing only; production bolts would not sleep.
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

/**
 * Terminal bolt that keeps an in-memory word -> count map and dumps it to
 * stderr in {@link #cleanup()} when the (local) topology is shut down.
 */
class WordCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector = null;

    // Per-task tally; only valid for a single task in local mode — a real
    // deployment would need fieldsGrouping("2", new Fields("word")) so that
    // equal words always reach the same task.
    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        final String word = tuple.getString(0);
        final Integer value = counts.get(word);
        counts.put(word, value == null ? 1 : value + 1);

        try {
            // Demo pacing only.
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Invoked on (local-mode) shutdown: print the accumulated counts. */
    @Override
    public void cleanup() {
        for (Entry<String, Integer> entry : counts.entrySet()) {
            System.err.println(entry.getKey() + ":" + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
本地运行
// Local-mode run: use these three lines in main() in place of the
// StormSubmitter call. An empty conf map means Storm's default settings.
final HashMap conf = new HashMap();
// LocalCluster simulates a full Storm cluster inside the current JVM,
// so no nimbus/supervisor processes are required.
final LocalCluster localCluster = new LocalCluster();
// Arguments: topology name, configuration map, and the topology built above.
localCluster.submitTopology(WordCountApp.class.getSimpleName(), conf, topologyBuilder.createTopology());
时间: 2024-11-06 21:57:20