storm的trident编程模型

storm的基本概念别人总结的,
https://blog.csdn.net/pickinfo/article/details/50488226 
编程模型最关键最难就是实现局部聚合的业务逻辑聚合类实现Aggregator接口重写方法aggregate,聚合使用存储中间聚合过程状态的类,本地hashmap的去重逻辑还有加入redis后进行的一些去重操作,数据的持久(判断三天内的带播控量)
public class SaleSum implements Aggregator<SaleSumState> {
    private Logger logger  = org.slf4j.LoggerFactory.getLogger(SaleSum.class);

    /**
     *
     */
    private static final long serialVersionUID = -6879728480425771684L;

    private int partitionIndex ;
    @Override
    public SaleSumState init(Object batchId, TridentCollector collector) {
        return new SaleSumState();

    }

    @Override
    public void aggregate(SaleSumState val, TridentTuple tuple, TridentCollector collector) {
      double oldSum=val.saleSum;
      double price=tuple.getDoubleByField("price");
      double newSum=oldSum+price;
      val.saleSum=newSum;
      }

    @Override
    public void complete(SaleSumState val, TridentCollector collector) {
        collector.emit(new Values(val.saleSum));
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {

    }

    @Override
    public void cleanup() {

    }
}

public class TridentDemo {

public static final String SPOUT_ID = "kafak_spout";

public static void main(String[] args) {

1、创建一个strom此程序的topology 为TridentTopology
TridentTopology topology = new TridentTopology();2、连接kafka的三要素:zk地址:port  topic        //1.从kafak读取数据,        //只会被成功处理 一次 ,有且只有此一次 提供容错机制  处理失败会在后续的批次进行提交        BrokerHosts zkHost = new ZkHosts("hadoop01:2181,hadoop02:2181,hadoop03:2181");        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHost, "test");//两种构造器         定义从哪消费相当于spark中earliest与largest        kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());        //透明事务kafka的spout        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);        //严格模式的事务级别        TransactionalTridentKafkaSpout kafkaSpout1 = new TransactionalTridentKafkaSpout(kafkaConfig);        //普通的kafak级别 {"str","msg"}        //严格的kafak级别 {"str","msg",上一批次的值}        Stream stream = topology.newStream(SPOUT_ID, kafkaSpout);        // stream.each(new Fields("str"),new PrintTestFilter2());3.进行日志数据的解析,自定义解析类实现了Funtion接口,重写execute方法进行字段解析,在发送出来collector.emit(new Values(timestamp,yyyyMMddStr,yyyyMMddHHStr,yyyyMMddHHmmStr,consumer,productName,price,country,province,city));
   进去的字段名定义为"str",出来的解析字段分别定义了字段名 ,后续做打印测试Stream hasPraseSteam = stream.each(new Fields("str"), new ParseFunction(), new Fields("timeStamp", "yyyyMMddStr", "yyyyMMddHHStr", "yyyyMMddHHmmStr", "consumer", "productNmae", "price", "country", "provence", "city"));        //  .each(new Fields("str", "timeStamp", "yyyyMMddStr", "yyyyMMddHHStr", "yyyyMMddHHmmStr", "consumer", "productNmae", "price", "country", "provence", "city"), new PrintTestFilter2());

4.进行一个同时进行次数与求和统计的例子,storm是一个服务器节点多个work(jvm),一个work中的task执行自己spout,bolt任务trident中最重要的地方就是自定义聚合的实现(SaleSum类),常常是实现业务逻辑的地方,规定如何进行数据的聚合,  进行的是各个分区的局部聚合
        //1. 对每天电商的销售额        //去掉用不到的自地段 保留需要用到的字段        //分区统计的流        Stream partitionStatStream = hasPraseSteam.project(new Fields("yyyyMMddStr", "price"))                .shuffle()                .groupBy(new Fields("yyyyMMddStr"))                .chainedAgg()                .partitionAggregate(new Fields("price"), new SaleSum(), new Fields("saleTotalpartByDay")) //进行同一批次各个分区的局部销售额统计                .partitionAggregate(new Fields("price"), new Count(), new Fields("oderNumOfpartDay"))//同一批次中各个分区的订单数                .chainEnd()                .toStream()                .parallelismHint(2);

5. //全局统计 每天的总销售额进行 进行分组全局聚合一般的 顺序=============先进行分区统计,在进行全局统计(相当于hadoop的combine与spark中reducebykey)        TridentState saleGlobalState = partitionStatStream.groupBy(new Fields("yyyyMMddStr"))                .persistentAggregate(new MemoryMapState.Factory(), new Fields("saleTotalpartByDay"), new Sum(), new Fields("saleGlobalAmtDay"));        //测试        saleGlobalState.newValuesStream().each(new Fields("yyyyMMddStr", "saleGlobalAmtDay"), new PrintTestFilter2());        //全局统计 每天的订单总数        TridentState oderGlobalState = partitionStatStream.groupBy(new Fields("yyyyMMddStr"))                .persistentAggregate(new MemoryMapState.Factory(), new Fields("oderNumOfpartDay"), new Sum(), new Fields("oderGlobalAmtDay"));        oderGlobalState.newValuesStream().each(new Fields("yyyyMMddStr", "oderGlobalAmtDay"), new PrintTestFilter2());

//2.给与地域时段  维度 统计

//    "timeStamp","yyyyMMddStr","yyyyMMddHHStr","yyyyMMddHHmmStr","consumer","productNmae","price","country","provence","city"

TridentState state = hasPraseSteam.project(new Fields("yyyyMMddHHStr", "price", "country", "provence", "city"))                .each(new Fields("yyyyMMddHHStr", "country", "provence", "city"), new ContactKey(), new Fields("addrAndHour"))                //  .project()                .groupBy(new Fields("addrAndHour"))                .persistentAggregate(new MemoryMapState.Factory(), new Fields("price"), new Sum(), new Fields("saleAmtOfAddrAndHour"));

//测试        state.newValuesStream().each(new Fields("addrAndHour"), new PrintTestFilter2());

//3.使用hbase存入 结果状态        /**rowkey         * value         * 非实物 :就简单存储一个value         * 严格的事实控制: 存储: batchId和统计值         * 透明事务控制 : batchId和统计值和上个批次的统计值

*/        HBaseMapState.Options<OpaqueValue> opts=new HBaseMapState.Options<OpaqueValue>();        opts.tableName="test";        opts.columnFamily="info";        //1.1以后设置列名使用下面类        TridentHBaseMapMapper mapMapper= new SimpleTridentHBaseMapMapper("saleAmtOfAddrAndHour");        opts.mapMapper = mapMapper;        StateFactory Hbasefactory=HBaseMapState.opaque(opts);

6.进行hbase存储,storm在给apache后,(1.0版本?后)已经实现与hbase的集成接口,事物类型要与topoloy一致
persistentAggregate为最终的持久化函数,存储可以为内存/hbase,返回值为tridentState
//        HBaseMapState.Options<Object> opts=new HBaseMapState.Options<Object>();//        opts.tableName="test";//        opts.columnFamily="info";//        //1.1以后设置列名使用下面类,存入hbase的列名//        TridentHBaseMapMapper mapMapper= new SimpleTridentHBaseMapMapper("saleAmtOfAddrAndHour");//        opts.mapMapper = mapMapper;//       StateFactory Hbasefactory1=HBaseMapState.nonTransactional(opts);

TridentState HbaseState = hasPraseSteam.project(new Fields("yyyyMMddHHStr", "price", "country", "provence", "city"))                .each(new Fields("yyyyMMddHHStr", "country", "provence", "city"), new ContactKey(), new Fields("addrAndHour"))                //  .project()                .groupBy(new Fields("addrAndHour"))                .persistentAggregate(Hbasefactory, new Fields("price"), new Sum(), new Fields("saleAmtOfAddrAndHour"));

//进行drpc查询        LocalDRPC localDRPC = new LocalDRPC();        topology.newDRPCStream("saleAmtOfDay", localDRPC)                .each(new Fields("args"), new SplitFunction1(), new Fields("requestDate"))                .stateQuery(saleGlobalState, new Fields("requestDate"), new MapGet(),                        new Fields("saleGlobalAmtOfDay1"))                .project(new Fields("requestDate", "saleGlobalAmtOfDay1"))                .each(new Fields("saleGlobalAmtOfDay1"), new FilterNull())        //  .each(new Fields("requestDate", "saleGlobalAmtOfDay1"), new PrintTestFilter2())        ;

topology.newDRPCStream("numOrderOfDay", localDRPC)                .each(new Fields("args"), new SplitFunction1(), new Fields("requestDate"))                .stateQuery(oderGlobalState, new Fields("requestDate"), new MapGet(),                        new Fields("numOrderGlobalOfDay1"))                .project(new Fields("requestDate", "numOrderGlobalOfDay1"))                .each(new Fields("numOrderGlobalOfDay1"), new FilterNull())        ;

topology.newDRPCStream("saleTotalAmtOfAddrAndHour", localDRPC)                .each(new Fields("args"), new SplitFunction1(), new Fields("requestAddrAndHour"))                .stateQuery(HbaseState, new Fields("requestAddrAndHour"),                        new MapGet(), new Fields("saleTotalAmtOfAddrAndHour"))                .project(new Fields("requestAddrAndHour", "saleTotalAmtOfAddrAndHour"))                .each(new Fields("saleTotalAmtOfAddrAndHour"), new FilterNull())        ;

7.提交本地还是集群运行,drpc可以进行对持久化后的state进行数据查询        Config conf = new Config();        if (args == null || args.length <= 0) {            // 本地测试            LocalCluster localCluster = new LocalCluster();            // topology名称唯一            localCluster.submitTopology("odeR", conf, topology.build());            while (true) {

try {                    Thread.sleep(10000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                String saleAmtResult =                        localDRPC.execute("saleAmtOfDay", "20160828 20160827");

System.err.println("saleAmtResult=" + saleAmtResult);

String numberOrderResult =                        localDRPC.execute("numOrderOfDay", "20160828 20160827");                System.err.println("numberOrderResult=" + numberOrderResult);

String saleTotalAmtOfAddrAndHourRessult =                        localDRPC.execute("saleTotalAmtOfAddrAndHour", "苏州_江苏_中国_2016082815");

System.err.println(saleTotalAmtOfAddrAndHourRessult);            }        } else {            try {                StormSubmitter.submitTopology(args[0], conf, topology.build());            } catch (AlreadyAliveException e) {                e.printStackTrace();            } catch (InvalidTopologyException e) {                e.printStackTrace();            } catch (AuthorizationException e) {                e.printStackTrace();            }        }    }}

原文地址:https://www.cnblogs.com/hejunhong/p/10384492.html

时间: 2024-11-10 20:57:26

storm的trident编程模型的相关文章

Storm 第一章 核心组件及编程模型

1 流式计算 流式计算:数据实时产生.实时传输.实时计算.实时展示 代表技术:Flume实时获取数据.Kafka/metaq实时数据存储.Storm/JStorm实时数据计算.Redis实时结果缓存.持久化存储(mysql). 一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果. 2 Storm是什么 Storm 是用来实时处理数据,特点:低延迟.高可用.分布式.可扩展.数据不丢失,提供简单容易理解的接口,便于开发. 3 Storm 与Hadoop的区别 Storm用于实

Storm介绍及核心组件和编程模型

离线计算 离线计算:批量获取数据.批量传输数据.周期性批量计算数据.数据展示 代表技术:Sqoop批量导入数据.HDFS批量存储数据.MapReduce批量计算数据.Hive批量计算数据.azkaban/oozie任务调度 流式计算 流式计算:数据实时产生.数据实时传输.数据实时计算.实时展示 代表技术:Flume实时获取数据.Kafka/metaq实时数据存储.Storm/JStorm实时数据计算.Redis实时结果缓存.持久化存储(mysql). 一句话总结:将源源不断产生的数据实时收集并实

Storm集群上的开发 ,Topology任务的编写 之 WordCountTopology数据流分析(storm编程模型)(一张图说明问题)(四)

WordCountTopology数据流分析(storm编程模型) 上一章的example的单词统计在storm的数据流动到底是怎么进行的呢,这一章节开始介绍:

Storm集群组件和编程模型

