Spout class:
package transaction1;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.tuple.Fields;

public class MyTxSpout implements ITransactionalSpout<MyMata> {

    private static final long serialVersionUID = 1L;

    /**
     * Data source
     */
    Map<Long, String> dbMap = null;

    public MyTxSpout() {
        dbMap = new HashMap<Long, String>();
        Random random = new Random();

        String[] hosts = { "www.taobao.com" };
        String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };

        String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

        for (long i = 0; i < 100; i++) {
            dbMap.put(i, hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]);
        }
    }

    /**
     * Emits the transactional tuple for each batch. There is only one coordinator;
     * when a transaction enters its processing phase it emits one transactional tuple
     * (TransactionAttempt & metadata) onto the "batch emit" stream.
     */
    @Override
    public backtype.storm.transactional.ITransactionalSpout.Coordinator<MyMata> getCoordinator(
            Map conf, TopologyContext context) {
        return new MyCoordinator();
    }

    /**
     * Actually emits the tuples of each batch; there can be multiple emitter
     * instances depending on the parallelism.
     */
    @Override
    public backtype.storm.transactional.ITransactionalSpout.Emitter<MyMata> getEmitter(
            Map conf, TopologyContext context) {
        return new MyEmitter(dbMap);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tx", "log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Coordinator: emits the transactional tuple for each batch. When a transaction enters its processing phase, it emits one transactional tuple (TransactionAttempt & metadata) onto the "batch emit" stream. There is only one coordinator instance.
package transaction1;

import java.math.BigInteger;

import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.utils.Utils;

public class MyCoordinator implements ITransactionalSpout.Coordinator<MyMata> {

    // number of tuples per batch
    private static int BATCH_NUM = 10;

    /**
     * Starts a transaction and produces the metadata that defines where the
     * batch begins and how many tuples it contains.
     * @param txid transaction id, starts from 0 by default
     * @param prevMetadata metadata of the previous transaction
     */
    @Override
    public MyMata initializeTransaction(BigInteger txid, MyMata prevMetadata) {

        long beginPoint = 0;

        if (prevMetadata == null) {
            // first transaction, the program has just started
            beginPoint = 0;
        } else {
            beginPoint = prevMetadata.getBeginPoint() + prevMetadata.getNum();
        }

        MyMata myMata = new MyMata();
        myMata.setBeginPoint(beginPoint);
        myMata.setNum(BATCH_NUM);

        System.err.println("Starting a transaction: " + myMata.toString());

        return myMata;
    }

    /**
     * Only when this returns true does a transaction enter the processing phase and
     * emit a transactional tuple onto the batch emit stream; the Emitter subscribes
     * to the Coordinator's batch emit stream with a broadcast (all) grouping.
     */
    @Override
    public boolean isReady() {
        Utils.sleep(2000);
        return true;
    }

    @Override
    public void close() {
    }
}
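To see how the metadata advances from batch to batch, the following standalone sketch (an illustration only, not part of the topology) drives initializeTransaction the way Storm would, feeding each batch's metadata back in as prevMetadata; with BATCH_NUM = 10 it prints begin points 0, 10, 20, ...

package transaction1;

import java.math.BigInteger;

// Illustration only: simulates how Storm passes the previous batch's metadata
// back into initializeTransaction, so each batch starts where the last one ended.
public class CoordinatorTrace {
    public static void main(String[] args) {
        MyCoordinator coordinator = new MyCoordinator();
        MyMata prev = null;
        for (int txid = 1; txid <= 5; txid++) {
            MyMata meta = coordinator.initializeTransaction(BigInteger.valueOf(txid), prev);
            // prints txid=1 beginPoint=0 num=10, txid=2 beginPoint=10 num=10, ...
            System.out.println("txid=" + txid + " beginPoint=" + meta.getBeginPoint() + " num=" + meta.getNum());
            prev = meta;
        }
    }
}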
MyEmitter: actually emits the tuples of each batch; depending on the parallelism there can be multiple emitter instances.
package transaction1;

import java.math.BigInteger;
import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Values;

public class MyEmitter implements ITransactionalSpout.Emitter<MyMata> {

    Map<Long, String> dbMap = null;

    public MyEmitter() {
    }

    public MyEmitter(Map<Long, String> dbMap) {
        this.dbMap = dbMap;
    }

    /**
     * After receiving the Coordinator's transactional tuple, emits the tuples of
     * the batch one by one.
     *
     * @param tx must be the first field of every emitted tuple; it carries two
     *           values: a transaction id and an attempt id. The transaction id is
     *           unique per batch and stays the same no matter how many times the
     *           batch is replayed. The attempt id is also unique per batch, but it
     *           changes every time the batch is replayed.
     */
    @Override
    public void emitBatch(TransactionAttempt tx, MyMata coordinatorMeta, BatchOutputCollector collector) {
        long beginPoint = coordinatorMeta.getBeginPoint();
        long num = coordinatorMeta.getNum();

        // how much of the data source is emitted each time is decided by the metadata
        for (long i = beginPoint; i < num + beginPoint; i++) {
            if (dbMap.get(i) == null) {
                continue;
            }
            collector.emit(new Values(tx, dbMap.get(i)));
        }
    }

    // clean up information of earlier transactions
    @Override
    public void cleanupBefore(BigInteger txid) {
    }

    @Override
    public void close() {
    }
}
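This emitter keeps no per-transaction state, so its cleanupBefore is empty. If an emitter did cache something keyed by transaction id, cleanupBefore is the hook for pruning it: Storm calls it once all transactions older than the given txid are done and will never be replayed. A minimal sketch, using a hypothetical helper class that is not part of the original code:

package transaction1;

import java.math.BigInteger;
import java.util.TreeMap;

// Hypothetical helper (not from the original post): shows how state cached per
// transaction id could be pruned when Storm signals that older transactions
// are finished.
public class TxStateCache {

    private final TreeMap<BigInteger, MyMata> cache = new TreeMap<BigInteger, MyMata>();

    public void put(BigInteger txid, MyMata meta) {
        cache.put(txid, meta);
    }

    // drop all entries whose txid is strictly less than the given one
    public void cleanupBefore(BigInteger txid) {
        cache.headMap(txid).clear();
    }
}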
Metadata:
package transaction1;

import java.io.Serializable;

/**
 * Defines the metadata of a batch.
 */
public class MyMata implements Serializable {

    private static final long serialVersionUID = 1L;

    private Long beginPoint; // start position of the transaction
    private int num;         // number of tuples in the batch

    public Long getBeginPoint() {
        return beginPoint;
    }

    public void setBeginPoint(Long beginPoint) {
        this.beginPoint = beginPoint;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return getBeginPoint() + "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + getNum();
    }
}
MyTransactionBolt (extends BaseTransactionalBolt):
package transaction1;

import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyTransactionBolt extends BaseTransactionalBolt {

    private static final long serialVersionUID = 1L;

    long count = 0;

    BatchOutputCollector collector;
    TransactionAttempt id;

    @Override
    public void prepare(Map conf, TopologyContext context,
            BatchOutputCollector collector, TransactionAttempt id) {
        this.collector = collector;
        this.id = id;
        System.err.println("MyTransactionBolt prepare :~~~~~~~~~~ TransactionId :" + id.getTransactionId() + " AttemptId : " + id.getAttemptId());
    }

    /**
     * Processes each tuple of the batch.
     */
    @Override
    public void execute(Tuple tuple) {
        TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
        System.err.println("MyTransactionBolt execute ~~~~~~~~~~ TransactionId :" + tx.getTransactionId() + " AttemptId : " + tx.getAttemptId());
        String log = tuple.getStringByField("log");

        if (log != null && log.length() > 0) {
            count++;
        }
    }

    /**
     * Called once after all tuples of the batch have been processed.
     */
    @Override
    public void finishBatch() {
        System.err.println("finishBatch " + count);
        // emit the batch result downstream
        collector.emit(new Values(id, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tx", "count"));
    }
}
MyCommiter:
package transaction1;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/**
 * Commit phase
 */
public class MyCommiter extends BaseTransactionalBolt implements ICommitter {

    public static Map<String, DBValue> dbMap = new HashMap<String, MyCommiter.DBValue>();

    private static final long serialVersionUID = 1L;

    public static final String GLOBAL_KEY = "GLOBAL_KEY";

    long sum = 0;

    TransactionAttempt id;

    BatchOutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context,
            BatchOutputCollector collector, TransactionAttempt id) {
        this.id = id;
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
        Long count = tuple.getLong(1);
        sum += count;
    }

    @Override
    public void finishBatch() {

        // update the "database"
        DBValue value = dbMap.get(GLOBAL_KEY);
        DBValue newvalue;

        // first write, or a transaction newer than the one stored
        if (value == null || !value.txid.equals(id.getTransactionId())) {

            newvalue = new DBValue();
            newvalue.txid = id.getTransactionId();

            // first write
            if (value == null) {
                newvalue.count = sum;
            } else {
                newvalue.count = value.count + sum;
            }
            dbMap.put(GLOBAL_KEY, newvalue);
        } else {
            newvalue = value;
        }

        System.out.println("total----------------------------->" + dbMap.get(GLOBAL_KEY).count);

        // send the result to the next stage: collector.emit(new Values());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(""));
    }

    public static class DBValue {
        BigInteger txid;
        long count = 0;
    }
}
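The in-memory dbMap above stands in for a real database. The point of storing the transaction id next to the value is that a replayed batch carrying a txid that is already stored is simply skipped, which is what gives exactly-once semantics. A minimal sketch of the same pattern against a relational store, assuming a hypothetical table word_count(k VARCHAR PRIMARY KEY, txid VARCHAR, cnt BIGINT) and a JDBC Connection supplied by the caller (not part of the original post):

import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical helper: commits a batch sum into a relational table, skipping the
// update when the stored txid already matches, so replaying the same batch never
// double-counts.
public class TxDbCommitter {

    public void commit(Connection conn, String key, BigInteger txid, long batchSum) throws SQLException {
        PreparedStatement select = conn.prepareStatement("SELECT txid, cnt FROM word_count WHERE k = ?");
        select.setString(1, key);
        ResultSet rs = select.executeQuery();

        if (!rs.next()) {
            // first write for this key
            PreparedStatement insert = conn.prepareStatement("INSERT INTO word_count (k, txid, cnt) VALUES (?, ?, ?)");
            insert.setString(1, key);
            insert.setString(2, txid.toString());
            insert.setLong(3, batchSum);
            insert.executeUpdate();
        } else if (!txid.toString().equals(rs.getString("txid"))) {
            // a new transaction: add this batch's sum and remember its txid
            PreparedStatement update = conn.prepareStatement("UPDATE word_count SET txid = ?, cnt = ? WHERE k = ?");
            update.setString(1, txid.toString());
            update.setLong(2, rs.getLong("cnt") + batchSum);
            update.setString(3, key);
            update.executeUpdate();
        }
        // else: same txid means this batch was already committed (a replay), do nothing
    }
}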
MyToPo:
package transaction1;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.transactional.TransactionalTopologyBuilder;

public class MyToPo {

    public static void main(String[] args) throws Exception {

        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "txSpoutId", new MyTxSpout(), 1);
        builder.setBolt("txBolt", new MyTransactionBolt(), 5).shuffleGrouping("txSpoutId");
        builder.setCommitterBolt("commit", new MyCommiter(), 1).shuffleGrouping("txBolt");

        // configuration
        Config conf = new Config();

        if (args.length > 0) {
            // submit to a cluster
            StormSubmitter.submitTopology(args[0], conf, builder.buildTopology());
        } else {
            // submit in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.buildTopology());
        }
    }
}
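As written, the local-mode branch submits the topology and never stops it. When testing locally, one common pattern (an addition, not in the original code; it also needs backtype.storm.utils.Utils imported) is to let the topology run for a while and then tear the LocalCluster down. A sketch of what the else branch could look like:

// Hypothetical variant of the local-mode branch: run briefly, then shut down.
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, builder.buildTopology());
Utils.sleep(30000);                       // let a few batches run through
localCluster.killTopology("mytopology");  // kill the topology
localCluster.shutdown();                  // stop the in-process cluster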
The key wiring: the transactional spout is registered with the builder, MyTransactionBolt consumes the spout's batch stream, and MyCommiter is registered as a committer bolt consuming from the transactional bolt:

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "txSpoutId", new MyTxSpout(), 1);
builder.setBolt("txBolt", new MyTransactionBolt(), 5).shuffleGrouping("txSpoutId");
builder.setCommitterBolt("commit", new MyCommiter(), 1).shuffleGrouping("txBolt");
Run output: