一,环境搭建
Eclipse项目的创建和jar包的导入。
二,代码编写
1,组件spout的代码编写,用来发射数据源。
package com; import java.util.Map; import java.util.Random; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class RandomSentenceSpout extends BaseRichSpout{ //用来收集spout的输出tuple private SpoutOutputCollector Collector; //private Random rand; private static final long SrialversionUID=1l; @Override public void nextTuple() { // String[] data={"hello zhangsan","nice to meet","you zhangsan hello","lisi welcome to bj"}; // Collector.emit(new Values(data[rand.nextInt(data.length-1)])); String[] datas= {"hello zhangsan nice to meet you zhangsan hello lisi welcome to bj"}; Values values=new Values(datas[0]); //发射的数据 Collector.emit(values); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //初始化操作,只执行一遍 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector Collector ) { this.Collector=Collector; } //为发射的数据添加唯一标识, @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("spout")); } }
2,bolt组件的代码编写,用来切割字段。
package com; import java.util.Map; import java.util.Random; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class RandomSentenceSpout extends BaseRichSpout{ //用来收集spout的输出tuple private SpoutOutputCollector Collector; //private Random rand; private static final long SrialversionUID=1l; @Override public void nextTuple() { // String[] data={"hello zhangsan","nice to meet","you zhangsan hello","lisi welcome to bj"}; // Collector.emit(new Values(data[rand.nextInt(data.length-1)])); String[] datas= {"hello zhangsan nice to meet you zhangsan hello lisi welcome to bj"}; Values values=new Values(datas[0]); Collector.emit(values); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //初始化操作,只执行一遍 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector Collector ) { this.Collector=Collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("spout")); } }
3,bolt组件的代码编写,用来统计字段的数量。
package com; import java.util.HashMap; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class WordCount extends BaseRichBolt{ private static final Long SrialversionUID=1l; private OutputCollector collector; Map<String,Integer>map=new HashMap<String,Integer>(); @Override public void execute(Tuple value) { String data = value.getStringByField("word"); if(map.containsKey(data)){ map.put(data, map.get(data)+1); }else{ map.put(data,1); } System.out.println(map); } @Override public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) { this.collector=collector; } @Override public void declareOutputFields(OutputFieldsDeclarer d) { //d.declare(new Fields("words","int")); } }
4,编写提交类
package com; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; public class mian { public static void main(String[] args) { TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("spout", new RandomSentenceSpout()); topologyBuilder.setBolt("wordBolt", new WordBolt()).shuffleGrouping("spout"); topologyBuilder.setBolt("wordint", new WordCount()).fieldsGrouping("wordBolt", new Fields("word")); Config config = new Config(); if(args==null||args.length==0){ //集群模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordCount",config ,topologyBuilder.createTopology()); }else{ //单机模式 config.setNumWorkers(1); try { StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology()); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (AuthorizationException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
5,打成jar包,上传到服务器运行。注意只打主类的class,不要连带项目中的jar一起打入。否则在集群上面会报错。
storm实战之WordCount
原文地址:https://www.cnblogs.com/songweideboke/p/9901083.html
时间: 2024-10-12 22:38:33