什么是Trident?
1、基于Storm、用于实时计算的高级抽象原语;
有何优势?
1、支持高吞吐(每秒百万级别),有状态的流处理;
2、提供低延时的分布式查询功能;
3、Trident具有连接、聚合、分组、自定义行为和过滤的功能;
4、基于内存或数据库进行有状态的增量计算;
5、能够保证每个Tuple严格只被处理一次(exactly-once,需配合事务型Spout与State使用);
6、构建Topology简单;
例子:统计各个单词出现的次数
Spout
用于接收外部数据,转化为Tuple;
package trident.test.demo2; import backtype.storm.Config; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.operation.TridentCollector; import storm.trident.spout.IBatchSpout; import java.util.Arrays; import java.util.List; import java.util.Map; /** * User: jianwl * Date: 2016/2/24 * Time: 16:12 */ public class TridentSpoutDemo implements IBatchSpout{ private static final List<String> sentences = Arrays.asList( "the cow jumped over the moon", "the man went to the store and bought some candy", "four score and seven years ago"); private boolean complete = false; @Override public void open(Map map, TopologyContext topologyContext) { // nothing do } @Override public void emitBatch(long l, TridentCollector tridentCollector) { if(complete){ return; } for(int i=0; i< sentences.size() ; i++){ tridentCollector.emit(new Values(sentences.get(i))); } complete = true; } @Override public void ack(long l) { // nothing do } @Override public void close() { // nothing do } @Override public Map getComponentConfiguration() { return new Config(); } @Override public Fields getOutputFields() { return new Fields("sentence"); } }
创建Topology
/**
 * Entry point of the word-count demo: builds the Trident topology and submits it to an
 * in-process {@code LocalCluster} under the name "trident-demo2".
 *
 * <p>NOTE(review): main never shuts the cluster down, so the process keeps running until it
 * is killed.
 */
public class TridentTopo {

    public static void main(String[] args) {
        TridentTopology wordTopology = createTopology();
        Config stormConfig = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("trident-demo2", stormConfig, wordTopology.build());
    }

    /**
     * Wires the stream: sentences from {@link TridentSpoutDemo} are split into "word" tuples by
     * {@link Split}, and each word is handed to {@link PrintFunction}.
     */
    private static TridentTopology createTopology() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("spout", new TridentSpoutDemo())
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .each(new Fields("word"), new PrintFunction(), new Fields("field", "count"));
        return topology;
    }
}
public class Split extends BaseFunction { public Split() { } public void execute(TridentTuple tuple, TridentCollector collector) { String[] arr$ = tuple.getString(0).split(" "); int len$ = arr$.length; for(int i$ = 0; i$ < len$; ++i$) { String word = arr$[i$]; if(word.length() > 0) { collector.emit(new Values(new Object[]{word})); } } }}
/**
 * Trident function that keeps a running count per word (field 0) and prints all counts after
 * each word it receives. It emits no tuples.
 */
public class PrintFunction extends BaseFunction {

    // Static, so it is shared by every PrintFunction instance in the JVM. With parallelism > 1,
    // several instances run on separate executor threads, so every access locks this map.
    private final static Map<String, Integer> wordCounts = new HashMap<>();

    @Override
    public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
        String word = tridentTuple.getString(0);
        String snapshot;
        synchronized (wordCounts) {
            Integer previous = wordCounts.get(word);
            wordCounts.put(word, previous == null ? 1 : previous + 1);
            // Render under the lock: HashMap.toString iterates the map and may throw
            // ConcurrentModificationException if another thread updates it meanwhile.
            snapshot = "result ==> " + wordCounts;
        }
        System.out.println(snapshot);
    }
}
时间: 2024-10-21 10:26:49