Trident中的DRPC实现

一:介绍

1.说明

  Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流。

2.工作机制

  Distributed RPC是由一个”DPRC Server”协调的(storm自带了一个实现)DRPC服务器协调

  1) 接收一个RPC请求。

  2) 发送请求到storm topology

  3) 从storm topology接收结果。

  4) 把结果发回给等待的客户端。从客户端的角度来看一个DRPC调用跟一个普通的RPC调用没有任何区别。

3.工作流程

  

  客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。

  实现了这个函数的topology使用 DRPCSpout 从DRPC服务器接收函数调用流。

  每个函数调用被DRPC服务器标记了一个唯一的id。 这个topology然后计算结果,在topology的最后一个叫做 ReturnResults 的bolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。

二:本地DRPC

1.主驱动类

 1 package com.jun.tridentWithHbase;
 2
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.LocalDRPC;
 6 import backtype.storm.StormSubmitter;
 7 import backtype.storm.generated.AlreadyAliveException;
 8 import backtype.storm.generated.InvalidTopologyException;
 9 import backtype.storm.tuple.Fields;
10 import backtype.storm.tuple.Values;
11 import org.apache.storm.hbase.trident.state.HBaseMapState;
12 import storm.trident.Stream;
13 import storm.trident.TridentState;
14 import storm.trident.TridentTopology;
15 import storm.trident.operation.builtin.Count;
16 import storm.trident.operation.builtin.MapGet;
17 import storm.trident.operation.builtin.Sum;
18 import storm.trident.state.OpaqueValue;
19 import storm.trident.state.StateFactory;
20 import storm.trident.testing.FixedBatchSpout;
21 import storm.trident.testing.MemoryMapState;
22
23 public class TridentDemo {
24     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
25         TridentTopology tridentTopology=new TridentTopology();
26         //模拟数据
27         Fields field=new Fields("log","flag");
28         FixedBatchSpout spout=new FixedBatchSpout(field,5,
29             new Values("168.214.187.214 - - [1481953616092] \"GET /view.php HTTP/1.1\" 200 0 \"http://cn.bing.com/search?q=spark mllib\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","A"),
30             new Values("168.187.202.202 - - [1481953537038] \"GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)\" \"-\"","A"),
31             new Values("61.30.167.187 - - [1481953539039] \"GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","A"),
32             new Values("30.29.132.190 - - [1481953544042] \"GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53\" \"-\"","B"),
33             new Values("222.190.187.201 - - [1481953578068] \"GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","B"),
34             new Values("72.202.43.53 - - [1481953579069] \"GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","B")
35         );
36         //多次循环
37         spout.setCycle(true);
38         //流处理
39         Stream stream=tridentTopology.newStream("orderAnalyse",spout)
40                 //过滤
41             .each(new Fields("log"),new ValidLogFilter())
42                 //解析
43             .each(new Fields("log"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId"))
44                 //投影
45             .project(new Fields("orderId","orderTime","orderAmtStr","memberId"))
46                 //时间解析
47             .each(new Fields("orderTime"),new DateTransFormerFunction(),new Fields("day","hour","minter"))
48          ;
49         //分流
50         //1.基于minter统计订单数量,分组统计
51         TridentState state=stream.groupBy(new Fields("minter"))
52                 //全局聚合,使用内存存储状态信息
53                 .persistentAggregate(new MemoryMapState.Factory(),new Count(),new Fields("orderNumByMinter"));
54 //        state.newValuesStream().each(new Fields("minter","orderNumByMinter"),new PrintFilter());
55
56         //2.另一个流,基于分钟的订单金额,局部聚合
57         Stream partitionStream=stream.each(new Fields("orderAmtStr"),new TransforAmtToDoubleFunction(),new Fields("orderAmt"))
58             .groupBy(new Fields("minter"))
59                     //局部聚合
60                 .chainedAgg()    //聚合链
61             .partitionAggregate(new Fields("orderAmt"),new LocalSum(),new Fields("orderAmtSumOfLocal"))
62                 .chainEnd();      //聚合链
63
64         //做一次全局聚合
65         TridentState partitionState=partitionStream.groupBy(new Fields("minter"))
66                 //全局聚合
67                 .persistentAggregate(new MemoryMapState.Factory(),new Fields("orderAmtSumOfLocal"),new Sum(),new Fields("totalOrderAmt"));
68         partitionState.newValuesStream().each(new Fields("minter","totalOrderAmt"),new PrintFilter());
69
70         //提交
71         Config config=new Config();
72         if(args==null || args.length<=0){
73             //应该是构建一个DRPC的服务器
74             LocalDRPC localDRPC=new LocalDRPC();
75             tridentTopology.newDRPCStream("orderDataServer",localDRPC)
76                     //参数处理
77                 .each(new Fields("args"),new RequestParamsParserFunction(),new Fields("date"))
78                     //查询,重要的参数是上面的partitionState
79                 .stateQuery(partitionState,new Fields("date"),new MapGet(),new Fields("totalAmtByMinter"))
80                     //投影
81                 .project(new Fields("date","totalAmtByMinter"));
82             //提交任务
83             LocalCluster localCluster=new LocalCluster();
84             localCluster.submitTopology("tridentDemo",config,tridentTopology.build());
85             //获取值
86             String jsonResult=localDRPC.execute("orderDataServer","201612171345 201612171345");
87             System.out.println("***"+jsonResult+"***");
88
89         }else {
90             config.setNumWorkers(2);
91             StormSubmitter.submitTopology(args[0],config,tridentTopology.build());
92         }
93     }
94 }

2.请求参数处理类

 1 package com.jun.tridentWithHbase;
 2
 3 import backtype.storm.tuple.Values;
 4 import storm.trident.operation.Function;
 5 import storm.trident.operation.TridentCollector;
 6 import storm.trident.operation.TridentOperationContext;
 7 import storm.trident.tuple.TridentTuple;
 8
 9 import java.util.Map;
10
11 public class RequestParamsParserFunction implements Function {
12     @Override
13     public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
14         String parameters=tridentTuple.getStringByField("args");
15         String[] params=parameters.split(" ");
16         for (String param:params){
17             tridentCollector.emit(new Values(param));
18         }
19     }
20
21     @Override
22     public void prepare(Map map, TridentOperationContext tridentOperationContext) {
23
24     }
25
26     @Override
27     public void cleanup() {
28
29     }
30 }

