Storm系列(十九)普通事务ITransactionalSpout及示例

普通事务API详解

public interface ITransactionalSpout<T> extends IComponent {
    public interface Coordinator<X> {
        // 事务初始化
        X initializeTransaction(BigInteger txid, X prevMetadata);
        // 启动事务,返回true表示开始
        boolean isReady();       
        // 结束时调用主要用于释放资源
        void close();
    }
10      
11      public interface Emitter<X> {
12          // 发射batch中的tuple到下一级Bolt
13          void emitBatch(TransactionAttempt tx, X coordinatorMeta, BatchOutputCollector collector);
14          // 根据事务ID进行状态数据的清理
15          void cleanupBefore(BigInteger txid);
16          // 结束时调用主要用于释放资源   
17          void close();
18      }
19          
20      Coordinator<T> getCoordinator(Map conf, TopologyContext context);
21      
22      Emitter<T> getEmitter(Map conf, TopologyContext context);
23  }

 

示例

入口类

public class TestMain {
 
    public static void main(String[] args) {
 
        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
                "ttbId""spoutid"new TestTransaction(), 1);
        builder.setBolt("bolt1"new TestTransBolt1(), 3).shuffleGrouping(
                "spoutid");
        builder.setBolt("committer"new TestTransBolt2(), 1).shuffleGrouping(
10                  "bolt1");
11   
12          Config conf = new Config();
13          conf.setDebug(false);
14   
15          if (args.length > 0) {
16              try {
17                  StormSubmitter.submitTopology(args[0], conf,
18                          builder.buildTopology());
19              } catch (AlreadyAliveException e) {
20                  e.printStackTrace();
21              } catch (InvalidTopologyException e) {
22                  e.printStackTrace();
23              }
24          } else {
25              LocalCluster localCluster = new LocalCluster();
26              localCluster.submitTopology("mytopology", conf,
27                      builder.buildTopology());
28          }
29      }
30   
31  }

 

普通事务spout

public class TestTransaction implements ITransactionalSpout<TestMetaDate> {
 
    private static final long serialVersionUID = 1L;
    private Map<Long, String> DATA_BASE = null;
 
    public TestTransaction(){
        DATA_BASE = new HashMap<Long, String>();
       
        for (long i=0;i<50;i++){
10              DATA_BASE.put(i, "TestTransaction:"+i);
11          }
12         
13          System.out.println("TestTransaction start");
14      }
15     
16      @Override
17      public void declareOutputFields(OutputFieldsDeclarer declarer) {
18          declarer.declare(new Fields("tx","count"));
19      }
20   
21      @Override
22      public Map<String, Object> getComponentConfiguration() {
23          return null;
24      }
25   
26      @Override
27      public backtype.storm.transactional.ITransactionalSpout.Coordinator<TestMetaDate> getCoordinator(
28              Map conf, TopologyContext context) {
29          System.out.println("TestTransaction getCoordinator start");
30          return new TestCoordinator();
31      }
32   
33      @Override
34      public backtype.storm.transactional.ITransactionalSpout.Emitter<TestMetaDate> getEmitter(
35              Map conf, TopologyContext context) {
36          System.out.println("TestTransaction getEmitter start");
37          return new TestEmitter(DATA_BASE);
38      }
39  }

 

元数据实现类(存储到zookeeper中)

public class TestMetaDate implements Serializable {
 
    private static final long serialVersionUID = 1L;
   
    private long _index;
    private long _size;
   
    public long get_index() {
        return _index;
10      }
11      public void set_index(long _index) {
12          this._index = _index;
13      }
14     
15      public long get_size() {
16          return _size;
17      }
18      public void set_size(long _size) {
19          this._size = _size;
20      }
21     
22      @Override
23      public String toString() {
24          return "[_index=" + _index + ", _size=" + _size + "]";
25      }
26  }

 

元数据协调处理类

public class TestCoordinator implements ITransactionalSpout.Coordinator<TestMetaDate>{
 
    public TestCoordinator(){
        System.out.println("TestCoordinator start");
    }
   
    @Override
    public TestMetaDate initializeTransaction(BigInteger txid,
            TestMetaDate prevMetadata) {
10          long index = 0L;
11          if (null == prevMetadata){
12              index = 0L;
13          }
14          else {
15              index = prevMetadata.get_index()+prevMetadata.get_size();
16          }
17          TestMetaDate metaDate = new TestMetaDate();
18          metaDate.set_index(index);
19          metaDate.set_size(10);
20          System.out.println("开始事务:"+metaDate.toString());
21          return metaDate;
22      }
23   
24      @Override
25      public boolean isReady() {
26          Utils.sleep(1000);
27          return true;
28      }
29   
30      @Override
31      public void close() {
32      }
33     
34  }

 

Batch中的tuple发送处理类

public class TestEmitter implements ITransactionalSpout.Emitter<TestMetaDate> {
 
    private Map<Long, String> _dbMap = null;
 
