storm翻译(3)Distributed RPC(分布式远程调用)

原文地址:http://storm.apache.org/documentation/Distributed-RPC.html

分布式RPC的目的是在storm进行大量的实时计算时,能够并行的调用storm上的函数。Storm topology可以将函数参数作为输入Stream,并且将被调用方法产生的结果作为返回发送出去。

与其说DRPC是storm的一个特点,不如说它只是storm基本概念如steams,spouts,bolts和topologies的一种表达方式。DRPC可以独立于storm作为一个库发布,但在和storm捆绑在一起时将会非常有用。

高级概述

分布式RPC被“DRPC server”实现(storm包中已经有了对应的实例)。DRPC server协调接收一个RPC请求并将这个请求发送给storm topology,然后接收storm topology算出的结果,再将结果发送给等待中的客户端。从客户端的视角来看,分布式RPC调用过程跟普通的RPC调用过程一样。举例:这里有一个客户端调用“reach”方法,输入参数是http://twitter.com,然后得到计算结果的例子。

DRPCClient client = new DRPCClient("drpc-host", 3772);

String result = client.execute("reach", "http://twitter.com");

这个DRPC的工作流可以描述为:

一个客户端向DRPC Server发送了想要调用的方法名称和方法参数。实现了这个方法的topology用一个DRPCSpout从DRPC Server接收了函数调用Stream。每个远程方法在DRPC Server上都有一个唯一的ID。Topology计算出结果之后,使用一个ReturnResults的bolt连接DPRC Server后,将结果交给它。DRPC Server根据方法ID匹配出结果,然后唤醒等待的客户端,将结果发送给客户端

LinearDRPCTopologyBuilder(线性)

Storm中有一个LinearDRPCTopologyBuilder 的topology 生成,已经自动实现了DPRC调用过程中的绝大部分。包括:

1:配置spout

2:将结果返回给DPRC Server

3:为bolt提供了简单的tuple之间的聚合操作

让我们看一个简单的例子,这里有一个DPRC topology的实现,他可以给输入的参数后面添加一个“!”。

public static class ExclaimBolt extends BaseBasicBolt {

public void execute(Tuple tuple, BasicOutputCollector collector) {

String input = tuple.getString(1);

collector.emit(new Values(tuple.getValue(0), input + "!"));

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("id", "result"));

}

}

public static void main(String[] args) throws Exception {

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");

builder.addBolt(new ExclaimBolt(), 3);

// ...

}

就像我们看到的,代码非常简单。当我们创建一个LinearDRPCTopologyBuilder时,你告诉它实现了DRPC功能的topology的名字。一个DRPC server可以配置很多topology名称,这些名称不能重复。第一个bolt(不在代码中,应该是LinearDRPCTopologyBuilder内部的—译者注)会接受2个tuples,第一个属性是请求id,第二个是方法的参数。LinearDRPCTopologyBuilder中最后一个bolt返回了2个tuples的输出Stream,Stream的格式为[id, result],当然了,过程中产生的所有tuple的第一个属性都是请求id。

在这个例子中,ExclaimBolt只是简单的在tuple的第二种属性上添加了一个“!”。LinearDRPCTopologyBuilder完成了连接DRPC server并返回结果的其他过程。

本地DRPC模式

DRPC可以在本地运行,例子:

LocalDRPC drpc = new LocalDRPC();

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

System.out.println("Results for ‘hello‘:" + drpc.execute("exclamation", "hello"));

cluster.shutdown();

drpc.shutdown();

第一步,创建一个LocalDRPC对象。这个队形模拟DPRC server的运行,就像LocalCluster模拟集群运行一样。然后你创建一个LocalCluster在本地运行topology。LinearDRPCTopologyBuilder有独立的方法分布创建本地topology和远程topology。本地运行的LocalDRPC不需要绑定端口,因为在本地topology不需要端口来传递对象。这就是为什么createLocalTopology将LocalDRPC作为输入。

启动之后,你就可以看到DRPC调用过程在本地执行。

远程DRPC模式

在实际的集群上使用DRPC也非常简单,只需要三步:

1:启动DRPC server

2:配置DRPC server地址

3:将DRPC topology提交到storm集群

启动DRPC server就行启动Nimbus或UI一样简单
bin/storm drpc

接下来,你需要为storm集群配置DRPC的地址,才能DRPCSpout让知道在哪里读取方法调用。可以在storm.yaml中配置或者通过topology配置。在storm.yaml中配置如下

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"

最后,你要通过StormSubmitter启动DPRC topology,就想启动任何topology一样。远程模式运行上面的例子你可以:

StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology用来为storm集群创建topology。

一个更复杂的例子

