storm DRPC例子

1,DRPC原理

客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函 数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。 这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结
果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。

2,例子

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ManualDRPC {
  public static class ExclamationBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("result", "return-info"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String arg = tuple.getString(0);
      Object retInfo = tuple.getValue(1);
      collector.emit(new Values(arg + "!!!", retInfo));
    }

  }

  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    LocalDRPC drpc = new LocalDRPC();

    DRPCSpout spout = new DRPCSpout("exclamation", drpc);
    builder.setSpout("drpc", spout);
    builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");

    LocalCluster cluster = new LocalCluster();
    Config conf = new Config();
    cluster.submitTopology("exclaim", conf, builder.createTopology());

    System.out.println(drpc.execute("exclamation", "aaa"));
    System.out.println(drpc.execute("exclamation", "bbb"));

  }
}

备注:DRPCSpout的名字与drpc.execute指定运行的名字一致

时间: 2024-10-08 10:17:16

storm DRPC例子的相关文章

Storm入门(十二)Twitter Storm: DRPC简介

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明网址: http://xumingming.sinaapp.com/756/twitter-storm-drpc/ 本文翻译自: https://github.com/nathanmarz/storm/wiki/Distributed-RPC . Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算.DRPC的storm topology以函数的参数

storm经典例子的wordcount的实现

storm有个经典的例子wordcount,其实这几乎可以说是大数据的经典例子了,mapreduce也会有这个例子.但是storm给的例子包里的WordCountTopology用到了python的调用,直接用eclipse跑起来的话会报错,这里做了个小改动. 1.WordCountTopology.java package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; impor

Storm完整例子

import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.utils.Utils; import backtype.storm.Config; import backtype.storm.LocalCluster; impo

storm的例子,一个非常好的网址

https://insight.io/github.com/apache/storm/tree/HEAD/examples/storm-elasticsearch-examples/src/main/java/org/ http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/ https://www.ctolib.com/ https://www.bennadel.

Storm集群的DRPC模式

storm的DRPC模式的作用是实现从远程调用storm集群的计算资源,而不需要连接到集群的某一个节点.OK.那么storm实现DRPC主要是使用LinearDRPCTopologyBuilder这个类.下面就先来看看一个简单的例子,它的源码的github上. import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.stor

Storm高级原语(二) -- DRPC详解

Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU密集型(CPU intensive)的计算任务.DRPC的stormtopology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流. DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语stream.spout.bolt. topology而成的一种模式(pattern).本来应该把DRPC单独打成一个包的, 但是DRPC实在是太有用了,所以我们我们把它和storm捆绑

Storm实验 -- DRPC的使用

1. 修改所有节点的 storm.yaml 配置文件,设置 drpc server [[email protected] ~]$ vim storm-0.9.4/conf/storm.yaml drpc.servers: - "hadoop4" 2. 启动drcp服务 storm drpc & 3. 编写服务端程序 //来源于 https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starte

storm 事务和DRPC结合

示例代码: package com.lky.topology; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import b

Storm 中drpc调用

package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.DRPCSpout; import backtype.storm.task.ShellBolt; import backtyp