什么是DRPC?
分布式RPC(distributed RPC,DRPC)用于对Storm上大量的函数调用进行并行计算。
对于每一次函数调用,Storm集群上运行的拓扑接收调用函数的参数信息作为输入流,并将计算结果作为输出流发射出去。
一句话概括:Storm进行计算,根据客户端提交的请求参数,而返回Storm计算的结果。
DRPC通过DRPC Server来实现,DRPC Server的整体工作过程如下:
接收到一个RPC调用请求;
发送请求到Storm上的拓扑;
从Storm上接收计算结果;
将计算结果返回给客户端。
注:在client客户端看来,一个DRPC调用看起来和一般的RPC调用没什么区别
工作流程
Client向DRPC Server发送被调用执行的DRPC函数名称及参数;
Storm上的topology通过DRPCSpout实现这一函数,从DPRC Server接收到函数调用流;
DRPC Server会为每次函数调用生成唯一的id;
Storm上运行的topology开始计算结果,最后通过一个ReturnResults的Bolt连接到DRPC Server,发送指定id的计算结果;
DRPC Server通过使用之前为每个函数调用生成的id,将结果关联到对应的发起调用的client,将计算结果返回给client。
DRPC包括服务端和客户端两部分:
1)服务端
服务端由四部分组成:包括一个DRPC Server, 一个 DPRC Spout,一个Topology和一个ReturnResult。
在实际使用中,主要有三个步骤:
a.启动Storm中的DRPC Server;
首先,修改Storm/conf/storm.yaml中的drpc server地址;需要注意的是:必须修改所有Nimbus和supervisor上的配置文件,设置drpc server地址。否则在运行过程中可能无法返回结果。
然后,通过 storm drpc命令启动drpc server。
b.创建一个DRPC 的Topology,提交到storm中运行。
该Toplogy和普通的Topology稍有不同,可以通过两种方式创建:
创建方法一:直接使用 Storm 提供的LinearDRPCTopologyBuilder。 (不过该方法在0.82版本中显示为已过期,不建议使用)
1 package com.storm.drpc; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.LocalDRPC; 6 import backtype.storm.StormSubmitter; 7 import backtype.storm.drpc.LinearDRPCTopologyBuilder; 8 import backtype.storm.topology.BasicOutputCollector; 9 import backtype.storm.topology.OutputFieldsDeclarer; 10 import backtype.storm.topology.base.BaseBasicBolt; 11 import backtype.storm.tuple.Fields; 12 import backtype.storm.tuple.Tuple; 13 import backtype.storm.tuple.Values; 14 15 public class BasicDRPCTopology { 16 17 /** 18 *写多客户端的数据要做什么处理 ,就可以 19 * 20 */ 21 public static class ExclaimBolt extends BaseBasicBolt { 22 private static final long serialVersionUID = 1L; 23 24 @Override 25 public void execute(Tuple tuple, BasicOutputCollector collector) { 26 String input = tuple.getString(1); 27 28 //第一列request请求ID 29 collector.emit(new Values(tuple.getValue(0), input + "!")); 30 31 } 32 33 @Override 34 public void declareOutputFields(OutputFieldsDeclarer declarer) { 35 declarer.declare(new Fields("id", "result")); 36 } 37 } 38 39 public static void main(String[] args) throws Exception { 40 41 /** 42 构建spout; 43 向DRPC Server返回结果; 44 为Bolt提供函数用于对tuples进行处理。 45 */ 46 LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); 47 //没有数据源,只是封装了一个算法模块 48 builder.addBolt(new ExclaimBolt(), 3); 49 50 Config conf = new Config(); 51 52 if (args == null || args.length == 0) { 53 54 //本地DRPC模式 55 LocalDRPC drpc = new LocalDRPC(); 56 LocalCluster cluster = new LocalCluster(); 57 58 cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); 59 60 //客户端的代码 61 for (String word : new String[]{ "hello", "goodbye" }) { 62 63 System.err.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word)); 64 } 65 66 cluster.shutdown(); 67 drpc.shutdown(); 68 } 69 else { 70 71 //远程DRPC提交,远程提交必须先启动服务 72 conf.setNumWorkers(3); 73 StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); 74 } 75 } 76 }
创建方法二:
直接使用 Storm 提供的通用TopologyBuilder。 不过需要自己手动加上开始的DRPCSpout和结束的ReturnResults。
其实Storm 提供的LinearDRPCTopologyBuilder也是通过这种封装而来的。
1 package com.storm.drpc; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.LocalDRPC; 6 import backtype.storm.StormSubmitter; 7 import backtype.storm.drpc.DRPCSpout; 8 import backtype.storm.drpc.ReturnResults; 9 import backtype.storm.generated.AlreadyAliveException; 10 import backtype.storm.generated.InvalidTopologyException; 11 import backtype.storm.topology.BasicOutputCollector; 12 import backtype.storm.topology.OutputFieldsDeclarer; 13 import backtype.storm.topology.TopologyBuilder; 14 import backtype.storm.topology.base.BaseBasicBolt; 15 import backtype.storm.tuple.Fields; 16 import backtype.storm.tuple.Tuple; 17 import backtype.storm.tuple.Values; 18 19 20 public class ManualDRPC { 21 public static class ExclamationBolt extends BaseBasicBolt { 22 23 private static final long serialVersionUID = 1L; 24 25 @Override 26 public void declareOutputFields(OutputFieldsDeclarer declarer) { 27 declarer.declare(new Fields("result", "return-info")); 28 } 29 30 @Override 31 public void execute(Tuple tuple, BasicOutputCollector collector) { 32 String arg = tuple.getString(0); 33 Object retInfo = tuple.getValue(1); 34 collector.emit(new Values(arg + "!!!", retInfo)); 35 } 36 37 } 38 39 public static void main(String[] args) { 40 41 TopologyBuilder builder = new TopologyBuilder(); 42 LocalDRPC drpc = new LocalDRPC(); 43 44 45 //远程DRPC提交,远程提交必须先启动服务 46 if (args.length > 0) { 47 //开始的Spout 48 DRPCSpout spout = new DRPCSpout("exclamation"); 49 builder.setSpout("drpc", spout); 50 //真正处理的Bolt 51 builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc"); 52 //结束的ReturnResults 53 builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim"); 54 55 Config conf = new Config(); 56 try { 57 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 58 } catch (AlreadyAliveException e) { 59 e.printStackTrace(); 60 } catch (InvalidTopologyException e) { 61 e.printStackTrace(); 62 } 63 }else { 64 65 66 //本地DRPC提交 67 DRPCSpout spout = new DRPCSpout("exclamation", drpc); 68 builder.setSpout("drpc", spout); 69 builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc"); 70 builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim"); 71 72 LocalCluster cluster = new LocalCluster(); 73 Config conf = new Config(); 74 cluster.submitTopology("exclaim", conf, builder.createTopology()); 75 76 System.err.println(drpc.execute("exclamation", "aaa")); 77 System.err.println(drpc.execute("exclamation", "bbb")); 78 } 79 80 } 81 }
客户端调用代码:
1 package com.storm.drpc; 2 3 import org.apache.thrift7.TException; 4 5 import backtype.storm.generated.DRPCExecutionException; 6 import backtype.storm.utils.DRPCClient; 7 /** 8 * DRPC客户端调用代码 9 */ 10 public class MyDRPCclient { 11 12 /** 13 指定DRPC地址和端口,storm.yaml文件的配置: 14 drpc.servers: 15 - "drpcserver1" 16 - "drpcserver12" 17 */ 18 public static void main(String[] args) { 19 20 DRPCClient client = new DRPCClient("192.168.1.107", 3772); 21 try { 22 String result = client.execute("exclamation", "hello,world"); 23 24 System.out.println(result); 25 } catch (TException e) { 26 e.printStackTrace(); 27 } catch (DRPCExecutionException e) { 28 e.printStackTrace(); 29 } 30 31 } 32 33 }
在Storm执行 storm jar ./storm_drpc.jar com.storm.drpc.ManualDRPC testDrpc
运行结果 : hello,world !!!