Storm的DRPC

RPC:Remote Procedure Call

DRPC:Distributed RPC

Hadoop提供了RPC的实现机制,实现方法见:《》,本文主要介绍Storm的DRPC。

1、Storm DRPC工作流程

Storm的DRPC由DRPC Server和一个特殊的topology组成。DRPC Server主要起协调客户端和topology的作用,其接收客户端的请求,并将请求发送给topology,然后将topology的处理结果返回给客户端。

下面是官网给出的流程图:

1)客户端向DRPC Server发送请求,发送了想要执行的方法及其参数。

2)DRPC topology提供了一个特殊的spout DRPCSpout,DRPC Server将客户端的请求信息(函数、参数)发送给DRPCSpout,客户端请求进入了topology的处理。

3)DRPC topology的一系列bolt开始处理请求,最后一个bolt是ReturnResult,它负责将处理结果发送给DRPC Server。

4)DRPC Server将结果发送给客户端。

上面的流程也可以具体化为:

其中蓝色的bolt是需要用户自己定义的。

2、DRPC Server

(1)、编辑$STORM_HOME/conf/storm.yaml,配置drpc.servers,Storm会在配置的这些机器上启动DRPC Server


1

2

3

drpc.servers:

     - "vm1"

     - "vm2"

注意:drpc.servers:前面不能有空格(yaml文件格式)

(2)、在上面配置的那些机器上运行DRPC Server


1

storm drpc

3、DRPC topology

1)用户需要定义自己的bolt来执行自己的业务逻辑,如下面的MyBolt.java简单的将输入字符串转换成大写:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

package com.test.storm.bolt;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

public class MyBolt extends BaseBasicBolt{

    private static final long serialVersionUID = 1L;

    

    @Override

    public void execute(Tuple input, BasicOutputCollector collector) {

        try {

            //input里有两个字段:{request:6170525749586968710,args:hello}

            //request字段是Long类型, args是String类型

            Long id = input.getLong(0);

            System.out.println("request id: " + id);

            

            String args = input.getString(1);

            System.out.println("args: " + args);

            

            String result = args.toUpperCase();

            

            //如果这个bolt是最有一个用户的bolt,则必须是两个字段id、result

            //如果是中间bolt,则第一个字段必须是id

            //第一个字段是request的id,第二个字段是处理后的结果

            collector.emit(new Values(id, result));

        catch (Exception e) {

            e.printStackTrace();

        }

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        //如果这个bolt是最有一个用户的bolt,则必须是两个字段,建议是id、result(经测试,字段名字可以随意)

        //如果是中间bolt,则第一个字段必须是id

        //后面的内置bolt会根据字段位置获取值,0、1

        declarer.declare(new Fields("id""result"));

    }

}

代码中需要注意的地方和解释。

2)构建topology

Storm提供了LinearDRPCTopologyBuilder类来创建这个特殊的topology,它的作用有:

a、创建DRPCSpout

b、创建ReturnResult,返回topology的处理结果

c、提供有限的聚合操作


1

2

3

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(FUNC_NAME);

builder.addBolt(new MyBolt());

StormTopology drpcTopology = builder.createLocalTopology(drpcServer);

4、运行模式

1)本地模式

本地模式会在一个进程中模式DRPC Server,不需要绑定到端口,必须使用LocalDRPC对象才能调用方法,本地模式仅供测试用。

LocalDRPCTest.java:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

package com.test.storm.drpc;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.drpc.LinearDRPCTopologyBuilder;

import backtype.storm.generated.StormTopology;

import com.test.storm.bolt.MyBolt;

public class LocalDRPCTest {

    private static final String FUNC_NAME = "upper";

    public static void main(String[] args) {

        LocalDRPC drpcServer = new LocalDRPC();

        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(FUNC_NAME);

        builder.addBolt(new MyBolt());

        StormTopology drpcTopology = builder.createLocalTopology(drpcServer);

        

        LocalCluster cluster = new LocalCluster();

        Config config = new Config();

        config.setDebug(true);

        cluster.submitTopology("drpcupper", config, drpcTopology);

        

        String result = drpcServer.execute(FUNC_NAME, "hello");

        System.out.println("result: " + result);

        

        drpcServer.shutdown();

        cluster.shutdown();

    }

}

2)远程模式

RemoteDRPCTest.java如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

package com.test.storm.drpc;

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.drpc.LinearDRPCTopologyBuilder;

import com.test.storm.bolt.MyBolt;

public class RemoteDRPCTest {

    private static final String FUNC_NAME = "upper";

    public static void main(String[] args) throws Exception {

        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(FUNC_NAME);

        builder.addBolt(new MyBolt());

        

        StormSubmitter.submitTopology("drpcupper"new Config(), builder.createRemoteTopology());

    }

}

在真实的Storm集群上运行,需要如下三个步骤:

(1)如上面(2、DRPC Server)说明的那样配置并运行DRPC Server

(2)提交DRPC topology到Storm集群上


1

storm jar drpc.jar com.test.storm.drpc.RemoteDRPCTest

