storm高级原语-Transactional topology

参考:

  1. http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/
  2. http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/

示例代码:

package com.lky.topology;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;

import com.lky.util.FileUtil;
import com.lky.util.RunStorm;

import backtype.storm.Config;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.MemoryTransactionalSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

@SuppressWarnings({ "deprecation", "serial", "rawtypes" })
/**
 * @Title: TransactionalGlobalCount.java
 * @Package com.lky.topology
 * @Description: 事务topology(模拟实时统计消息数量)
 * @author lky
 * @date 2015年10月25日 上午11:23:12
 * @version V1.0
 */
public class TransactionalGlobalCount {
    private static Log log=LogFactory.getLog(TransactionalGlobalCount.class);
    public static final int PARTITION_TAKE_PER_BATCH = 3;
    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {
        {
            put(0, new ArrayList<List<Object>>() {
                {
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("chicken"));
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("apple"));
                }
            });
            put(1, new ArrayList<List<Object>>() {
                {
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("apple"));
                    add(new Values("banana"));
                }
            });
            put(2, new ArrayList<List<Object>>() {
                {
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("cat"));
                    add(new Values("dog"));
                    add(new Values("dog"));
                    add(new Values("dog"));
                    add(new Values("dog"));
                }
            });
        }
    };

    public static class Value {
        int count = 0;
        BigInteger txid;
    }

    public static Map<String, Value> DATABASE = new HashMap<String, Value>();
    public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";

    /**
    * @Title: TransactionalGlobalCount.java
    * @Package com.lky.topology
    * @Description: processing阶段(可以并行处理)
    * @author lky
    * @date 2015年10月25日 下午12:14:26
    * @version V1.0
     */
    public static class BatchCount extends BaseBatchBolt {
        BatchOutputCollector collector;
        Object id;
        Integer _count = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            _count++;
            log.info("-------------->"+_count);
        }

        @Override
        public void finishBatch() {
            log.info("--------"+_count+"----------");
            collector.emit(new Values(id, _count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }

    }

    /**
    * @Title: TransactionalGlobalCount.java
    * @Package com.lky.topology
    * @Description: committer 汇总阶段(强顺序流)
    * @author lky
    * @date 2015年10月25日 下午12:14:54
    * @version V1.0
     */
    public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {

        BatchOutputCollector collector;
        TransactionAttempt id;
        Integer _size = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            Integer sum = tuple.getInteger(1);
            log.info("sum---------->"+sum);
            if (sum > 0) {
                _size += sum;
            }
        }

        @Override
        public void finishBatch() {
            Value oldValue = DATABASE.get(GLOBAL_COUNT_KEY);
            Value newValue;

            // 如果没有存储过,或者有新的事务到达,更新
            if (null == oldValue || !oldValue.txid.equals(id.getTransactionId())) {
                newValue = new Value();
                newValue.txid = id.getTransactionId();
                if (null == oldValue) {
                    newValue.count = _size;
                } else {
                    newValue.count = _size + oldValue.count;
                    collector.emit(new Values(id, newValue.count));
                    FileUtil.strToFile(Integer.valueOf(newValue.count).toString(), "sum.txt", true);
                }

                DATABASE.put(GLOBAL_COUNT_KEY, newValue);
            } else {
                newValue = oldValue;
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "size"));
        }

    }

    @Test
    public void test() {
         MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
            TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
            builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
            builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");

            Config config = new Config();
            config.setDebug(true);
            config.setMaxSpoutPending(3);

            RunStorm.runStormLocally(builder.buildTopology(), "ss", config, 5);
    }
}
时间: 2024-10-24 08:13:16

storm高级原语-Transactional topology的相关文章

Storm高级原语(二) -- DRPC详解

Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU密集型(CPU intensive)的计算任务.DRPC的stormtopology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流. DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语stream.spout.bolt. topology而成的一种模式(pattern).本来应该把DRPC单独打成一个包的, 但是DRPC实在是太有用了,所以我们我们把它和storm捆绑

