Storm's basic concepts, as summarized by someone else:
https://blog.csdn.net/pickinfo/article/details/50488226
The most critical and most difficult part of the programming model is implementing the partial-aggregation business logic. The aggregation class implements the Aggregator interface and overrides the aggregate() method, and it uses a small state class to hold the intermediate result of the aggregation. Deduplication can start with a local HashMap and later be moved to Redis, and the data can be persisted (for example, to check the play-control volume within the last three days).
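As an illustration of the HashMap-based deduplication mentioned above, a minimal sketch of a Trident filter is shown below. The class name, the choice of key fields and the org.apache.storm package names (Storm 1.x; older releases use the storm.trident packages) are assumptions, not code from the original post; a Redis-backed variant would replace the set with SADD/SISMEMBER calls plus a three-day expiry on each key.

import java.util.HashSet;
import java.util.Set;

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

// Hypothetical dedup filter: remembers the keys it has already seen in a local HashSet.
public class LocalDedupFilter extends BaseFilter {
    private static final long serialVersionUID = 1L;

    // per-task memory of keys already seen; lost when the worker restarts
    private Set<String> seen = new HashSet<String>();

    @Override
    public boolean isKeep(TridentTuple tuple) {
        // the key fields are an assumption; any combination that identifies a duplicate works
        String key = tuple.getStringByField("consumer") + "_" + tuple.getStringByField("productNmae");
        return seen.add(key); // keep the tuple only the first time this key appears
    }
}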
public class SaleSum implements Aggregator<SaleSumState> {

    private Logger logger = org.slf4j.LoggerFactory.getLogger(SaleSum.class);

    private static final long serialVersionUID = -6879728480425771684L;

    private int partitionIndex;

    // called once per batch and partition: create a fresh intermediate state
    @Override
    public SaleSumState init(Object batchId, TridentCollector collector) {
        return new SaleSumState();
    }

    // called for every tuple of the batch: accumulate the price into the state
    @Override
    public void aggregate(SaleSumState val, TridentTuple tuple, TridentCollector collector) {
        double oldSum = val.saleSum;
        double price = tuple.getDoubleByField("price");
        double newSum = oldSum + price;
        val.saleSum = newSum;
    }

    // called when the batch is complete: emit this partition's partial sum
    @Override
    public void complete(SaleSumState val, TridentCollector collector) {
        collector.emit(new Values(val.saleSum));
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
    }

    @Override
    public void cleanup() {
    }
}
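The SaleSumState class that carries the intermediate sum is not shown in the post; a minimal sketch consistent with how SaleSum reads and writes it would be:

// Per-batch state object used by SaleSum: just the running total of "price".
public class SaleSumState {
    double saleSum = 0.0;
}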
public class TridentDemo {

    public static final String SPOUT_ID = "kafak_spout";

    public static void main(String[] args) {

1. Create this program's Storm topology, a TridentTopology.
        TridentTopology topology = new TridentTopology();

2. Connect to Kafka. The three essentials are the ZooKeeper address:port and the topic.

        // 1. read data from Kafka
        // each message is successfully processed exactly once (once and only once);
        // the spout provides fault tolerance, and failed batches are committed again in a later batch
        BrokerHosts zkHost = new ZkHosts("hadoop01:2181,hadoop02:2181,hadoop03:2181");
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHost, "test"); // two constructors are available
        // where to start consuming, comparable to earliest vs. largest in Spark
        kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // opaque-transactional Kafka spout
        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
        // strictly transactional Kafka spout
        TransactionalTridentKafkaSpout kafkaSpout1 = new TransactionalTridentKafkaSpout(kafkaConfig);
        // plain Kafka level:   {"str", "msg"}
        // strict Kafka level:  {"str", "msg", value of the previous batch}
        Stream stream = topology.newStream(SPOUT_ID, kafkaSpout);
        // stream.each(new Fields("str"), new PrintTestFilter2());
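PrintTestFilter2 appears throughout the example only as a debugging aid and its source is not included in the post; a minimal sketch, assuming it simply prints each tuple and keeps it (class name taken from the usage above, packages assumed as before), could be:

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

// Hypothetical debug filter: print every tuple and let it pass through unchanged.
public class PrintTestFilter2 extends BaseFilter {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.err.println(tuple);
        return true;
    }
}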
3. Parse the log lines. A custom parse class implements the Function interface and overrides execute(); it splits each line into fields and emits them with collector.emit(new Values(timestamp, yyyyMMddStr, yyyyMMddHHStr, yyyyMMddHHmmStr, consumer, productName, price, country, province, city)). The incoming field is named "str" and each parsed output field gets its own name; a print test can be chained afterwards (a sketch of such a ParseFunction follows the code below).

        Stream hasPraseSteam = stream.each(new Fields("str"), new ParseFunction(),
                new Fields("timeStamp", "yyyyMMddStr", "yyyyMMddHHStr", "yyyyMMddHHmmStr",
                        "consumer", "productNmae", "price", "country", "provence", "city"));
        // .each(new Fields("str", "timeStamp", "yyyyMMddStr", "yyyyMMddHHStr", "yyyyMMddHHmmStr",
        //         "consumer", "productNmae", "price", "country", "provence", "city"), new PrintTestFilter2());
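The post does not include ParseFunction itself. A hedged sketch is given below; it assumes the log line is a tab-separated record whose column order matches the emitted fields, neither of which is confirmed by the original:

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// Hypothetical parser: timestamp \t consumer \t productName \t price \t country \t province \t city
public class ParseFunction extends BaseFunction {
    private static final long serialVersionUID = 1L;

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String line = tuple.getStringByField("str");
        String[] parts = line.split("\t");
        if (parts.length < 7) {
            return; // drop malformed lines
        }
        long timestamp = Long.parseLong(parts[0]);
        Date d = new Date(timestamp);
        String yyyyMMddStr = new SimpleDateFormat("yyyyMMdd").format(d);
        String yyyyMMddHHStr = new SimpleDateFormat("yyyyMMddHH").format(d);
        String yyyyMMddHHmmStr = new SimpleDateFormat("yyyyMMddHHmm").format(d);
        String consumer = parts[1];
        String productName = parts[2];
        double price = Double.parseDouble(parts[3]);
        String country = parts[4];
        String province = parts[5];
        String city = parts[6];
        // emit one value per declared output field, in the same order
        collector.emit(new Values(timestamp, yyyyMMddStr, yyyyMMddHHStr, yyyyMMddHHmmStr,
                consumer, productName, price, country, province, city));
    }
}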
4. An example that computes the order count and the sales total at the same time. A Storm cluster runs multiple workers (JVMs) per server node, and the tasks inside a worker execute their own spout and bolt work. The most important part of Trident is the custom aggregation (the SaleSum class above): it is usually where the business logic is implemented and it defines how the data is aggregated. Here it performs the partial aggregation inside each partition.

        // 1. daily e-commerce sales amount
        // drop the fields that are not needed and keep only the ones that are
        // stream of per-partition statistics
        Stream partitionStatStream = hasPraseSteam.project(new Fields("yyyyMMddStr", "price"))
                .shuffle()
                .groupBy(new Fields("yyyyMMddStr"))
                .chainedAgg()
                .partitionAggregate(new Fields("price"), new SaleSum(), new Fields("saleTotalpartByDay"))  // partial sales amount per partition within the batch
                .partitionAggregate(new Fields("price"), new Count(), new Fields("oderNumOfpartDay"))      // order count per partition within the batch
                .chainEnd()
                .toStream()
                .parallelismHint(2);

5. Global statistics: the total daily sales amount, aggregated globally per group. The usual order is to aggregate per partition first and then aggregate globally (comparable to Hadoop's combiner or Spark's reduceByKey).

        TridentState saleGlobalState = partitionStatStream.groupBy(new Fields("yyyyMMddStr"))
                .persistentAggregate(new MemoryMapState.Factory(), new Fields("saleTotalpartByDay"),
                        new Sum(), new Fields("saleGlobalAmtDay"));
        // test
        saleGlobalState.newValuesStream().each(new Fields("yyyyMMddStr", "saleGlobalAmtDay"), new PrintTestFilter2());

        // global statistics: total order count per day
        TridentState oderGlobalState = partitionStatStream.groupBy(new Fields("yyyyMMddStr"))
                .persistentAggregate(new MemoryMapState.Factory(), new Fields("oderNumOfpartDay"),
                        new Sum(), new Fields("oderGlobalAmtDay"));
        oderGlobalState.newValuesStream().each(new Fields("yyyyMMddStr", "oderGlobalAmtDay"), new PrintTestFilter2());

        // 2. statistics by region and hour
        // fields: "timeStamp","yyyyMMddStr","yyyyMMddHHStr","yyyyMMddHHmmStr","consumer","productNmae","price","country","provence","city"
        TridentState state = hasPraseSteam.project(new Fields("yyyyMMddHHStr", "price", "country", "provence", "city"))
                .each(new Fields("yyyyMMddHHStr", "country", "provence", "city"), new ContactKey(), new Fields("addrAndHour"))  // ContactKey is sketched below
                // .project()
                .groupBy(new Fields("addrAndHour"))
                .persistentAggregate(new MemoryMapState.Factory(), new Fields("price"), new Sum(), new Fields("saleAmtOfAddrAndHour"));
        // test
        state.newValuesStream().each(new Fields("addrAndHour"), new PrintTestFilter2());
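ContactKey is not shown in the post either. Judging from the DRPC query string "苏州_江苏_中国_2016082815" used further down, it concatenates city, province, country and the hour with underscores; the sketch below is based on that assumption only:

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// Hypothetical key builder: city_province_country_yyyyMMddHH, e.g. 苏州_江苏_中国_2016082815
public class ContactKey extends BaseFunction {
    private static final long serialVersionUID = 1L;

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String key = tuple.getStringByField("city") + "_"
                + tuple.getStringByField("provence") + "_"
                + tuple.getStringByField("country") + "_"
                + tuple.getStringByField("yyyyMMddHHStr");
        collector.emit(new Values(key));
    }
}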
6. Store the result state in HBase. Since Storm was handed over to Apache (from version 1.0 onward?), it ships with an HBase integration, and the transaction type of the state has to match the topology's spout. What is stored per rowkey depends on that type: non-transactional stores only the value; strictly transactional stores the batchId and the value; opaque-transactional stores the batchId, the value, and the previous batch's value. persistentAggregate is the final persistence call; the backing store can be memory or HBase, and it returns a TridentState.
        // 3. store the result state in HBase
        // opaque-transactional state to match the OpaqueTridentKafkaSpout
        HBaseMapState.Options<OpaqueValue> opts = new HBaseMapState.Options<OpaqueValue>();
        opts.tableName = "test";
        opts.columnFamily = "info";
        // since 1.1 the column name stored in HBase is set through this mapper class
        TridentHBaseMapMapper mapMapper = new SimpleTridentHBaseMapMapper("saleAmtOfAddrAndHour");
        opts.mapMapper = mapMapper;
        StateFactory Hbasefactory = HBaseMapState.opaque(opts);

        // non-transactional alternative:
        // HBaseMapState.Options<Object> opts = new HBaseMapState.Options<Object>();
        // opts.tableName = "test";
        // opts.columnFamily = "info";
        // TridentHBaseMapMapper mapMapper = new SimpleTridentHBaseMapMapper("saleAmtOfAddrAndHour");
        // opts.mapMapper = mapMapper;
        // StateFactory Hbasefactory1 = HBaseMapState.nonTransactional(opts);

        TridentState HbaseState = hasPraseSteam.project(new Fields("yyyyMMddHHStr", "price", "country", "provence", "city"))
                .each(new Fields("yyyyMMddHHStr", "country", "provence", "city"), new ContactKey(), new Fields("addrAndHour"))
                // .project()
                .groupBy(new Fields("addrAndHour"))
                .persistentAggregate(Hbasefactory, new Fields("price"), new Sum(), new Fields("saleAmtOfAddrAndHour"));

        // DRPC queries
        LocalDRPC localDRPC = new LocalDRPC();
        topology.newDRPCStream("saleAmtOfDay", localDRPC)
                .each(new Fields("args"), new SplitFunction1(), new Fields("requestDate"))
                .stateQuery(saleGlobalState, new Fields("requestDate"), new MapGet(), new Fields("saleGlobalAmtOfDay1"))
                .project(new Fields("requestDate", "saleGlobalAmtOfDay1"))
                .each(new Fields("saleGlobalAmtOfDay1"), new FilterNull())
                // .each(new Fields("requestDate", "saleGlobalAmtOfDay1"), new PrintTestFilter2())
                ;

        topology.newDRPCStream("numOrderOfDay", localDRPC)
                .each(new Fields("args"), new SplitFunction1(), new Fields("requestDate"))
                .stateQuery(oderGlobalState, new Fields("requestDate"), new MapGet(), new Fields("numOrderGlobalOfDay1"))
                .project(new Fields("requestDate", "numOrderGlobalOfDay1"))
                .each(new Fields("numOrderGlobalOfDay1"), new FilterNull())
                ;

        topology.newDRPCStream("saleTotalAmtOfAddrAndHour", localDRPC)
                .each(new Fields("args"), new SplitFunction1(), new Fields("requestAddrAndHour"))
                .stateQuery(HbaseState, new Fields("requestAddrAndHour"), new MapGet(), new Fields("saleTotalAmtOfAddrAndHour"))
                .project(new Fields("requestAddrAndHour", "saleTotalAmtOfAddrAndHour"))
                .each(new Fields("saleTotalAmtOfAddrAndHour"), new FilterNull())
                ;

7. Submit the topology to run locally or on the cluster; DRPC can then query the persisted state.

        Config conf = new Config();
        if (args == null || args.length <= 0) {
            // local test; the topology name must be unique
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("odeR", conf, topology.build());

            while (true) {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String saleAmtResult = localDRPC.execute("saleAmtOfDay", "20160828 20160827");
                System.err.println("saleAmtResult=" + saleAmtResult);
                String numberOrderResult = localDRPC.execute("numOrderOfDay", "20160828 20160827");
                System.err.println("numberOrderResult=" + numberOrderResult);
                String saleTotalAmtOfAddrAndHourRessult = localDRPC.execute("saleTotalAmtOfAddrAndHour", "苏州_江苏_中国_2016082815");
                System.err.println(saleTotalAmtOfAddrAndHourRessult);
            }
        } else {
            try {
                StormSubmitter.submitTopology(args[0], conf, topology.build());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }
    }
}
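SplitFunction1 is not shown either. Since the DRPC calls pass space-separated arguments such as "20160828 20160827", it presumably splits the args string on whitespace and emits one tuple per token; the sketch below rests on that assumption:

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// Hypothetical DRPC argument splitter: "20160828 20160827" -> two tuples, one key each
public class SplitFunction1 extends BaseFunction {
    private static final long serialVersionUID = 1L;

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String args = tuple.getStringByField("args");
        for (String part : args.split("\\s+")) {
            if (!part.isEmpty()) {
                collector.emit(new Values(part));
            }
        }
    }
}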
Original post: https://www.cnblogs.com/hejunhong/p/10384492.html