JStorm - Hello Word


1. 创建Maven工程


2. 创建HelloWorldBold

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

 * Hello world!
public class HelloWorldBolt extends BaseRichBolt {
	private static final long serialVersionUID = 1L;
	private int myCount = 0;

	 * prepare() => on create
	public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {

	 * execute() => most important method in the bolt is execute(Tuple input),
	 * which is called once per tuple received the bolt may emit several tuples
	 * for each tuple received
	public void execute(Tuple tuple) {
		String test = tuple.getStringByField("sentence");
		if (test == "Hello World") {
			System.out.println("Found a Hello World! My Count is now: " + Integer.toString(myCount));

	 * declareOutputFields => This bolt emits nothing hence no body for
	 * declareOutputFields()
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

3. 创建HelloWorldSpout

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

 * Hello world!
public class HelloWorldSpout extends BaseRichSpout {
	private static final long serialVersionUID = 1L;
	private SpoutOutputCollector collector;
	private int referenceRandom;
	private static final int MAX_RANDOM = 10;

	public HelloWorldSpout() {
		final Random rand = new Random();
		referenceRandom = rand.nextInt(MAX_RANDOM);

	 * declareOutputFields() => you need to tell the Storm cluster which fields
	 * this Spout emits within the declareOutputFields method.
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));

	 * open() => The first method called in any spout is 'open' TopologyContext
	 * => contains all our topology data SpoutOutputCollector => enables us to
	 * emit the data that will be processed by the bolts conf => created in the
	 * topology definition
	public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext topologyContext,
			SpoutOutputCollector collector) {
		this.collector = collector;

	 * nextTuple() => Storm cluster will repeatedly call the nextTuple method
	 * which will do all the work of the spout. nextTuple() must release the
	 * control of the thread when there is no work to do so that the other
	 * methods have a chance to be called.
	public void nextTuple() {
		final Random rand = new Random();
		int instanceRandom = rand.nextInt(MAX_RANDOM);
		if (instanceRandom == referenceRandom) {
			collector.emit(new Values("Hello World"));
		} else {
			collector.emit(new Values("Other Random Word"));

4. 创建HelloWorldTopology

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

 * Hello world!
public class HelloWorldTopology {
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
		builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 1).shuffleGrouping("randomHelloWorld");
		Config conf = new Config();
		if (args != null && args.length > 0) {// 如果在JStrom集群中运行
			// JStorm 安装完后,默认的NIMBUS端口配置为7672
			conf.put(Config.NIMBUS_THRIFT_PORT, 7672);
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

5. 编译打包


6. 提交至JStorm

# $JSTORM_HOME/bin/jstorm jar HelloWorld.jar com.test.jstorm.HelloWorldTopology HelloWorld


时间: 2025-01-14 10:34:17

