DRPC详解

什么是DRPC?

分布式RPC(distributed RPC,DRPC)用于对Storm上大量的函数调用进行并行计算。

对于每一次函数调用,Storm集群上运行的拓扑接收调用函数的参数信息作为输入流,并将计算结果作为输出流发射出去。

一句话概括:Storm进行计算,根据客户端提交的请求参数,而返回Storm计算的结果。

DRPC通过DRPC Server来实现,DRPC Server的整体工作过程如下:

接收到一个RPC调用请求;

发送请求到Storm上的拓扑;

从Storm上接收计算结果;

将计算结果返回给客户端。

注:在client客户端看来,一个DRPC调用看起来和一般的RPC调用没什么区别

工作流程

Client向DRPC Server发送被调用执行的DRPC函数名称及参数;

Storm上的topology通过DRPCSpout实现这一函数,从DPRC Server接收到函数调用流;

DRPC Server会为每次函数调用生成唯一的id;

Storm上运行的topology开始计算结果,最后通过一个ReturnResults的Bolt连接到DRPC     Server,发送指定id的计算结果;

DRPC Server通过使用之前为每个函数调用生成的id,将结果关联到对应的发起调用的client,将计算结果返回给client。

DRPC包括服务端和客户端两部分:

1)服务端
服务端由四部分组成:包括一个DRPC Server, 一个 DPRC Spout,一个Topology和一个ReturnResult。

在实际使用中,主要有三个步骤:

a.启动Storm中的DRPC Server;

首先,修改Storm/conf/storm.yaml中的drpc server地址;需要注意的是:必须修改所有Nimbus和supervisor上的配置文件,设置drpc server地址。否则在运行过程中可能无法返回结果。

然后,通过 storm drpc命令启动drpc server。

b.创建一个DRPC 的Topology,提交到storm中运行。

该Toplogy和普通的Topology稍有不同,可以通过两种方式创建:

创建方法一:直接使用 Storm 提供的LinearDRPCTopologyBuilder。 (不过该方法在0.82版本中显示为已过期,不建议使用)

 1 package com.storm.drpc;
 2
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.LocalDRPC;
 6 import backtype.storm.StormSubmitter;
 7 import backtype.storm.drpc.LinearDRPCTopologyBuilder;
 8 import backtype.storm.topology.BasicOutputCollector;
 9 import backtype.storm.topology.OutputFieldsDeclarer;
10 import backtype.storm.topology.base.BaseBasicBolt;
11 import backtype.storm.tuple.Fields;
12 import backtype.storm.tuple.Tuple;
13 import backtype.storm.tuple.Values;
14
15 public class BasicDRPCTopology {
16
17    /**
18     *写多客户端的数据要做什么处理 ,就可以
19     *
20     */
21   public static class ExclaimBolt extends BaseBasicBolt {
22     private static final long serialVersionUID = 1L;
23
24     @Override
25     public void execute(Tuple tuple, BasicOutputCollector collector) {
26       String input = tuple.getString(1);
27
28       //第一列request请求ID
29       collector.emit(new Values(tuple.getValue(0), input + "!"));
30
31     }
32
33     @Override
34     public void declareOutputFields(OutputFieldsDeclarer declarer) {
35       declarer.declare(new Fields("id", "result"));
36     }
37   }
38
39   public static void main(String[] args) throws Exception {
40
41     /**
42         构建spout;
43         向DRPC Server返回结果;
44         为Bolt提供函数用于对tuples进行处理。
45      */
46     LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
47     //没有数据源,只是封装了一个算法模块
48     builder.addBolt(new ExclaimBolt(), 3);
49
50     Config conf = new Config();
51
52     if (args == null || args.length == 0) {
53
54      //本地DRPC模式
55       LocalDRPC drpc = new LocalDRPC();
56       LocalCluster cluster = new LocalCluster();
57
58       cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
59
60       //客户端的代码
61       for (String word : new String[]{ "hello", "goodbye" }) {
62
63           System.err.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
64       }
65
66       cluster.shutdown();
67       drpc.shutdown();
68     }
69     else {
70
71       //远程DRPC提交,远程提交必须先启动服务
72       conf.setNumWorkers(3);
73       StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
74     }
75   }
76 }