    public TestEmitter(Map<Long, String> dbMap) {
        System.err.println("start TestEmitter");
        this._dbMap = dbMap;
    }
 
10      @Override
11      public void emitBatch(TransactionAttempt tx, TestMetaDate coordinatorMeta,
12              BatchOutputCollector collector) {
13          long index = coordinatorMeta.get_index();
14          long size = index + coordinatorMeta.get_size();
15          System.err.println("TestEmitter emitBatch size:" + size
16                  + ",_dbMap size:" + _dbMap.size());
17          if (size > _dbMap.size()) {
18              return;
19          }
20          for (; index < size; index++) {
21              if (null == _dbMap.get(index)) {
22                  System.out.println("TestEmitter continue");
23                  continue;
24              }
25              System.err.println("TestEmitter emitBatch index:"+index);
26              collector.emit(new Values(tx, _dbMap.get(index)));
27          }
28      }
29   
30      @Override
31      public void cleanupBefore(BigInteger txid) {
32      }
33   
34      @Override
35      public void close() {
36      }
37   
38  }

 

数据单元统计实现类

public class TestTransBolt1 extends BaseTransactionalBolt {
 
    private static final long serialVersionUID = 1L;
    private BatchOutputCollector _outputCollector;
    private TransactionAttempt _tx;
    private int count = 0;
    private TopologyContext _context;
 
    public TestTransBolt1() {
10          System.out.println("start TestTransBolt1 ");
11      }
12   
13      @Override
14      public void prepare(@SuppressWarnings("rawtypes") Map conf,
15              TopologyContext context, BatchOutputCollector collector,
16              TransactionAttempt id) {
17          this._context = context;
18          this._outputCollector = collector;
19          System.out.println("1 prepare TestTransBolt1 TransactionId:"
20                  + id.getTransactionId() + ",AttemptId:" + id.getAttemptId());
21   
22      }
23   
24      @Override
25      public void execute(Tuple tuple) {
26          _tx = (TransactionAttempt) tuple.getValueByField("tx");
27          String content = tuple.getStringByField("count");
28          System.out.println("1 TaskId:"+_context.getThisTaskId()+",TestTransBolt1 TransactionAttempt "
29                  + _tx.getTransactionId() + "  attemptid" + _tx.getAttemptId());
30          if (null != content && !content.isEmpty()) {
31              count++;
32          }
33      }
34   
35      @Override
36      public void finishBatch() {
37          System.out.println("1 TaskId:"+_context.getThisTaskId()+",finishBatch count:"+count);
38          _outputCollector.emit(new Values(_tx, count));
39      }
40   
41      @Override
42      public void declareOutputFields(OutputFieldsDeclarer declarer) {
43          declarer.declare(new Fields("tx""count"));
44      }
45   
46  }

 

数据汇总统计实现类

public class TestTransBolt2 extends BaseTransactionalBolt implements ICommitter {
 
    private static final long serialVersionUID = 1L;
    private int sum = 0;
    private TransactionAttempt _tx;
    private static int _result = 0;
    private static BigInteger _curtxid=null;
 
    public TestTransBolt2() {
10          System.out.println("TestTransBolt2 start!");
11      }
12   
13      @Override
14      public void prepare(@SuppressWarnings("rawtypes") Map conf,
15              TopologyContext context, BatchOutputCollector collector,
16              TransactionAttempt id) {
17   
18          this._tx = id;
19          System.out.println("TestTransBolt2 prepare TransactionId:" + id);
20      }
21   
22      @Override
23      public void execute(Tuple tuple) {
24          _tx = (TransactionAttempt) tuple.getValueByField("tx");
25          sum += tuple.getIntegerByField("count");
26   
27          System.out.println("TestTransBolt2 execute TransactionAttempt:" + _tx);
28      }
29   
30      @Override
31      public void finishBatch() {
32          System.out.println("finishBatch _curtxid:" + _curtxid
33                  + ",getTransactionId:" + _tx.getTransactionId());
34          if (null == _curtxid || !_curtxid.equals(_tx.getTransactionId())) {
35   
36              System.out.println("****** 1 _curtxid:" + _curtxid
37                      + ",_tx.getTransactionId():" + _tx.getTransactionId());
38   
39              if (null == _curtxid) {
40                  _result = sum;
41              } else {
42                  _result += sum;
43              }
44              _curtxid = _tx.getTransactionId();
45              System.out.println("****** 2 _curtxid:" + _curtxid
46                      + ",_tx.getTransactionId():" + _tx.getTransactionId());
47          }
48   
49          System.out.println("total==========================:" + _result);
50      }
51   
52      @Override
53      public void declareOutputFields(OutputFieldsDeclarer declarer) {
54   
55      }
56  }

 

结果:

