一、关联代码
使用maven,代码如下。
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.ljh.storm</groupId> <artifactId>storm-helloworld</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>storm-helloworld</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> </dependencies> </project>
ExclamationTopology.java
package cn.ljh.storm.helloworld; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; public class ExclamationTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word", new TestWordSpout(), 1); builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word"); builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test3", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test3"); cluster.shutdown(); } } }
TestWordSpout.java
package cn.ljh.storm.helloworld; import org.apache.storm.topology.OutputFieldsDeclarer; import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestWordSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class); SpoutOutputCollector _collector; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
ExclamationBolt.java
package cn.ljh.storm.helloworld; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class ExclamationBolt extends BaseRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
PrintBolt.java
package cn.ljh.storm.helloworld; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PrintBolt extends BaseRichBolt { private static Logger LOG = LoggerFactory.getLogger(PrintBolt.class); OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { LOG.info(tuple.getString(0) + " Hello World!"); _collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
storm有本地模式和集群模式。
本地模式一般用于测试和开发阶段,直接在Eclipse执行ExclamationTopology的main函数进行。
集群模式需要先把应用达成jar,然后使用storm命令提交到集群中去。
提交命令:storm jar /home/test/storm-helloworld-0.0.1-SNAPSHOT.jar cn.ljh.storm.helloworld.ExclamationTopology ExclamationTest
杀死命令:storm kill ExclamationTest
二、集群运行效果
运行提交命令后,出现如下log,说明提交成功。
查看集群的进程jps,两个Supervisor节点出现了worker进程
在Nimbus节点的/usr/local/storm/data/nimbus/inbox下面有提交的jar
UI界面显示提交topology
至此HelloWorld示例完成。
时间: 2024-12-21 04:42:04