创建方法二:

直接使用 Storm 提供的通用TopologyBuilder。 不过需要自己手动加上开始的DRPCSpout和结束的ReturnResults。
其实Storm 提供的LinearDRPCTopologyBuilder也是通过这种封装而来的。

 1 package com.storm.drpc;
 2
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.LocalDRPC;
 6 import backtype.storm.StormSubmitter;
 7 import backtype.storm.drpc.DRPCSpout;
 8 import backtype.storm.drpc.ReturnResults;
 9 import backtype.storm.generated.AlreadyAliveException;
10 import backtype.storm.generated.InvalidTopologyException;
11 import backtype.storm.topology.BasicOutputCollector;
12 import backtype.storm.topology.OutputFieldsDeclarer;
13 import backtype.storm.topology.TopologyBuilder;
14 import backtype.storm.topology.base.BaseBasicBolt;
15 import backtype.storm.tuple.Fields;
16 import backtype.storm.tuple.Tuple;
17 import backtype.storm.tuple.Values;
18
19
20 public class ManualDRPC {
21   public static class ExclamationBolt extends BaseBasicBolt {
22
23     private static final long serialVersionUID = 1L;
24
25     @Override
26     public void declareOutputFields(OutputFieldsDeclarer declarer) {
27       declarer.declare(new Fields("result", "return-info"));
28     }
29
30     @Override
31     public void execute(Tuple tuple, BasicOutputCollector collector) {
32       String arg = tuple.getString(0);
33       Object retInfo = tuple.getValue(1);
34       collector.emit(new Values(arg + "!!!", retInfo));
35     }
36
37   }
38
39   public static void main(String[] args) {
40
41     TopologyBuilder builder = new TopologyBuilder();
42     LocalDRPC drpc = new LocalDRPC();
43
44
45   //远程DRPC提交,远程提交必须先启动服务
46     if (args.length > 0) {
47             //开始的Spout
48             DRPCSpout spout = new DRPCSpout("exclamation");
49             builder.setSpout("drpc", spout);
50             //真正处理的Bolt
51             builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
52             //结束的ReturnResults
53             builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
54
55             Config conf = new Config();
56             try {
57                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
58             } catch (AlreadyAliveException e) {
59                 e.printStackTrace();
60             } catch (InvalidTopologyException e) {
61                 e.printStackTrace();
62             }
63     }else {
64
65
66         //本地DRPC提交
67         DRPCSpout spout = new DRPCSpout("exclamation", drpc);
68         builder.setSpout("drpc", spout);
69         builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
70         builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
71
72         LocalCluster cluster = new LocalCluster();
73         Config conf = new Config();
74         cluster.submitTopology("exclaim", conf, builder.createTopology());
75
76          System.err.println(drpc.execute("exclamation", "aaa"));
77          System.err.println(drpc.execute("exclamation", "bbb"));
78     }
79
80   }
81 }

客户端调用代码:

 1 package com.storm.drpc;
 2
 3 import org.apache.thrift7.TException;
 4
 5 import backtype.storm.generated.DRPCExecutionException;
 6 import backtype.storm.utils.DRPCClient;
 7 /**
 8  * DRPC客户端调用代码
 9  */
10 public class MyDRPCclient {
11
12     /**
13      指定DRPC地址和端口,storm.yaml文件的配置:
14     drpc.servers:
15        -  "drpcserver1"
16        -  "drpcserver12"
17      */
18     public static void main(String[] args) {
19
20         DRPCClient client = new DRPCClient("192.168.1.107", 3772);
21         try {
22             String result = client.execute("exclamation", "hello,world");
23
24             System.out.println(result);
25         } catch (TException e) {
26             e.printStackTrace();
27         } catch (DRPCExecutionException e) {
28             e.printStackTrace();
29         }
30
31     }
32
33 }

在Storm执行 storm jar   ./storm_drpc.jar   com.storm.drpc.ManualDRPC    testDrpc

运行结果  :  hello,world !!!

时间: 2024-10-27 05:40:00

DRPC详解的相关文章