Storm高级原语(四)Trident API 综述

"Stream"是Trident中的核心数据模型,它被当做一系列的batch来处理.在Storm集群的节点之间,一个stream被划分成很多partition(分区),对流的操作(operation)是在每个partition上并行进行的. 注: ①"Stream"是Trident中的核心数据模型:有些地方也说是TridentTuple,没有个标准的说法. ②一个stream被划分成很多partition:partition是stream的一个子集,里面可能有多个b

Storm基本概念以及Topology的并发度

Spouts,流的源头 Spout是Storm里面特有的名词,Stream的源头,通常是从外部数据源读取tuples,并emit到topology Spout可以同时emit多个tupic stream,通过OutputFieldsDeclarer中的declareStream,method来定义 Spout需要实现RichSpout端口,最重要的方法是nextTuple,storm会不断调用接口从spout中取数据,同时需要注意的是Spout分为reliable or unreliable两种

Storm高级属性

之前写了2篇文章讲述了Storm的基本知识.最近也学习了Storm的高级属性了,单纯粹还是属于了解的状态.我就简单介绍几个比较简单的属性. 1.DRPC.简称分布式RPC,模型图: 用户直接面对的是DRPC Server,避开了所有的拓扑逻辑,Storm提供了一个名为LinearDRPCTopologyBuilder的线性拓扑构建器,他把DRPC所做的几乎所有步骤都自动实现了.本地模式的代码例子如下: LocalDRPC drpc = new LocalDRPC(); LocalCluster

storm源代码分析---Transactional spouts

Transactionalspouts Trident是以小批量(batch)的形式在处理tuple.而且每一批都会分配一个唯一的transaction id.不同spout的特性不同,一个transactionalspout会有例如以下这些特性: 1.有着相同txid的batch一定是一样的. 当重播一个txid相应的batch时,一定会重播和之前相应txid的batch中相同的tuples. 2.各个batch之间是没有交集的.每一个tuple仅仅能属于一个batch 3.每个tuple都属

使用Thrift API监控Storm集群和Topology

如要监控Storm集群和运行在其上的Topology,该如何做呢? Storm已经为你考虑到了,Storm支持Thrift的C/S架构,在部署Nimbus组件的机器上启动一个Thrift Server进程来提供服务,我们可以通过编写一个Thrift Client来请求Thrift Server,来获取你想得到的集群和Topology的相关数据,来接入监控平台,如Zabbix等,我目前使用的就是Zabbix. 整体的流程已经清楚了,下面就来实践吧. 1 安装Thrift 由于我们要使用Thrift

第3节 storm高级应用:5、定时器任务

5.storm的定时器以及与mysql的整合使用 功能需求:实现每五秒钟打印出当前时间,并将发送出来的数据存入到mysql数据库当中. 详见代码. 原文地址:https://www.cnblogs.com/mediocreWorld/p/11273557.html

storm transaction

storm transaction storm的事务主要用于对数据准确性要求非常高的环境中,尤其是在计算交易金额或笔数,数据库同步的场景中. storm 事务逻辑是挺复杂的,而且坦白讲,代码写的挺烂的. JStorm下一步将重新设计基于Meta 1 和Meta3 的事务模型,让使用者更简便,代码更清晰. 一个基本的例子 你可以通过使用TransactionalTopologyBuilder来创建transactional topology. 下面就是一个transactional topolog

Storm入门教程 第五章 一致性事务【转】

Storm是一个分布式的流处理系统,利用anchor和ack机制保证所有tuple都被成功处理.如果tuple出错,则可以被重传,但是如何保证出错的tuple只被处理一次呢?Storm提供了一套事务性组件Transaction Topology,用来解决这个问题. Transactional Topology目前已经不再维护,由Trident来实现事务性topology,但是原理相同. 5.1一致性事务的设计 Storm如何实现即对tuple并行处理,又保证事务性.本节从简单的事务性实现方法入手