3.效果

  

三:集群模式的DRPC 

1.主驱动类

 1  config.setNumWorkers(2);
 2             //集群上构建DRPC服务器
 3             tridentTopology.newDRPCStream("orderDataServer")
 4                     //参数处理
 5                     .each(new Fields("args"),new RequestParamsParserFunction(),new Fields("date"))
 6                     //查询,重要的参数是上面的partitionState
 7                     .stateQuery(partitionState,new Fields("date"),new MapGet(),new Fields("totalAmtByMinter"))
 8                     //投影
 9                     .project(new Fields("date","totalAmtByMinter"));
10             StormSubmitter.submitTopology(args[0],config,tridentTopology.build());

2.配置DRPC服务和端口

  

3.启动storm

4.启动Drpc进程

  在drpc.servers参数所指定的服务器上。

  命令:nohup bin/storm drpc >>/dev/null 2>&1 &

  查看端口是否正常打开:netstat -tlnup | grep 3772

5.将jar包提交到集群上

6.编写客户端

 1 package com.jun.tridentWithKafka;
 2
 3 import backtype.storm.generated.DRPCExecutionException;
 4 import backtype.storm.utils.DRPCClient;
 5 import org.apache.thrift7.TException;
 6
 7 public class DrpcClientDemo {
 8     public static void main(String[] args) {
 9         DRPCClient drpcClient=new DRPCClient("linux-hadoop01.ibeifeng.com",3772);
10         try {
11             String jsonResult=drpcClient.execute("orderDataServer","201612171345 201612171345");
12             System.out.println("==="+jsonResult+"===");
13         } catch (TException e) {
14             e.printStackTrace();
15         } catch (DRPCExecutionException e) {
16             e.printStackTrace();
17         }
18     }
19 }

  

原文地址:https://www.cnblogs.com/juncaoit/p/9175496.html

时间: 2024-10-23 20:06:00

