Storm Batch Processing: ITransactionalSpout, the Basic Transactional Spout

The Spout class:

package transaction1;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.tuple.Fields;

public class MyTxSpout implements ITransactionalSpout<MyMata> {

    private static final long serialVersionUID = 1L;

    /**
     * The data source: 100 simulated web-access log lines keyed by offset.
     */
    Map<Long, String> dbMap = null;

    public MyTxSpout() {
        dbMap = new HashMap<Long, String>();
        Random random = new Random();

        String[] hosts = { "www.taobao.com" };
        String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };

        String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

        for (long i = 0; i < 100; i++) {
            dbMap.put(i, hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]);
        }
    }

    /**
     * Emits the transactional tuple for each batch. There is only one coordinator;
     * when a transaction enters its processing phase, the coordinator emits one
     * transactional tuple (TransactionAttempt & metadata) onto the "batch emit" stream.
     */
    @Override
    public ITransactionalSpout.Coordinator<MyMata> getCoordinator(
            Map conf, TopologyContext context) {
        return new MyCoordinator();
    }

    /**
     * Emits the actual tuples of each batch; depending on the configured
     * parallelism there can be several emitter instances.
     */
    @Override
    public ITransactionalSpout.Emitter<MyMata> getEmitter(
            Map conf, TopologyContext context) {
        return new MyEmitter(dbMap);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The first field must carry the TransactionAttempt; "log" carries the payload.
        declarer.declare(new Fields("tx", "log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}


Coordinator: emits the transactional tuple for each batch. When a transaction enters its processing phase, the coordinator emits one transactional tuple (TransactionAttempt & metadata) onto the "batch emit" stream. There is only ever one coordinator instance.

package transaction1;

import java.math.BigInteger;

import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.utils.Utils;

public class MyCoordinator implements ITransactionalSpout.Coordinator<MyMata> {

    // Number of tuples per batch.
    private static final int BATCH_NUM = 10;

    /**
     * Starts a transaction by producing its metadata, which defines where the
     * batch begins and how many tuples it contains.
     * @param txid         the transaction id assigned by Storm (monotonically increasing)
     * @param prevMetadata the metadata of the previous transaction (null for the first one)
     */
    @Override
    public MyMata initializeTransaction(BigInteger txid, MyMata prevMetadata) {

        long beginPoint = 0;

        if (prevMetadata == null) {
            // First transaction: start from the beginning of the source.
            beginPoint = 0;
        } else {
            // Continue right after the previous batch.
            beginPoint = prevMetadata.getBeginPoint() + prevMetadata.getNum();
        }

        MyMata myMata = new MyMata();
        myMata.setBeginPoint(beginPoint);
        myMata.setNum(BATCH_NUM);

        System.err.println("Starting a transaction: " + myMata.toString());

        return myMata;
    }

    /**
     * Only when this returns true does a transaction enter its processing phase,
     * emitting a transactional tuple onto the "batch emit" stream; the emitters
     * subscribe to that stream with a broadcast grouping.
     */
    @Override
    public boolean isReady() {
        // Throttle: start a new transaction at most every 2 seconds.
        Utils.sleep(2000);
        return true;
    }

    @Override
    public void close() {
    }
}

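To see how the metadata chains one batch to the next, the coordinator can be driven by hand. A minimal sketch (MetaChainDemo is a hypothetical class, not part of the topology):

package transaction1;

import java.math.BigInteger;

public class MetaChainDemo {
    public static void main(String[] args) {
        MyCoordinator c = new MyCoordinator();

        // First transaction: no previous metadata, so the batch starts at offset 0.
        MyMata m1 = c.initializeTransaction(BigInteger.ONE, null);           // beginPoint=0,  num=10

        // The next transaction picks up exactly where the previous batch ended.
        MyMata m2 = c.initializeTransaction(BigInteger.valueOf(2), m1);      // beginPoint=10, num=10

        // On a replay, Storm hands back the same prevMetadata, so the batch
        // boundaries come out identical every time.
        MyMata m2again = c.initializeTransaction(BigInteger.valueOf(2), m1); // beginPoint=10, num=10

        System.out.println(m1 + " | " + m2 + " | " + m2again);
    }
}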

MyEmitter: emits the actual tuples of each batch; depending on the configured parallelism there can be several emitter instances.

package transaction1;

import java.math.BigInteger;
import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Values;

public class MyEmitter implements ITransactionalSpout.Emitter<MyMata> {

    Map<Long, String> dbMap = null;

    public MyEmitter() {}

    public MyEmitter(Map<Long, String> dbMap) {
        this.dbMap = dbMap;
    }

    /**
     * Invoked after the coordinator's transactional tuple arrives; emits the
     * batch's tuples one by one.
     *
     * @param tx must be the first field of every emitted tuple. It holds two values:
     *           a transaction id and an attempt id. The transaction id is unique per
     *           batch and, as described above, stays the same no matter how many
     *           times the batch is replayed. The attempt id is also unique per batch,
     *           but after a replay the batch gets a new attempt id, distinguishing
     *           the replay from the earlier attempt.
     */
    @Override
    public void emitBatch(TransactionAttempt tx, MyMata coordinatorMeta,
            BatchOutputCollector collector) {

        long beginPoint = coordinatorMeta.getBeginPoint();
        long num = coordinatorMeta.getNum();

        // The metadata decides exactly which slice of the source this batch covers.
        for (long i = beginPoint; i < num + beginPoint; i++) {
            if (dbMap.get(i) == null) {
                continue;
            }
            collector.emit(new Values(tx, dbMap.get(i)));
        }
    }

    // Clean up state kept for transactions before txid (nothing to do here).
    @Override
    public void cleanupBefore(BigInteger txid) {
    }

    @Override
    public void close() {
    }
}

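Why this matters: for a given metadata object, the emitter must emit exactly the same batch on every replay — that determinism is what the committer's txid check relies on. The sketch below drives MyEmitter directly; RecordingCollector and EmitterReplayDemo are hypothetical helpers, and the BatchOutputCollector/TransactionAttempt signatures are assumed from the 0.9.x API:

package transaction1;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.transactional.TransactionAttempt;

public class EmitterReplayDemo {

    // Hypothetical stub that just records what gets emitted.
    static class RecordingCollector extends BatchOutputCollector {
        final List<List<Object>> emitted = new ArrayList<List<Object>>();

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple) {
            emitted.add(tuple);
            return null;
        }

        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {}

        @Override
        public void reportError(Throwable error) {}
    }

    public static void main(String[] args) {
        Map<Long, String> db = new HashMap<Long, String>();
        for (long i = 0; i < 20; i++) db.put(i, "log-" + i);

        MyMata meta = new MyMata();
        meta.setBeginPoint(0L);
        meta.setNum(10);

        MyEmitter emitter = new MyEmitter(db);

        // Same txid, different attempt ids: a replay of the same batch.
        RecordingCollector first = new RecordingCollector();
        emitter.emitBatch(new TransactionAttempt(BigInteger.ONE, 1L), meta, first);

        RecordingCollector replay = new RecordingCollector();
        emitter.emitBatch(new TransactionAttempt(BigInteger.ONE, 2L), meta, replay);

        // Both attempts cover keys 0..9: the same log lines in the same order.
        System.out.println(first.emitted.size() + " == " + replay.emitted.size());
    }
}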

The metadata class:

package transaction1;

import java.io.Serializable;

/**
 * The batch metadata: persisted by Storm between transactions,
 * which is why it must be serializable.
 */
public class MyMata implements Serializable {

    private static final long serialVersionUID = 1L;

    private Long beginPoint; // offset at which this transaction's batch starts

    private int num; // number of tuples in the batch

    public Long getBeginPoint() {
        return beginPoint;
    }

    public void setBeginPoint(Long beginPoint) {
        this.beginPoint = beginPoint;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return getBeginPoint() + "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + getNum();
    }
}
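The metadata is persisted between transactions and travels inside the coordinator's tuple, so it has to survive serialization — hence Serializable. A quick round-trip sanity check (a sketch; Utils.serialize/deserialize are assumed from the 0.9.x backtype.storm.utils.Utils API, and MetaSerializationDemo is a hypothetical class):

package transaction1;

import backtype.storm.utils.Utils;

public class MetaSerializationDemo {
    public static void main(String[] args) {
        MyMata meta = new MyMata();
        meta.setBeginPoint(30L);
        meta.setNum(10);

        // Round-trip through Storm's Java-serialization helpers.
        byte[] bytes = Utils.serialize(meta);
        MyMata copy = (MyMata) Utils.deserialize(bytes);

        System.out.println(copy); // same output as meta.toString()
    }
}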

The batch bolt (extends BaseTransactionalBolt):

package transaction1;

import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyTransactionBolt extends BaseTransactionalBolt {

    private static final long serialVersionUID = 1L;

    // Per-batch counter: the framework creates a fresh bolt instance for each
    // batch attempt, so this effectively starts at 0 for every batch.
    long count = 0;

    BatchOutputCollector collector;
    TransactionAttempt id;

    @Override
    public void prepare(Map conf, TopologyContext context,
            BatchOutputCollector collector, TransactionAttempt id) {
        this.collector = collector;
        this.id = id;
        System.err.println("MyTransactionBolt prepare: TransactionId: " + id.getTransactionId() + " AttemptId: " + id.getAttemptId());
    }

    /**
     * Called once for every tuple in the batch.
     */
    @Override
    public void execute(Tuple tuple) {
        TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
        System.err.println("MyTransactionBolt execute: TransactionId: " + tx.getTransactionId() + " AttemptId: " + tx.getAttemptId());
        String log = tuple.getStringByField("log");

        if (log != null && log.length() > 0) {
            count++;
        }
    }

    /**
     * Called once after every tuple of the batch has been processed.
     */
    @Override
    public void finishBatch() {
        System.err.println("finishBatch " + count);
        // Forward the per-batch partial count downstream.
        collector.emit(new Values(id, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tx", "count"));
    }
}


The committer bolt (MyCommiter):

package transaction1;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/**
 * Runs in the commit phase. Because it implements ICommitter, Storm calls its
 * finishBatch in strict transaction-id order, one batch at a time.
 */
public class MyCommiter extends BaseTransactionalBolt implements ICommitter {

    // Simulated database: the value plus the txid that last wrote it.
    public static Map<String, DBValue> dbMap = new HashMap<String, MyCommiter.DBValue>();

    private static final long serialVersionUID = 1L;

    public static final String GLOBAL_KEY = "GLOBAL_KEY";

    long sum = 0;

    TransactionAttempt id;

    BatchOutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context,
            BatchOutputCollector collector, TransactionAttempt id) {
        this.id = id;
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Each of the five MyTransactionBolt tasks sends one partial count per batch.
        TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
        Long count = tuple.getLong(1);
        sum += count;
    }

    @Override
    public void finishBatch() {

        // Update the "database".
        DBValue value = dbMap.get(GLOBAL_KEY);
        DBValue newvalue;

        // Write on the first commit, or when this is a newer transaction; if the
        // stored txid equals ours, this batch was already committed and the
        // replay must not be applied a second time.
        if (value == null || !value.txid.equals(id.getTransactionId())) {

            newvalue = new DBValue();
            newvalue.txid = id.getTransactionId();

            if (value == null) {
                // Very first commit.
                newvalue.count = sum;
            } else {
                newvalue.count = value.count + sum;
            }
            dbMap.put(GLOBAL_KEY, newvalue);
        } else {
            newvalue = value;
        }

        System.out.println("total----------------------------->" + dbMap.get(GLOBAL_KEY).count);

        // No downstream consumers here; to chain further, emit the result:
        // collector.emit(new Values(...));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Nothing is emitted downstream, so no output fields are declared.
    }

    public static class DBValue {
        BigInteger txid;
        long count = 0;
    }
}

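The txid comparison in finishBatch is the heart of the exactly-once guarantee: the state is written together with the txid that produced it, and a replayed commit carrying the same txid is skipped. A self-contained sketch of just that pattern in plain Java (hypothetical names, no Storm dependency):

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class IdempotentCommitDemo {

    static class DBValue {
        BigInteger txid;
        long count;
    }

    static final Map<String, DBValue> db = new HashMap<String, DBValue>();

    // Apply a batch's partial sum exactly once per transaction id.
    static void commit(BigInteger txid, long sum) {
        DBValue cur = db.get("GLOBAL_KEY");
        if (cur == null || !cur.txid.equals(txid)) {
            DBValue next = new DBValue();
            next.txid = txid;
            next.count = (cur == null ? 0 : cur.count) + sum;
            db.put("GLOBAL_KEY", next);
        }
        // else: this txid was already committed — a replay, so do nothing.
    }

    public static void main(String[] args) {
        commit(BigInteger.valueOf(1), 10); // total = 10
        commit(BigInteger.valueOf(1), 10); // replay of tx 1: still 10
        commit(BigInteger.valueOf(2), 7);  // total = 17
        System.out.println(db.get("GLOBAL_KEY").count); // 17
    }
}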

The topology (MyToPo):

package transaction1;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.transactional.TransactionalTopologyBuilder;

public class MyToPo {
    public static void main(String[] args) throws Exception {

        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "txSpoutId", new MyTxSpout(), 1);
        builder.setBolt("txBolt", new MyTransactionBolt(), 5).shuffleGrouping("txSpoutId");
        builder.setCommitterBolt("commit", new MyCommiter(), 1).shuffleGrouping("txBolt");

        // Configuration.
        Config conf = new Config();

        if (args.length > 0) {
            // Submit to a cluster.
            StormSubmitter.submitTopology(args[0], conf, builder.buildTopology());
        } else {
            // Run in local mode.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.buildTopology());
        }
    }
}

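To try it on a cluster, package the classes into a jar and submit it with the storm CLI, passing the topology name as the first argument (the jar name here is hypothetical):

storm jar storm-tx-demo.jar transaction1.MyToPo myTxTopology

With no arguments, main falls through to the LocalCluster branch and runs the topology in-process.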

The three lines that wire the topology together are worth a second look: the spout is registered under the id "txSpoutId", the batch bolt consumes it with a shuffle grouping at parallelism 5, and the committer is registered via setCommitterBolt so that it takes part in the commit phase:

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "txSpoutId", new MyTxSpout(), 1);
builder.setBolt("txBolt", new MyTransactionBolt(), 5).shuffleGrouping("txSpoutId");
builder.setCommitterBolt("commit", new MyCommiter(), 1).shuffleGrouping("txBolt");
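A side note (an assumption based on the 0.9.x TransactionalTopologyBuilder, worth verifying against your version): because MyCommiter implements ICommitter, registering it with plain setBolt should mark it as a committer as well; setCommitterBolt simply makes the intent explicit:

builder.setBolt("commit", new MyCommiter(), 1).shuffleGrouping("txBolt");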

Run result: given 100 source rows and BATCH_NUM = 10, each 2-second cycle commits one batch of 10 log lines, so the committer's total advances 10, 20, ... up to 100, and then stays at 100 once the source is exhausted (later batches find no keys and emit nothing).