(3)客户端调用程序

DRPCClientTest:


1

2

3

4

5

6

7

8

9

10

11

12

13

package com.test.storm.drpc;

import backtype.storm.utils.DRPCClient;

public class DRPCClientTest {

    public static void main(String[] args) throws Exception {

        DRPCClient client = new DRPCClient("vm1"3772);

        String result = client.execute("upper""hellmmo");

        System.out.println(result);

    }

}

5、补充

一次可以部署多个DRPC:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

package com.test.storm.drpc;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.drpc.LinearDRPCTopologyBuilder;

import backtype.storm.generated.StormTopology;

import com.test.storm.bolt.MyBolt;

public class LocalDRPCMutipleTest {

    public static void main(String[] args) {

        LocalDRPC drpcServer = new LocalDRPC();

        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("upper");

        builder.addBolt(new MyBolt());

        StormTopology drpcTopology = builder.createLocalTopology(drpcServer);

        

        LinearDRPCTopologyBuilder builder2 = new LinearDRPCTopologyBuilder("upper2");

        builder2.addBolt(new MyBolt());

        StormTopology drpcTopology2 = builder2.createLocalTopology(drpcServer);

        

        LocalCluster cluster = new LocalCluster();

        Config config = new Config();

        config.setDebug(true);

        cluster.submitTopology("drpcupper", config, drpcTopology);

        cluster.submitTopology("drpcupper2", config, drpcTopology2);

        

        String result = drpcServer.execute("upper""hello");

        System.out.println("result1: " + result);

        String result2 = drpcServer.execute("upper2""hello");

        System.out.println("result2: " + result);

        

        drpcServer.shutdown();

        cluster.shutdown();

    }

}

附件:http://files.cnblogs.com/files/lishouguang/storm_drpc_source.zip

时间: 2024-08-25 08:59:20

Storm的DRPC的相关文章

Storm实验 -- DRPC的使用

1. 修改所有节点的 storm.yaml 配置文件,设置 drpc server [[email protected] ~]$ vim storm-0.9.4/conf/storm.yaml drpc.servers: - "hadoop4" 2. 启动drcp服务 storm drpc & 3. 编写服务端程序 //来源于 https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starte

Storm 中drpc调用

package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.DRPCSpout; import backtype.storm.task.ShellBolt; import backtyp

Storm集群的DRPC模式

storm的DRPC模式的作用是实现从远程调用storm集群的计算资源,而不需要连接到集群的某一个节点.OK.那么storm实现DRPC主要是使用LinearDRPCTopologyBuilder这个类.下面就先来看看一个简单的例子,它的源码的github上. import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.stor

storm源码之storm代码结构【译】【转】

[原]storm源码之storm代码结构[译] 说明:本文翻译自Storm在GitHub上的官方Wiki中提供的Storm代码结构描述一节Structure of the codebase,希望对正在基于Storm进行源码级学习和研究的朋友有所帮助. Storm的源码共分为三个不同的层次. 首先,Storm在设计之初就考虑到了兼容多语言开发.Nimbus是一个thrift服务,topologies被定义为Thrift结构体.Thrift的运用使得Storm可以被任意开发语言使用. 其次,Stor

storm源码之storm代码结构【译】

说明:本文翻译自Storm在GitHub上的官方Wiki中提供的Storm代码结构描述一节Structure of the codebase,希望对正在基于Storm进行源码级学习和研究的朋友有所帮助. Storm的源码共分为三个不同的层次. 首先,Storm在设计之初就考虑到了兼容多语言开发.Nimbus是一个thrift服务,topologies被定义为Thrift结构体. Thrift优势 : 使得Storm可以被任意开发语言使用. 其次,Storm的所有接口都是Java语言来定义的.因此

STORM_0008_Structure-of-the-codebase_Storm的代码库的结构

http://storm.apache.org/releases/1.0.1/Structure-of-the-codebase.html Structure of the codebase 源码分成独立的三层 第一:在最开始的时候Storm就被设计成支持多种语言,Nimbus是一种Thrift的服务,Topologies是一种Thrift类型的结构.Thrift的使用使得Storm可以被任何语言使用. 第二:所有的Storm的接口都被声明为java接口,虽然内部的实现有很多的Clojure但是

Hortonworks(HDP)关闭不须要的组件(服务)

Hortonworks(HDP)设置开机启动的组件(服务)是在一个makefile(.mf)文件里配置的,这个文件位于: /usr/lib/hue/tools/start_scripts/start_deps.mf 我们仅仅须要改动这一个文件就能够,所以.在此之前.我们最好先做一个备份: cp /usr/lib/hue/tools/start_scripts/start_deps.mf /usr/lib/hue/tools/start_scripts/start_deps.mf.bak 假设我们

storm 事务和DRPC结合

示例代码: package com.lky.topology; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import b

storm DRPC例子

1,DRPC原理 客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数.实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函 数调用流.每个函数调用被DRPC服务器标记了一个唯一的id. 这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结 果发送给DRPC服务器(通过那个唯一的id标识).DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送