TestTransBolt2 prepare TransactionId:5:3709460125605136723
TestTransBolt2 execute TransactionAttempt:5:3709460125605136723
TestTransBolt2 execute TransactionAttempt:5:3709460125605136723
TestTransBolt2 execute TransactionAttempt:5:3709460125605136723
finishBatch _curtxid:4,getTransactionId:5
****** 1 _curtxid:4,_tx.getTransactionId():5
****** 2 _curtxid:5,_tx.getTransactionId():5
total==========================:50
开始事务:[_index=50, _size=10]
TestEmitter emitBatch size:60,_dbMap size:50

时间: 2024-10-10 22:11:45

Storm系列(十九)普通事务ITransactionalSpout及示例的相关文章

Hulu机器学习问题与解答系列 | 十九:主题模型

今天的内容是 [主题模型] 场景描述 基于Bag-Of-Words(或N-gram)的文本表示模型有一个明显的缺陷,就是无法识别出不同的词(或词组)具有相同主题的情况.我们需要一种技术能够将具有相同主题的词(或词组)映射到同一维度上去,于是产生了主题模型(Topic Model).主题模型是一种特殊的概率图模型.想象一下我们如何判定两个不同的词具有相同的主题呢?这两个词可能有更高的概率出现在同一主题的文档中:换句话说,给定某一主题,这两个词的产生概率都是比较高的,而另一些不太相关的词产生的概率则

MySQL系列之九——MySQL事务和隔离级别

DML :DELETEINSERT INTOUPDATE与查询操作有关 INSERT INTO tb_name (col1,col2,...) VALUES (val1,val2,...)[,(val1,val2,...)]字符型 :单引号数值型 :不需要引号日期时间型空值 :NULLREPLACE INTO 替换 用法相同 DELETE :DELETE FROM tb_name WHERE condition;truncate tb_name 清空表并重置AUTOINCREMEN计数器: UP

Storm系列(十)聚流示例

功能:将多个数据源的数据汇集到一个处理单元进行集中分类处理: 入口类TestMain 1  public class TestMain { 2    3      public static void main(String[] args) { 4          TopologyBuilder builder = new TopologyBuilder(); 5          builder.setSpout("random1", new RandomWordSpout1(),

Storm系列(十五)架构分析之Executor-Spout

Spout实现mk-threads接口用于创建与Executor对应的消息循环主函数. defmulti mk-threads executor-selector Mk-threads函数的主消息循环通过async-loop方法实现,若传入的函数为工厂方法,则在第一次调用该方法时进行初始化,并返回用于消息循环的函数. Spout输入处理函数 spout的输入处理函数采用非阻塞的方式从接收队列中获取消息: (disruptor/consume-batch receive-queue event-h

Java设计模式菜鸟系列(十九)备忘录模式建模与实现

转载请注明出处:http://blog.csdn.net/lhy_ycu/article/details/40018967 备忘录模式(Memento): 主要目的是保存一个对象的某个状态,以便在适当的时候恢复对象. 一.uml建模: 二.代码实现 /** * 备忘录模式(Memento):主要目的是保存一个对象的某个状态,以便在适当的时候恢复对象 * * 示例:原始类--> 创建.恢复备忘录 */ class Original { private String state; public Or

Storm系列(十四)架构分析之Executor-输入和输出处理

Executor的数据 mk-executor-data函数用于定义Executor中含有的数据. Executor的输入处理 根据executor-id从Worker的:executor-receive-queue-map中获得Disruptor Queue 如下: 1  receive-queue ((:executor-receive-queue-map worker) executor-id) 说明: Worker的接收线程从ZMQ收到数据后,线程会根据目标的Task Id找到对应的Ex

Storm系列(十六)架构分析之Executor-Bolt

准备消息循环的数据 函数原型: 1  let[executor-sampler (mk-stats-sampler (:storm-conf executor-data))] 主要功能: 定义tuple-action-fn函数,该函数会根据TaskId获得对应的Bolt对象并调用其executor方法. Bolt输入处理函数 函数原型: 1  tuple-action-fn (fn [task-id ^TupleImpl tuple]) 主要功能: 获得Bolt对应的bolt-obj,调用exe

WPF入门教程系列十九——ListView示例(一)

经过前面的学习,今天我做一个比较综合的WPF程序示例,主要包括以下功能: 1) 查询功能.从数据库(本地数据库(local)/Test中的S_City表中读取城市信息数据,然后展示到WPF的Window上的一个ListView上. 2) 数据联动功能.当鼠标左键选中ListView中的某一条记录时,在ListView框下面文本框中显示详细信息. 3) 修改功能.修改TextBox中的内容后,点击“更新”按钮,把修改后的数据保存到数据库中,同时与Listview联动. 第一步.建立一个WPF项目

BizTalk开发系列(十九) BizTalk命名规范

目前BizTalk项目的开发人员比较少,但是在开发过程中还是需要命名规范的约束.根据以往BizTalk项目的经验,整理了BizTalk命 名规范.包括:BizTalk Application, Schema, Map, Pipeline, Orchestration Logic Port, Physical Port等.并提供了相关的示例方便大家参考. <BizTalk命名规范> 通用: 项目中能用英文单词的用单词,否则用拼音. BizTalk项目中的Schema , Map, Orchest