1. 新建一个Maven项目,pom.xml代码如下:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.yg</groupId>
  <artifactId>storm</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>storm</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- "provided": storm-core is needed to compile, but is deliberately kept
         out of the jar-with-dependencies built below. NOTE(review): the Storm
         docs state that the cluster supplies these classes itself; confirm
         for your deployment. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.3</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Builds target/storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar for
           submission with "storm jar". -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <!-- Entry point of this project (was a template placeholder). -->
              <mainClass>com.yg.storm.topologies.HelloWorldTopology</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <!-- Bind the assembly to the package phase so that a plain
               "mvn package" produces the fat jar. -->
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
2.新建HelloWorldSpout.java,代码如下:
package com.yg.storm.spouts;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Demo spout that emits one single-field tuple per {@link #nextTuple()} call.
 *
 * <p>A reference number in the range {@code [0, MAX_RANDOM)} is drawn once when
 * the spout is constructed. On every call a fresh number in the same range is
 * drawn: if it equals the reference, {@code "Hello World"} is emitted,
 * otherwise {@code "Other Random Word"} is emitted.
 */
public class HelloWorldSpout extends BaseRichSpout {

    private static final long serialVersionUID = -5698117627723074157L;

    /** Exclusive upper bound of the random numbers that are drawn. */
    private static final int MAX_RANDOM = 10;

    /** Number chosen at construction time that later draws are compared to. */
    private final int referenceRandom;

    /** Generator reused across calls; created in {@link #open}. */
    private transient Random random;

    private SpoutOutputCollector collector;

    /** Creates the spout and draws the reference number. */
    public HelloWorldSpout() {
        referenceRandom = new Random().nextInt(MAX_RANDOM);
    }

    /**
     * Stores the collector and creates the random generator used by
     * {@link #nextTuple()}.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit tuples
     */
    @Override
    @SuppressWarnings("rawtypes") // raw Map mirrors the signature being overridden
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    /**
     * Sleeps for one second, then emits either {@code "Hello World"} (when the
     * new draw matches the reference number) or {@code "Other Random Word"}.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(1000); // throttle to roughly one tuple per second
        int instanceRandom = random.nextInt(MAX_RANDOM);
        if (referenceRandom == instanceRandom) {
            // Value order must match the field order declared below.
            collector.emit(new Values("Hello World"));
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }

    /**
     * Declares the single output field, {@code "sentence"}.
     *
     * @param declarer declarer that receives the output field names
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
3.新建HelloWorldBolt.java,代码如下:
package com.yg.storm.bolts;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Demo bolt that prints every tuple it receives and counts how many of them
 * carry the sentence {@code "Hello World"}.
 */
public class HelloWorldBolt extends BaseRichBolt {

    private static final long serialVersionUID = -5061906223048521415L;

    /**
     * Running count of "Hello World" sentences seen by this instance. Kept as
     * a field (not a local in {@link #execute}) so it survives across calls.
     */
    private int myCount = 0;

    private TopologyContext context;
    private OutputCollector collector;

    /**
     * Stores the context and collector for later use in {@link #execute}.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context, used to read this task's id
     * @param collector collector used to acknowledge tuples
     */
    @Override
    @SuppressWarnings("rawtypes") // raw Map mirrors the signature being overridden
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    /**
     * Prints the incoming sentence, bumps the counter when it is
     * {@code "Hello World"}, and acknowledges the tuple.
     *
     * @param input tuple carrying a {@code "sentence"} string field
     */
    @Override
    public void execute(Tuple input) {
        // Read the value by the field name declared by the upstream spout.
        String text = input.getStringByField("sentence");
        // Separate the task id from the sentence so the output is unambiguous.
        System.out.println("One tuple gets in: [task " + context.getThisTaskId() + "] " + text);
        if ("Hello World".equals(text)) {
            myCount++;
            System.out.println("Found a Hello World! My count is now:" + myCount);
        }
        // Tell Storm that this tuple has been fully processed.
        collector.ack(input);
    }

    /**
     * Declares no output fields: this bolt does not emit tuples.
     *
     * @param declarer unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty: nothing is emitted downstream.
    }
}
4.新建HelloWorldTopology.java,代码如下:
package com.yg.storm.topologies; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; import com.yg.storm.bolts.HelloWorldBolt; import com.yg.storm.spouts.HelloWorldSpout; public class HelloWorldTopology { //可以向main传递一个参数作为集群模式下的Topology的名字,若没有传入参数则使用本地模式 public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("hlSpout", new HelloWorldSpout()); builder.setBolt("hlBolt", new HelloWorldBolt()) .shuffleGrouping("hlSpout"); Config conf = new Config(); if (args != null && args.length > 0){ //集群模式提交 conf.setNumWorkers(3); try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (AuthorizationException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { //本地模式提交 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(1000*60); cluster.killTopology("test"); cluster.shutdown(); } } }
直接本地运行HelloWorldTopology类即可.
原文地址:https://www.cnblogs.com/dreamboy/p/11392809.html
时间: 2024-10-17 23:25:16