上面那个感叹号例子用来熟悉DPRC的概念还是过于简单。让我们看一个更复杂的例子,一个真正需要通过并行运行storm来计算的DRPC方法。例子就是计算Twitter上的URL的reach。

URL的reach就是这个URL暴漏给多少用户。为了计算reach,你需要:

1:获取这个URL的所有twitter

2:获取这些twitter的follower

3:去掉重复的follower

4:计算每个URL的follower

一个真实的计算需要数千次的数据库交互和上百万的flowwer记录的计算。这是非常非常碉堡的计算。但正如你所看到的,基于storm实现这个功能却非常的简单。在单台机器上,reach要花上数分钟来计算;但是在storm集群上,你可以在数秒钟就计算出最难算的URL的reach。

在storm-starter上有一个reach topology(here)样例,这里告诉你如何定义一个reach topology。

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");

builder.addBolt(new GetTweeters(), 3);

builder.addBolt(new GetFollowers(), 12)

.shuffleGrouping();

builder.addBolt(new PartialUniquer(), 6)

.fieldsGrouping(new Fields("id", "follower"));

builder.addBolt(new CountAggregator(), 2)

.fieldsGrouping(new Fields("id"));

这个topology计算一共需要四步:

1:GetTweeters获取了tweeted这个URL的用户。它将输入stream[id, url]转换成了一个输出Stream[id, tweeter]。每一个URL映射了多个用户tuple。

2:获取了每一个tweeter的followers(粉丝)。它将输入stream[id, tweeter]转换成了一个输出Stream [id, follower]。经过这个过程,由于一个follower是同时tweet了同一个URL的多个用户的粉丝,就会产生一些重复的follower tuple。

3.PartialUniquer将follower Stream按照follower和id分组,保证同一个follower会进入同一个task。所以每一个PartialUniquer 的task都会接收到相互独立的follower集合。当PartialUniquer接收了所有根据request id分配给它的follower tuples,它就会将去重之后的follower集合的数量发射出去。

4.最终,CountAggregator接收了每一个PartialUniquer task发射的数量,并且通过计算总和来完成reach的计算过程。

下面来看一下PartialUniquer bolt:

public class PartialUniquer extends BaseBatchBolt {

BatchOutputCollector _collector;

Object _id;

Set<String> _followers = new HashSet<String>();

@Override

public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {

_collector = collector;

_id = id;

}

@Override

public void execute(Tuple tuple) {

_followers.add(tuple.getString(1));

}

@Override

public void finishBatch() {

_collector.emit(new Values(_id, _followers.size()));

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("id", "partial-count"));

}

}

PartialUniquer通过继承BaseBatchBolt实现了IBatchBolt接口。批处理bolt提供了一流的API可以将批量的tuples当作一个批次来处理。每一个request id都会创建一个batch bolt实例,而storm负责在适当的时候清理这些实例。

每当PartialUniquer在execute方法中接收到一个follower tuple,它就将follower放到request id对应的set集合中。

当发射到这个bolt的所有tuples都被处理之后,batch bolts中的finishBatch方法将会被调用。 PartialUniquer发射了一个包含follower数量的tuple。

在后台,CoordinatedBolt用来判断bolt是否全部接受了指定request id的所有tuple。CoordinatedBolt利用直接Stream来管理这次协调。

这个topology的剩余部分就显而易见了。就像你看到的,reach计算的每一步都是并行计算,而且实现DRPC topology是多么的简单。

Non-linear(非线性)DRPC topologies

LinearDRPCTopologyBuilder只能处理线下的DRPC topologies:整个计算可以分割为多个独立的顺序步骤。它很难处理包含有bolt分支和bolt合并的复杂topology。目前,为了实现复杂的功能,你只能通过直接使用CoordinatedBolt。

LinearDRPCTopologyBuilder如何工作

DRPCSpout发射了[args, return-info].return-info中包含DRPC的地址和端口,就像DRPC的id一样。

构建一个topology包含:

  • DRPCSpout
  • PrepareRequest(准备请求:产生一个request id并为return-info和参数分别创建一个Stream)
  • CoordinatedBolt wrappers and direct groupings(CoordinatedBolt封装和直接分组)
  • JoinResult(将result-info加入结果)
  • ReturnResult(连接DRPC server 并返回结果)

LinearDRPCTopologyBuilder是一个非常好storm高级抽象的例子

高级

KeyedFairBolt封装同时处理多个请求

如何直接使用CoordinatedBolt

时间: 2024-08-06 15:48:07

storm翻译(3)Distributed RPC(分布式远程调用)的相关文章

Atitit.分布式远程调用&#160;&#160;rpc &#160;rmi &#160;CORBA的关系