Trident中的DRPC实现的相关文章

Trident中的解析包含的函数操作与投影操作

一:函数操作 1.介绍 Tuple本身是不可变的 Function只是在原有的基础上追加新的tuple 2.说明 如果原来的字段是log,flag 新增之后的tuple可以访问这些字段,log,flag,orderId,orderAmt,memberId 3.先写驱动类 增加了解析 然后再将解析的日志进行打印 1 package com.jun.trident; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalClus

Trident中的过滤与函数的区别

1.共同点 都需要实现storm.trident.operation.Function接口 2.不同点 其中函数有发射这个步骤. .each(new Fields("orderTime"),new DateTransFormerFunction(),new Fields("day","hour","minter")) 其中DateTransFormerFunction中有这样的程序:tridentCollector.emit(

Trident中 FixedBatchSpout分析

FixedBatchSpout 继承自 IBatchSpout IBatchSpout 方法 public interface IBatchSpout extends Serializable { void open(Map conf, TopologyContext context); void emitBatch(long batchId, TridentCollector collector); void ack(long batchId); void close(); Map getCo

Apache Storm 官方文档 —— Trident State

转载自并发编程网 – ifeve.com本文链接地址: Apache Storm 官方文档 -- Trident State Trident 中含有对状态化(stateful)的数据源进行读取和写入操作的一级抽象封装工具.这个所谓的状态(state)既可以保存在拓扑内部(保存在内存中并通过 HDFS 来实现备份),也可以存入像 Memcached 或者 Cassandra 这样的外部数据库中.而对于 Trident API 而言,这两种机制并没有任何区别. Trident 使用一种容错性的方式实

Storm高级原语(四)Trident API 综述

"Stream"是Trident中的核心数据模型,它被当做一系列的batch来处理.在Storm集群的节点之间,一个stream被划分成很多partition(分区),对流的操作(operation)是在每个partition上并行进行的. 注: ①"Stream"是Trident中的核心数据模型:有些地方也说是TridentTuple,没有个标准的说法. ②一个stream被划分成很多partition:partition是stream的一个子集,里面可能有多个b

storm trident 示例

Storm Trident的核心数据模型是一批一批被处理的“流”,“流”在集群的分区在集群的节点上,对“流”的操作也是并行的在每个分区上进行. Trident有五种对“流”的操作: 1.      不需要网络传输的本地批次运算 2.      需要网络传输的“重分布”操作,不改变数据的内容 3.      聚合操作,网络传输是该操作的一部分 4.      “流”分组(grouby)操作 5.      合并和关联操作 批次本地操作: 批次本地操作不需要网络传输,本格分区(partion)的运算

storm的trident编程模型

storm的基本概念别人总结的, https://blog.csdn.net/pickinfo/article/details/50488226 编程模型最关键最难就是实现局部聚合的业务逻辑聚合类实现Aggregator接口重写方法aggregate,聚合使用存储中间聚合过程状态的类,本地hashmap的去重逻辑还有加入redis后进行的一些去重操作,数据的持久(判断三天内的带播控量) public class SaleSum implements Aggregator<SaleSumState

DRPC详解

什么是DRPC? 分布式RPC(distributed RPC,DRPC)用于对Storm上大量的函数调用进行并行计算. 对于每一次函数调用,Storm集群上运行的拓扑接收调用函数的参数信息作为输入流,并将计算结果作为输出流发射出去. 一句话概括:Storm进行计算,根据客户端提交的请求参数,而返回Storm计算的结果. DRPC通过DRPC Server来实现,DRPC Server的整体工作过程如下: 接收到一个RPC调用请求: 发送请求到Storm上的拓扑: 从Storm上接收计算结果:

Storm的DRPC

RPC:Remote Procedure Call DRPC:Distributed RPC Hadoop提供了RPC的实现机制,实现方法见:<>,本文主要介绍Storm的DRPC. 1.Storm DRPC工作流程 Storm的DRPC由DRPC Server和一个特殊的topology组成.DRPC Server主要起协调客户端和topology的作用,其接收客户端的请求,并将请求发送给topology,然后将topology的处理结果返回给客户端. 下面是官网给出的流程图: 1)客户端向