 Storm工作原理: Storm是一个开源的分布式实时计算系统,常被称为流式计算框架.什么是流式计算呢?通俗来讲,流式计算顾名思义:数据流源源不断的来,一边来,一边计算结果,再进入下一个流. 比如一般金融系统一直不断的执行,金融交易.用户全部行为都记录进日志里,日志分析出站点运维.猎户信息.海量数据使得单节点处理只是来.所以就用到分布式计算机型,storm 是当中的典型代表之中的一个,一般应用场景是:中间使用一个消息队列系统如kafka,先将消息缓存起来,storm 中有非常多的节点,分布

Storm中Trident流合并的例子demo

流的合并操作,是指根据两个流的关联条件将两个流合并成一个流,然后在进行后面的处理操作,如果使用Spout和Bolt这种编程模型的话写起来会比较困难和繁琐,因为要设置缓冲区来保存第一次过来的数据,后续还要进行两者的比较,使用Trident应用起来比较方便,对原来的编程模型进行了一定的抽象.代码实例: 需求: 两个spout: spout1:里面的数据是 name ,id 和tel, spout2是sex 和id: 首先对spout1进行过滤,过滤掉不是186的电话号码,然后显示 然后根据将过滤后的

JStorm:概念与编程模型

1.集群架构 JStorm从设计的角度,就是一个典型的调度系统,简单集群的架构如下图所示,其中Nimbus可增加一个备节点,多个Supervisor节点组成任务执行集群. 1.1.Nimbus Nimbus是作为整个集群的调度器角色,负责分发topology代码.分配任务,监控集群运行状态等,其主要通过ZK与supervisor交互.可以和Supervisor运行在同一物理机上,JStorm中Nimbus可采用主从备份,支持热切. 1.2.Supervisor Supervisor 是集群中任务

Linux的I/O模式、事件驱动编程模型

大纲: (1)基础概念回顾 (2)Linux的I/O模式 (3)事件驱动编程模型 (4)select/poll/epoll的区别和Python示例 网络编程里常听到阻塞IO.非阻塞IO.同步IO.异步IO等概念,总听别人装13不如自己下来钻研一下.不过,搞清楚这些概念之前,还得先回顾一些基础的概念. 1.基础知识回顾 注意:咱们下面说的都是Linux环境下,跟Windows不一样哈~~~ 1.1 用户空间和内核空间 现在操作系统都采用虚拟寻址,处理器先产生一个虚拟地址,通过地址翻译成物理地址(内

ARMV8 datasheet学习笔记4:AArch64系统级体系结构之编程模型(4)- 其它

1. 前言 2.可配置的指令使能/禁用控制和trap控制 指令使能/禁用 当指令被禁用,则这条指令就会变成未定义 指令Trap控制 控制某条或某些指令在运行时进入陷阱,进入陷阱的指令会产生trap异常,路由规则如下: (1)当前为EL1,则陷阱异常传递给EL1(HCR_EL2.TGE定义为1时,会路由到EL2); (2)当前为EL2,则陷阱异常传递给EL2; (3)当前为EL3,则陷阱异常传递给EL3; 3. 系统调用 SVC 默认情况下SVC产生supervisor call,同步异常目标级别

网络编程模型

课程索引 1. 编程模型 2. 编程模型 Socket的实质就是一个接口 , 利用该接口,用户在使用不同的网络协议时,操作函数得以统一. 而针对不同协以统一. 而针对不同协议的差异性操作,则交给了 socket去自行解决. 3. TCP编程模型 4. UDP编程模型