欢迎访问:鲁春利的工作笔记,学习是一种信仰,让时间考验坚持的力量。
简介:
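Storm是由BackType开发的实时处理系统,底层采用Clojure实现,BackType现已被Twitter收购。Twitter将Storm贡献给了开源社区,这是一个分布式的、容错的实时计算系统,它被托管在GitHub上,遵循 Eclipse Public License 1.0。此后Storm进入Apache软件基金会,成为Apache Storm,改用Apache License 2.0;从1.0版本起,其Java包名由backtype.storm改为org.apache.storm(下文示例使用的是旧包名,需配合1.0之前的版本运行)。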
基本概念:
在storm中有一些核心基本概念,包括Topology、Nimbus、Supervisor、Worker、Executor、Task、Spout、Bolt、Tuple、Stream、Stream分组(grouping)等。
入门示例:
package storm; import java.util.Map; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * * @author lucl * * */ public class LocalStormTopology { /** * 发射数据 * @author lucl * */ public static class DatasourceSpout extends BaseRichSpout { private Map conf = null; private TopologyContext context = null; private SpoutOutputCollector collector = null; int num = 0; /** * 死循环,一直调用 */ @Override public void nextTuple() { try { Thread.sleep(1 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } if (num % 2 == 0) { System.out.println("spout发射数据(even): " + num); this.collector.emit(new Values("even", num)); // Values支持动态参数 } else { System.out.println("spout发射数据(odd): " + num); this.collector.emit(new Values("odd", num)); // Values支持动态参数 } num += 1; } /** * 初始化方法,在topology初始化时调用一次 */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * spout发射数据,bolt读取数据,但是bolt怎么知道如何解析发射出的数据,就需要提前声明 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("type", "num")); // 与上面的data、num字段数目相对应 } } /** * * @author lucl * 数据处理 */ public static class DataSourceBolt extends BaseRichBolt { private Map conf = null; private TopologyContext context; private OutputCollector collector; /** * 对于spout的数据进行处理 */ int even = 0; int odd = 0; @Override public void execute(Tuple tuple) { String type = tuple.getStringByField("type"); Integer value = 
tuple.getIntegerByField("num"); if ("even".equals(type)) { even += value; System.out.println("bolt处理的偶数(even)累加结果:" + even); } else { odd += value; System.out.println("bolt处理的偶数(odd)累加结果:" + odd); } } @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer ceclarer) { System.out.println("method declareOutputFields of class Bolt!"); // 如果这个bolt还需要把数据传递给下一个bolt进行处理,那么还需要声明其输出字段 } } /** * * @param args */ public static void main(String[] args) { // 在这个topology中,所有spout和bolt的id标识不可相同 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new DatasourceSpout()); builder.setBolt("bolt", new DataSourceBolt()).shuffleGrouping("spout"); // 采用本地模式来执行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("topology", new Config(), builder.createTopology()); } }
运行结果如下:
时间: 2024-10-16 23:28:42