Atitit.分布式远程调用  rpc  rmi  CORBA的关系 1. 远程调用(包括rpc,rmi,rest)1 2. 分布式调用大体上就分为两类,RPC式的,REST式的1 3. RPC(远程过程调用)是什么 1 4. 传输的数据2 5. 序列化与反序列化3 6. ref  谁能用通俗的语言解释一下什么是 RPC 框架? - Java - 知乎.html3 1. 远程调用(包括rpc,rmi,rest) RPC的协议有很多,比如最早的CORBA,Java RMI,Web Service的

[UE4]RPC,远程调用

RPC 一.Remote Procedure Call:远程程序调用 二.一个进程调用另外一个进程上的函数 由于“Server-shoot”方法被标记为“在服务器上运行”,所以尽管是在第二个窗口(客户端)开火,输出的信息是:Server:准备射击,表明这是在服务器上运行. 在服务器上“Server-shoot”方法又调用调用了“MutiCast-Shoot”方法,“MutiCast-Shoot”(被标记为“多路传送”),所以服务器会通知所有客户端都去执行这个方法. 结果就显示:Server:全体

Spring Boot 2 整合 Dubbo 框架 ,实现 RPC 服务远程调用

一.Dubbo框架简介 1.框架依赖 图例说明: 1)图中小方块 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表层或模块,蓝色的表示与业务有交互,绿色的表示只对 Dubbo 内部交互. 2)图中背景方块 Consumer, Provider, Registry, Monitor 代表部署逻辑拓扑节点. 3)图中蓝色虚线为初始化时调用,红色虚线为运行时异步调用,红色实线为运行时同步调用. 4)图中只包含 RPC

远程调用时的分布式事物问题

前提 所有服务均有独立的事物管理机制,相互间没有任何关联. 所有业务接口都有对应的补偿方法,用于将已经更新的数据还原到上一次的状态. 本次实例为同步业务,理想状态下,只有全部成功或全部失败两种情况. 正式开始 正常流程 一切安好. 中途异常 - 补偿成功 虽然发生了失败,但所有补偿都成功了.没有什么问题 中途异常 - 补偿失败 此时,主服务有三种处理方法 主服务无限重试补偿方法,直到补偿成功. 这里有很麻烦的问题,如果下游的服务器已经停机,此时主服务的无限重试已经没有意义.在最坏的情况下,如果主

Java[2] 分布式服务架构之java远程调用技术浅析(转http://www.uml.org.cn/zjjs/201208011.asp)

转自:http://www.uml.org.cn/zjjs/201208011.asp 在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI.MINA.ESB.Burlap.Hessian.SOAP.EJB和JMS等,这些名词之间到底是些什么关系呢,它们背后到底是基于什么原理实现的呢,了解这些是实现分布式服务框架的基础知识,而如果在性能上有高的要求的话,那深入了解这些技术背后的机制就是必须的了,在这篇blog中我们将来一探究竟,抛

【PHP】远程调用以及RPC框架

前言 一个项目,从开始到版本更新,一直到最后的版本维护.功能在不断增多,对应的代码量也在不断增加,也就意味着项目变得更不可维护,这时候,我们需要用拆分的方式将一个项目打散,以便开发团队更好的对项目进行维护. 分模块 这个阶段,一般也是项目的初级阶段,由于人手不够,一个服务端的接口项目只有一个开发进行维护,根据开发的习惯,会把项目分成若干个模块进行开发,在一个项目下进行部署. 这样做的缺点在于项目会随着版本更新而变得不可维护. 分项目 随着每个模块功能的不断完善,代码变得更加臃肿.这时候需要对项目

分布式对象和远程调用

一.引言 1.1 分布式对象技术要解决的基本问题 分布式对象技术是在面向对象技术的基础上发展起来的,它要解决的主要问题是位于不同进程中的对象之间的调用问题. 支持访问异地对象,支持访问异构对象(java平台调用C.vb,C++). 1.2 中间件 参考http://kb.cnblogs.com/page/196448/ 1.中间件是一种软件,它提供基本的通信模块和其他一些基础服务模块,使得应用程序开发提供平台. 中间件技术提供了一个编程的抽象,来屏蔽上述的异质问题. 通信协议:独立于网络底层的传

分布式服务架构之java远程调用技术浅析

分布式服务架构之java远程调用技术浅析     在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI.MINA.ESB.Burlap.Hessian.SOAP.EJB和JMS等,这些名词之间到底是些什么关系呢,它们背后到底是基于什么原理实现的呢,了解这些是实现分布式服务框架的基础知识,而如果在性能上有高的要求的话,那深入了解这些技术背后的机制就是必须的了,在这篇blog中我们将来一探究竟,抛砖引玉,欢迎大家提供更多的实现远程通讯

RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC) [转]

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie