Topology的代码如下:
TopologyBuilder builder = new TopologyBuilder();
//WordReaderSpout会从文件中读取数据,数据用shuffle的方式发送给bolt进行处理
//当文件读取完成后,会发送一个global消息
builder.setSpout("word-reader",new WordReaderSpout());
builder.setBolt("word-normalizer", new WordNormalizerBolt())
.shuffleGrouping("word-reader")
.globalGrouping("word-reader", "FINISH");
builder.setBolt("word-counter", new WordCounterBolt(),1)
.fieldsGrouping("word-normalizer", new Fields("word"))
.globalGrouping("word-normalizer", "CLOSE");
以globalGrouping为例:
globalGrouping(“word-reader”, “FINISH”); 两个参数的含义
第一个参数: “word-reader” 为componet id, 这个值 与我们代码的 中的word-reader一致。
builder.setSpout("word-reader",new WordReaderSpout());
第二个参数: “FINISH”为stream id
这个值是在WordReaderSpout中定义的
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
declarer.declareStream("FINISH", new Fields("FINISH Reade file"));
}
下面以我们熟知的HashMap为例,解释这两个参数
1) 我们需要定义一个word-reader 的HashMap对象
Map<String, Object> word-reader = new HashMap<String, Object>();
2) 我们调用put两个对象进去,这两个对象对应的Key值分别为default与FINISH
word-reader.put("default", new Object());
word-reader.put("FINISH", new Object());
3) 有其它的对象需要获取我们put进去的两个对象,首先就需要先获取word-reader的Handler。
WordNormalizerBolt word-normalizer = new WordNormalizerBolt();
word-normalizer.setMap(word-reader);
对应到Storm的代码中,就如下:
1) 首先我们需要创建一个word-reader的对象, —这是bolt代码
builder.setSpout("word-reader",new WordReaderSpout());
2) 我们需要put两个对象,这里与Map可能有点儿差别,因为需要先声明。 — 这是spout的代码
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line")); //declare()相当于declarer.declareStream(“defalut”, new Fields("line"));
declarer.declareStream("FINISH", new Fields("FINISH Reade file"));
}
而put对象则是在nextTuple()中进行的
public void nextTuple() {
... ...
BufferedReader reader = new BufferedReader(fileReader);
try {
// Read all lines
while ((str = reader.readLine()) != null) {
//put一个key为"default"的对象
this.collector.emit(new Values(str), str);
}
} catch (Exception e) {
... ...
}
//put一个key为"FINISH"的对象
this.collector.emit("FINISH", new Values("Finish"));
}
3) bolt获取Map对象 —- 在TopologyBuilder代码中实现
builder.setBolt("word-normalizer", new WordNormalizerBolt())
.shuffleGrouping("word-reader")
.globalGrouping("word-reader", "FINISH");
最后附上源码:http://download.csdn.net/detail/eyoulc123/9514466
时间: 2024-10-30 06:30:18