Function and projection operations in Trident log parsing

I: Function operations

1. Introduction

  A tuple itself is immutable.

  A Function never modifies its input tuple; it only appends new fields after the existing ones.

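2. Explanation

  If the original fields are log and flag,

  then after the parsing function runs, the new tuple exposes the fields log, flag, orderId, orderTime, orderAmtStr and memberId.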
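The append rule above can be sketched in plain Java, without any Storm dependency. SimpleTuple and append are hypothetical names used only for illustration; in Trident this merging is done internally by each(). The input tuple is left untouched, and a new tuple is built from the original fields followed by the function's output fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable tuple: field names and values, both read-only.
final class SimpleTuple {
    final List<String> fields;
    final List<Object> values;

    SimpleTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("fields and values differ in size");
        }
        // Defensive copies wrapped as unmodifiable lists: the tuple cannot change.
        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
        this.values = Collections.unmodifiableList(new ArrayList<>(values));
    }

    Object get(String field) {
        int i = fields.indexOf(field);
        if (i < 0) {
            throw new IllegalArgumentException("no such field: " + field);
        }
        return values.get(i);
    }

    // Mimics each(inputFields, function, outputFields): returns a NEW tuple made of
    // the original fields followed by the function's output fields.
    SimpleTuple append(List<String> newFields, List<Object> newValues) {
        List<String> f = new ArrayList<>(fields);
        f.addAll(newFields);
        List<Object> v = new ArrayList<>(values);
        v.addAll(newValues);
        return new SimpleTuple(f, v);
    }

    public static void main(String[] args) {
        SimpleTuple in = new SimpleTuple(Arrays.asList("log", "flag"),
                Arrays.<Object>asList("<log line>", "A"));
        SimpleTuple out = in.append(
                Arrays.asList("orderId", "orderTime", "orderAmtStr", "memberId"),
                Arrays.<Object>asList("1063", "1481953537038", "6834.70", "4000012340500607"));
        System.out.println(in.fields);  // [log, flag]
        System.out.println(out.fields); // [log, flag, orderId, orderTime, orderAmtStr, memberId]
    }
}
```

Because the original tuple is never mutated, several downstream operations can safely read the same input.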

3. Driver class

  A parsing step is added to the stream,

  and the parsed fields are then printed.

package com.jun.trident;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.testing.FixedBatchSpout;

// ValidLogFilter and PrintFilter are filters in the same package; they are not shown in this post
public class TridentDemo {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TridentTopology tridentTopology = new TridentTopology();
        // Simulated input: each tuple has the fields "log" and "flag"; at most 5 tuples per batch
        Fields field = new Fields("log", "flag");
        FixedBatchSpout spout = new FixedBatchSpout(field, 5,
            new Values("168.214.187.214 - - [1481953616092] \"GET /view.php HTTP/1.1\" 200 0 \"http://cn.bing.com/search?q=spark mllib\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","A"),
            new Values("168.187.202.202 - - [1481953537038] \"GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)\" \"-\"","A"),
            new Values("61.30.167.187 - - [1481953539039] \"GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","A"),
            new Values("30.29.132.190 - - [1481953544042] \"GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53\" \"-\"","B"),
            new Values("222.190.187.201 - - [1481953578068] \"GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","B"),
            new Values("72.202.43.53 - - [1481953579069] \"GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","B")
        );
        // Replay the sample data in an endless loop
        spout.setCycle(true);
        Config config = new Config();
        tridentTopology.newStream("orderAnalyse", spout)
                // Filter: keep only valid logs
                .each(new Fields("log"), new ValidLogFilter())
                // Parse: append orderId, orderTime, orderAmtStr and memberId to each tuple
                .each(new Fields("log"), new LogParserFunction(),
                        new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                // Print without "log": the log line is too long and would clutter the console
                .each(new Fields("flag", "orderId", "orderTime", "orderAmtStr", "memberId"), new PrintFilter());
        // Submit: run locally when no arguments are given, otherwise submit to the cluster as args[0]
        if (args == null || args.length <= 0) {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("tridentDemo", config, tridentTopology.build());
        } else {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, tridentTopology.build());
        }
    }
}
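ValidLogFilter and PrintFilter are used above but not shown in this post. Purely as an assumption, a valid log here might be one that carries the order beacon "IBEIfeng.gif?" with a query string, so the first sample line (GET /view.php) would be dropped before parsing. The hypothetical ValidLogCheck below sketches such a check in plain Java; inside a real Trident Filter, this boolean would be returned from isKeep. The actual ValidLogFilter may test something different.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the check a ValidLogFilter might perform.
// Assumption: only order-beacon requests ("IBEIfeng.gif?...") count as valid.
final class ValidLogCheck {
    static final String BEACON = "IBEIfeng.gif?";

    static boolean isValid(String log) {
        return log != null && log.contains(BEACON);
    }

    // Applies the check to a batch, the way a Trident filter keeps or drops tuples.
    static List<String> keepValid(List<String> logs) {
        return logs.stream().filter(ValidLogCheck::isValid).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> logs = Arrays.asList(
                "168.214.187.214 - - [1481953616092] \"GET /view.php HTTP/1.1\" 200 0",
                "168.187.202.202 - - [1481953537038] \"GET /IBEIfeng.gif?order_id=1063 HTTP/1.1\" 200 0");
        System.out.println(keepValid(logs).size()); // 1
    }
}
```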

4. Parser function class

package com.jun.trident;

import backtype.storm.tuple.Values;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

import java.util.HashMap;
import java.util.Map;

public class LogParserFunction implements Function {
    // The order parameters follow this marker in the request URL
    private static final String BEACON = "IBEIfeng.gif?";

    @Override
    public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
        // The function only receives the subset of fields named in each(), here just "log"
        String log = tridentTuple.getStringByField("log");
        int beaconIndex = log.indexOf(BEACON);
        if (beaconIndex < 0) {
            // Not an order log: emitting nothing drops the tuple instead of parsing garbage
            return;
        }
        // Query string: from just after "IBEIfeng.gif?" up to the next space (" HTTP/1.1")
        String orderInfo = log.substring(beaconIndex + BEACON.length()).split(" ")[0];
        String[] kvs = orderInfo.split("&");
        Map<String, String> orderInfos = new HashMap<>();
        for (String kv : kvs) {
            String[] keyValues = kv.split("=");
            if (keyValues.length == 2) {
                orderInfos.put(keyValues[0], keyValues[1]);
            }
        }
        // Note: in the log, the order ID parameter is named "order_id"
        String orderId = getValue(orderInfos, "order_id", "");
        String orderTime = getValue(orderInfos, "orderTime", "");
        String orderAmtStr = getValue(orderInfos, "orderAmt", "");
        String memberId = getValue(orderInfos, "memberId", "");
        // Emit in the same order as the declared output fields
        tridentCollector.emit(new Values(orderId, orderTime, orderAmtStr, memberId));
    }

    @Override
    public void prepare(Map map, TridentOperationContext tridentOperationContext) {
        // No setup needed
    }

    @Override
    public void cleanup() {
        // No resources to release
    }

    // Returns the value for key, or defaultValue if the key is absent
    public String getValue(Map<String, String> map, String key, String defaultValue) {
        return map.containsKey(key) ? map.get(key) : defaultValue;
    }
}
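To test the parsing logic outside a topology, the same steps can be lifted into a plain-Java helper. OrderLogParser is a hypothetical name, not part of the original project. One deliberate difference from the function above: each pair is split on the first "=" only (split("=", 2)), so a value that itself contains "=" is kept rather than discarded.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java version of the parsing in LogParserFunction (no Storm needed).
final class OrderLogParser {
    private static final String BEACON = "IBEIfeng.gif?";

    // Returns the query-string parameters of the beacon request,
    // or an empty map if the line contains no beacon.
    static Map<String, String> parseQuery(String log) {
        Map<String, String> result = new HashMap<>();
        int start = log.indexOf(BEACON);
        if (start < 0) {
            return result; // e.g. the "GET /view.php" line
        }
        // The query string ends at the next space (before "HTTP/1.1").
        String query = log.substring(start + BEACON.length()).split(" ")[0];
        for (String kv : query.split("&")) {
            String[] pair = kv.split("=", 2);
            if (pair.length == 2) {
                result.put(pair[0], pair[1]);
            }
        }
        return result;
    }

    // Same four fields, in the same order, as the function emits.
    static String[] toOrderFields(String log) {
        Map<String, String> q = parseQuery(log);
        return new String[] {
            q.getOrDefault("order_id", ""),
            q.getOrDefault("orderTime", ""),
            q.getOrDefault("orderAmt", ""),
            q.getOrDefault("memberId", "")
        };
    }

    public static void main(String[] args) {
        String log = "168.187.202.202 - - [1481953537038] \"GET /IBEIfeng.gif?order_id=1063"
                + "&orderTime=1481953537038&memberId=4000012340500607"
                + "&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1"
                + "&orderAmt=6834.70 HTTP/1.1\" 200 0";
        System.out.println(String.join(",", toOrderFields(log)));
        // 1063,1481953537038,6834.70,4000012340500607
    }
}
```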

5. Result

  

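II: Projection operations

1. Explanation

  Projection trims a tuple: only the listed fields are kept, and every other field is dropped.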
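What project() does to a single tuple can be modelled with an insertion-ordered map: the listed fields are kept in the listed order, and everything else (here log and flag) is dropped. ProjectionDemo is a hypothetical illustration, not Trident's implementation; rejecting an unknown field name is a choice made in this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of project(new Fields(...)) applied to one tuple.
final class ProjectionDemo {
    // Keeps only the listed fields, in the listed order; all other fields are dropped.
    static Map<String, Object> project(Map<String, Object> tuple, List<String> keep) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String field : keep) {
            if (!tuple.containsKey(field)) {
                throw new IllegalArgumentException("unknown field: " + field);
            }
            out.put(field, tuple.get(field));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("log", "<log line>");
        tuple.put("flag", "A");
        tuple.put("orderId", "1063");
        tuple.put("orderTime", "1481953537038");
        tuple.put("orderAmtStr", "6834.70");
        tuple.put("memberId", "4000012340500607");
        System.out.println(project(tuple,
                Arrays.asList("orderId", "orderTime", "orderAmtStr", "memberId")));
        // {orderId=1063, orderTime=1481953537038, orderAmtStr=6834.70, memberId=4000012340500607}
    }
}
```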

2. Driver class

  Project first,

  then print.

package com.jun.trident;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.testing.FixedBatchSpout;

public class TridentDemo {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TridentTopology tridentTopology = new TridentTopology();
        // Simulated input: each tuple has the fields "log" and "flag"; at most 5 tuples per batch
        Fields field = new Fields("log", "flag");
        FixedBatchSpout spout = new FixedBatchSpout(field, 5,
            new Values("168.214.187.214 - - [1481953616092] \"GET /view.php HTTP/1.1\" 200 0 \"http://cn.bing.com/search?q=spark mllib\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","A"),
            new Values("168.187.202.202 - - [1481953537038] \"GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)\" \"-\"","A"),
            new Values("61.30.167.187 - - [1481953539039] \"GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","A"),
            new Values("30.29.132.190 - - [1481953544042] \"GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53\" \"-\"","B"),
            new Values("222.190.187.201 - - [1481953578068] \"GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","B"),
            new Values("72.202.43.53 - - [1481953579069] \"GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","B")
        );
        // Replay the sample data in an endless loop
        spout.setCycle(true);
        Config config = new Config();
        tridentTopology.newStream("orderAnalyse", spout)
                // Filter: keep only valid logs
                .each(new Fields("log"), new ValidLogFilter())
                // Parse: append the four order fields to each tuple
                .each(new Fields("log"), new LogParserFunction(),
                        new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                // Projection: keep only the four order fields; log and flag are dropped
                .project(new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                // Print the projected tuples
                .each(new Fields("orderId", "orderTime", "orderAmtStr", "memberId"), new PrintFilter());
        // Submit: run locally when no arguments are given, otherwise submit to the cluster as args[0]
        if (args == null || args.length <= 0) {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("tridentDemo", config, tridentTopology.build());
        } else {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, tridentTopology.build());
        }
    }
}

3. Result

  

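III: Parsing

1. Explanation

  This part comes after parsing and moves into the stream-branching stage. Branching can be global or local; global is used first here, and the results are printed for verification.

2. Aggregation operation

  Some fields are removed and some are added: after grouping and aggregation, each result tuple keeps only the grouping field plus the newly computed aggregate field.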
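For a single batch, "some removed, some added" looks like the plain-Java grouped count below: each input tuple carries several fields, but the result keeps only the grouping key (minter) and a new count. This corresponds to a per-batch (local) aggregation such as groupBy(...).aggregate(new Count(), ...). BatchGroupCount and Order are hypothetical names, and the minute keys are illustrative values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-batch (local) grouped count in plain Java.
final class BatchGroupCount {
    // One parsed order after time parsing; only minter is used for grouping.
    static final class Order {
        final String orderId;
        final String orderAmtStr;
        final String minter;

        Order(String orderId, String orderAmtStr, String minter) {
            this.orderId = orderId;
            this.orderAmtStr = orderAmtStr;
            this.minter = minter;
        }
    }

    // Group by minter and count: orderId and orderAmtStr do not survive,
    // and a new count value is produced for each group.
    static Map<String, Long> countByMinter(List<Order> batch) {
        Map<String, Long> counts = new TreeMap<>();
        for (Order o : batch) {
            counts.merge(o.minter, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Order> batch = Arrays.asList(
                new Order("1063", "6834.70", "201612171345"),
                new Order("1064", "7149.46", "201612171345"),
                new Order("1065", "8995.28", "201612171346"));
        System.out.println(countByMinter(batch)); // {201612171345=2, 201612171346=1}
    }
}
```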

3. Driver class

package com.jun.trident;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;

public class TridentDemo {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TridentTopology tridentTopology = new TridentTopology();
        // Simulated input: each tuple has the fields "log" and "flag"; at most 5 tuples per batch
        Fields field = new Fields("log", "flag");
        FixedBatchSpout spout = new FixedBatchSpout(field, 5,
            new Values("168.214.187.214 - - [1481953616092] \"GET /view.php HTTP/1.1\" 200 0 \"http://cn.bing.com/search?q=spark mllib\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","A"),
            new Values("168.187.202.202 - - [1481953537038] \"GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)\" \"-\"","A"),
            new Values("61.30.167.187 - - [1481953539039] \"GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","A"),
            new Values("30.29.132.190 - - [1481953544042] \"GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53\" \"-\"","B"),
            new Values("222.190.187.201 - - [1481953578068] \"GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","B"),
            new Values("72.202.43.53 - - [1481953579069] \"GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","B")
        );
        // Replay the sample data in an endless loop
        spout.setCycle(true);
        // Stream processing
        Stream stream = tridentTopology.newStream("orderAnalyse", spout)
                // Filter: keep only valid logs
                .each(new Fields("log"), new ValidLogFilter())
                // Parse: append the four order fields to each tuple
                .each(new Fields("log"), new LogParserFunction(),
                        new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                // Projection: drop log and flag
                .project(new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                // Time parsing: derive day/hour/minute buckets ("minter" holds the minute bucket)
                .each(new Fields("orderTime"), new DateTransFormerFunction(),
                        new Fields("day", "hour", "minter"));
        // Branching
        // 1. Count orders per minute: group by minter
        TridentState state = stream.groupBy(new Fields("minter"))
                // Global aggregation across all batches, with the state kept in memory
                .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                        new Fields("orderNumByMinter"));
        // Print the updated count of every minter touched by each batch
        state.newValuesStream().each(new Fields("minter", "orderNumByMinter"), new PrintFilter());

        // Submit: run locally when no arguments are given, otherwise submit to the cluster as args[0]
        Config config = new Config();
        if (args == null || args.length <= 0) {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("tridentDemo", config, tridentTopology.build());
        } else {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, tridentTopology.build());
        }
    }
}
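persistentAggregate differs from a per-batch aggregation in that the count is stored in state (here MemoryMapState) and keeps growing across batches; newValuesStream() then emits, for each batch, the updated totals of the keys that batch touched. A plain-Java model of that behaviour might look like the following, where a HashMap stands in for the state and RunningCountState is a hypothetical name; the minute keys are illustrative.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of groupBy("minter").persistentAggregate(state, new Count(), ...)
// followed by newValuesStream(): a running count per key kept across batches.
final class RunningCountState {
    private final Map<String, Long> state = new HashMap<>(); // stands in for MemoryMapState

    // Processes one batch of minter keys and returns the updated totals
    // of the keys seen in this batch only.
    Map<String, Long> processBatch(List<String> minterKeys) {
        // 1. Count within the batch.
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String key : minterKeys) {
            batchCounts.merge(key, 1L, Long::sum);
        }
        // 2. Combine with the stored totals and report the new values.
        Map<String, Long> updated = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            long total = state.merge(e.getKey(), e.getValue(), Long::sum);
            updated.put(e.getKey(), total);
        }
        return updated;
    }

    public static void main(String[] args) {
        RunningCountState s = new RunningCountState();
        System.out.println(s.processBatch(Arrays.asList("201612171345", "201612171345", "201612171346")));
        // {201612171345=2, 201612171346=1}
        System.out.println(s.processBatch(Arrays.asList("201612171345")));
        // {201612171345=3}
    }
}
```

Because spout.setCycle(true) replays the same six logs forever, the printed totals in the real topology should keep increasing rather than settle.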

4. Time parsing function

package com.jun.trident;

import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

public class DateTransFormerFunction implements Function {
    private static final Logger logger = LoggerFactory.getLogger(DateTransFormerFunction.class);

    @Override
    public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
        String orderTime = tridentTuple.getStringByField("orderTime");
        // Convert the millisecond timestamp into time buckets
        try {
            long timestamp = Long.parseLong(orderTime);
            // SimpleDateFormat is not thread-safe, so a new instance is created per call;
            // it formats in the JVM's default time zone
            DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
            String dateStr = df.format(new Date(timestamp));
            String day = dateStr.substring(0, 8);   // yyyyMMdd
            String hour = dateStr.substring(0, 10); // yyyyMMddHH
            String minute = dateStr;                // yyyyMMddHHmm
            // Emit in the order of the output fields ("day", "hour", "minter")
            tridentCollector.emit(new Values(day, hour, minute));
        } catch (Exception e) {
            // Nothing is emitted, so this tuple is dropped from the stream
            logger.error("Failed to parse date: " + orderTime, e);
        }
    }

    @Override
    public void prepare(Map map, TridentOperationContext tridentOperationContext) {
        // No setup needed
    }

    @Override
    public void cleanup() {
        // No resources to release
    }
}
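The same day/hour/minute bucketing can also be written with java.time; this is a swapped-in technique, not what the function above uses. DateTimeFormatter is immutable and thread-safe, and passing the ZoneId explicitly avoids depending on the JVM default time zone that SimpleDateFormat uses. TimeBuckets is a hypothetical helper; the expected values in the comment were worked out for UTC.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical java.time version of the bucketing in DateTransFormerFunction.
final class TimeBuckets {
    // Immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter MINUTE = DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    // Returns {day, hour, minute} as yyyyMMdd, yyyyMMddHH, yyyyMMddHHmm in the given zone.
    // Throws NumberFormatException if orderTimeMillis is not a number.
    static String[] buckets(String orderTimeMillis, ZoneId zone) {
        long millis = Long.parseLong(orderTimeMillis);
        String minute = MINUTE.withZone(zone).format(Instant.ofEpochMilli(millis));
        return new String[] { minute.substring(0, 8), minute.substring(0, 10), minute };
    }

    public static void main(String[] args) {
        String[] b = buckets("1481953537038", ZoneId.of("UTC"));
        System.out.println(String.join(",", b)); // 20161217,2016121705,201612170545
    }
}
```

In UTC+8 (for example Asia/Shanghai) the same timestamp falls in minute 201612171345, which is why fixing the zone matters when results from different machines are compared.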

5. Result

  

Original article: https://www.cnblogs.com/juncaoit/p/9162637.html

Time: 2024-10-07 05:16:43