Storm高级原语(二) -- DRPC详解

Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU密集型(CPU intensive)的计算任务.DRPC的stormtopology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流. DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语stream.spout.bolt. topology而成的一种模式(pattern).本来应该把DRPC单独打成一个包的, 但是DRPC实在是太有用了,所以我们我们把它和storm捆绑

Storm配置项详解

什么是Storm? Storm是twitter开源的一套实时数据处理框架,基于该框架你可以通过简单的编程来实现对数据流的实时处理变换. Storm的配置文件一般存放在$STORM_HOME/conf下,通常名为storm.yaml,它符合yaml格式要求. 配置项详解: 以下是从storm的backtype.storm.Config类中搜集的所有storm支持的配置项(Based storm 0.6.0): 配置项 配置说明 storm.zookeeper.servers ZooKeeper服务

Spring事务管理(详解+实例)

写这篇博客之前我首先读了<Spring in action>,之后在网上看了一些关于Spring事务管理的文章,感觉都没有讲全,这里就将书上的和网上关于事务的知识总结一下,参考的文章如下: Spring事务机制详解 Spring事务配置的五种方式 Spring中的事务管理实例详解 1 初步理解 理解事务之前,先讲一个你日常生活中最常干的事:取钱. 比如你去ATM机取1000块钱,大体有两个步骤:首先输入密码金额,银行卡扣掉1000元钱:然后ATM出1000元钱.这两个步骤必须是要么都执行要么都

转载:DenseNet算法详解

原文连接:http://blog.csdn.net/u014380165/article/details/75142664 参考连接:http://blog.csdn.net/u012938704/article/details/53468483 本文这里仅当学习笔记使用,具体细节建议前往原文细度. 论文:Densely Connected Convolutional Networks 论文链接:https://arxiv.org/pdf/1608.06993.pdf 代码的github链接:h

MariaDB(MySQL)创建、删除、选择及数据类型使用详解

一.MariaDB简介(MySQL简介略过) MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可 MariaDB的目的是完全兼容MySQL,包括API和命令行,使之能轻松成为MySQL的代替品.在存储引擎方面,使用XtraDB(英语:XtraDB)来代替MySQL的InnoDB. MariaDB由MySQL的创始人Michael Widenius(英语:Michael Widenius)主导开发,他早前曾以10亿美元的价格,将自己创建的公司MySQL A

HttpServletResponse和HttpServletRequest详解

HttpServletResponse,HttpServletRequest详解 1.相关的接口 HttpServletRequest HttpServletRequest接口最常用的方法就是获得请求中的参数,这些参数一般是客户端表单中的数据.同时,HttpServletRequest接口可以获取由客户端传送的名称,也可以获取产生请求并且接收请求的服务器端主机名及IP地址,还可以获取客户端正在使用的通信协议等信息.下表是接口HttpServletRequest的常用方法. 说明:HttpServ

POSIX 线程详解(经典必看)

总共三部分: 第一部分:POSIX 线程详解                                   Daniel Robbins ([email protected]), 总裁/CEO, Gentoo Technologies, Inc.  2000 年 7 月 01 日 第二部分:通用线程:POSIX 线程详解,第 2部分       Daniel Robbins ([email protected]), 总裁/CEO, Gentoo Technologies, Inc.  20

.NET深入解析LINQ框架(五:IQueryable、IQueryProvider接口详解)

阅读目录: 1.环路执行对象模型.碎片化执行模型(假递归式调用) 2.N层对象执行模型(纵横向对比链式扩展方法) 3.LINQ查询表达式和链式查询方法其实都是空壳子 4.详细的对象结构图(对象的执行原理) 5.IQueryable<T>与IQueryProvider一对一的关系能否改成一对多的关系 6.完整的自定义查询 1]. 环路执行对象模型.碎片化执行模型(假递归式调用) 这个主题扯的可能有点远,但是它关系着整个LINQ框架的设计结构,至少在我还没有搞懂LINQ的本意之前,在我脑海里一直频

netstat状态详解

一.生产服务器netstat tcp连接状态................................................................................ 2 1.1生产服务器某个业务LVS负载均衡上连接状态数量............................................... 2 1.2生产服务器某个业务web上连接状态数量...............................................