avro入门之rpc

关于avro的rpc入门使用,官方给的资料实在是太少了,这个链接https://github.com/phunt/avro-rpc-quickstart有具体说明

现在对在java下使用总结如下:

参考:http://www.iteblog.com/archives/1008

http://my.oschina.net/zhzhenqin/blog/151040

http://www.bianceng.cn/Servers/web/201411/46469.htm

我这里没有使用maven,直接在项目中加入使用到的jar包有:avro-1.7.7.jar、avro-tools-1.7.7.jar、 jackson-core-asl-1.8.8.jar、jackson-mapper-asl-1.8.8.jar

当然,如果你需要,你也可以在Avro源码中进行编译,获取avro-1.7.7.jar和avro-tools-1.7.7.jar

Avro协议是以JSON结构性描述文本。协议定义了基本的通信的数据类型,名称。并且还包含可调用的方法等。首先定义协议文件

{
    "namespace":"avro",
    "doc":"This is a message.",
    "protocol":"messageProtocol",
    "name":"HelloWorld",
    "types":[
        {
            "name":"nameMessage",
            "type":"record",
            "fields":[ {"name":"name", "type":"string"} ]
        }
    ],
    "messages":{
        "sayHello":{
            "doc":"say Hello to manbers",
            "request":[ { "name":"name", "type":"string" } ],
            "response":"nameMessage"
        }
    }
}

保存在d盘 a.avro

然后编写服务端代码:

import java.io.File;

import org.apache.avro.Protocol;
import org.apache.avro.Protocol.Message;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.generic.GenericResponder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class AvroHttpServer extends GenericResponder {

    private static Log log = LogFactory.getLog(AvroHttpServer.class);

    public AvroHttpServer(Protocol protocol) {
        super(protocol);
    }

    public Object respond(Message message, Object request) throws Exception {
        GenericRecord req = (GenericRecord) request;
        GenericRecord reMessage = null;
        if (message.getName().equals("sayHello")) {
            Object name = req.get("name");
            //  do something...
            //取得返回值的类型
            reMessage = new GenericData.Record(super.getLocal().getType("nameMessage"));
            //直接构造回复
            reMessage.put("name", "Hello, " + name.toString());
            log.info(reMessage);
        }
        return reMessage;
    }

    public static void main(String[] args) throws Exception {
        int port = 8088;
        try {
            Server server = new HttpServer(
                    new AvroHttpServer(Protocol.parse(
//                            new File("helloword.json"))),
                    		new File("d:/a.avro"))),
                    port);
            server.start();
            server.join();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接下来编写客户端代码:

import java.io.File;
import java.net.URL;

import org.apache.avro.Protocol;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.generic.GenericRequestor;
import org.junit.Before;
import org.junit.Test;

public class b {
	private Protocol protocol;

    private GenericRequestor requestor = null;

    @Before
    public void setUp() throws Exception {
        protocol = Protocol.parse(new File("d:/a.avro"));
        Transceiver t = new HttpTransceiver(new URL("http://localhost:8088"));  //这里如果要在两台机器上运行记得把localhost改成服务端的ip
        requestor = new GenericRequestor(protocol, t);
    }

    @Test
    public void testSendMessage() throws Exception {
        GenericRecord requestData = new GenericData.Record(protocol.getType("nameMessage"));
        // initiate the request data
        requestData.put("name", "zhenqin");

        System.out.println(requestData);
        Object result = requestor.request("sayHello", requestData);
        if (result instanceof GenericData.Record) {
            GenericData.Record record = (GenericData.Record) result;
            System.out.println(record.get("name"));
        }
        System.out.println(result);
    }

}

上面先运行服务端在运行客户端,可以看到客户端收到消息。

{"name": "zhenqin"}
Hello, zhenqin
{"name": "Hello, zhenqin"}

以上参考http://my.oschina.net/zhzhenqin/blog/151040

至于用python编写跨语言的客户端,具体内容还有待研究

时间: 2024-10-16 16:53:23

avro入门之rpc的相关文章

rpc框架之avro教程1-hello world

avro是hadoop的一个子项目,提供的功能与thrift.Protocol Buffer类似,都支持二进制高效序列化,也自带RPC机制,但是avro使用起来更简单,无需象thrift那样生成目标语言源代码,目前支持的语言有java.c#.php.c++等(详情见:https://cwiki.apache.org/confluence/display/AVRO/Supported+Languages),hadoop生态圈中的hive.pig已经在使用avro pom.xml 1 <?xml v

Apache Avro? 1.7.6 Specification

Apache Avro? 1.7.6 Specification Introduction Schema Declaration Primitive Types Complex Types Records Enums Arrays Maps Unions Fixed Names Aliases Data Serialization Encodings Binary Encoding Primitive Types Complex Types JSON Encoding Sort Order Ob

Hadoop 学习笔记五 ---Hadoop系统通信协议介绍

本文约定: DN: DataNode TT: TaskTracker NN: NameNode SNN: Secondry NameNode JT: JobTracker 本文介绍Hadoop各节点和Client之间通信协议. Hadoop的通信是建立在RPC的基础上,关于RPC的详解介绍大家可以参照 "hadoop rpc机制 && 将avro引入hadoop rpc机制初探" Hadoop中节点之间的通信是比较复杂的一个网络,若可以把它们之间的通信网络了解清楚,那么

Hadoop 学习笔记四 ---Hadoop系统通信协议介绍

本文约定: DN: DataNode TT: TaskTracker NN: NameNode SNN: Secondry NameNode JT: JobTracker 本文介绍Hadoop各节点和Client之间通信协议. Hadoop的通信是建立在RPC的基础上,关于RPC的详解介绍大家可以参照 "hadoop rpc机制 && 将avro引入hadoop rpc机制初探" Hadoop中节点之间的通信是比较复杂的一个网络,若可以把它们之间的通信网络了解清楚,那么

日志收集系统Flume调研笔记第1篇 - Flume简介

用户行为数据的收集无疑是构建推荐系统的先决条件,而Apache基金会下的Flume项目正是为分布式的日志收集量身打造的,本文是Flume调研笔记的第1篇,主要介绍Flume的基本架构,下篇笔记将会以实例说明Flume的部署和使用步骤. 本文所用的Flume版本为目前最新版的ver1.5.2,它属于Flume-NG,在系统架构上与Flume-OG有所区别,二者的不同可以参考FlumeWiki文档的说明. 1. Flume是什么 Flume是Apache基金会下的一个开源项目,它实现了一套分布式的.

Java开源Apache项目

Commons-Pool Commons-Pool 提供了通用对象池接口,一个用于创建模块化对象池的工具包,以及通常的对象池实 更多Commons-Pool信息 Commons-Math Math 是一个轻量的,自包含的数学和统计组件,解决了许多非常通用但没有及时出现在Java标准语言中的实践问题. 更多Commons-Math信息 Commons-Jelly Jelly能够把XML转换成可执行代码,所以Jelly是一个基于XML与Java的脚本和处理引擎. Jelly借鉴了JSP定指标签,Ve

Hadoop系统通信协议介绍(转)

转载自 ---- http://weilaiyxj.iteye.com/blog/913166 本文约定: DN: DataNode TT: TaskTracker NN: NameNode SNN: Secondry NameNode JT: JobTracker 本文介绍Hadoop各节点和Client之间通信协议. Hadoop的通信是建立在RPC的基础上,关于RPC的详解介绍大家可以参照 "hadoop rpc机制 && 将avro引入hadoop rpc机制初探&quo

flume参考文档

问题导读: 1.node向master发送心跳之后等待反馈的最大时长由哪个参数来决定,默认多长时间? 2.当primary sink(可以认为是第一collector)故障后,重启primary sink的一个延迟时间,在此期间,agent将把数据发送到secondary sink(可能是第二collector)由哪个参数来决定? 3.collector的默认发送目录通过哪个参数可以配置? 4.flume command执行命令的方式有几种? Flume配置文件(flume-site.conf)

flume1.8 使用指南学习感悟(一)

1. 系统要求 1. Java运行环境 -- Java 1.8及以上 2. 内存 -- 足够的内存供配置的sources,channels 或者sinks使用 3. 硬盘空间 -- 足够的硬盘空间供配置的channels或者sinks使用 4. 文件权限 -- agent使用的文件夹读写权限 2. 架构及数据流模型 模型介绍详情参考:http://www.cnblogs.com/swordfall/p/8093464.html 3. 创建 3.1 创建一个Agent Flume agent配置存