hadoop2.0 rpc分析-1.0

1. 使用的实例:

package com.rpc;
import org.apache.hadoop.ipc.VersionedProtocol;
public interface  MyProtocol extends VersionedProtocol{
public static final long versionID=1L;
String getlength(String str);
int add(int a, int b);
}
package com.rpc;
import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;
public class MyProtocolImp  implements MyProtocol{
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
// TODO Auto-generated method stub
return MyProtocol.versionID;
}
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return new ProtocolSignature(MyProtocol.versionID,null);
}
public String getlength(String str) {
return String.valueOf(str.length());
}
public int add(int a, int b) {
return a+b;
}
}
// rpc server
package com.rpc;
import java.io.IOException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
public class MyServer {
public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
Configuration conf = new Configuration();
Server server = new RPC.Builder(conf).setProtocol(MyProtocol.class)
.setInstance(new MyProtocolImp()).setBindAddress("0.0.0.0").setPort(8080)
.setNumHandlers(10).build();
server.start();
}
}
// rpc客户端
package com.rpc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class MyClient {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
InetSocketAddress ip = new InetSocketAddress(8080);
MyProtocol proxy = (MyProtocol)RPC.getProxy(MyProtocol.class, MyProtocol.versionID, ip,conf );
int sum = proxy.add(10, 2);
System.out.println(sum);
}
}

2. 分析:

(1)

rpc 提供了可以修改序列化的方式。默认是:WritableRpcEngine

代码:

Rpc.class:
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
      Configuration conf) {
    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
    if (engine == null) {
      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
                                    WritableRpcEngine.class);
      engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
      PROTOCOL_ENGINES.put(protocol, engine);
    }
    return engine;
  }

(2)

Rpc.Listener.class 负责监听
 public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }
时间: 2024-08-22 15:27:26

hadoop2.0 rpc分析-1.0的相关文章

Hadoop2源码分析-YARN RPC 示例介绍

1.概述 之前在<Hadoop2源码分析-RPC探索实战>一文当中介绍了Hadoop的RPC机制,今天给大家分享关于YARN的RPC的机制.下面是今天的分享目录: YARN的RPC介绍 YARN的RPC示例 截图预览 下面开始今天的内容分享. 2.YARN的RPC介绍 我们知道在Hadoop的RPC当中,其主要由RPC,Client及Server这三个大类组成,分别实现对外提供编程接口.客户端实现及服务端实现.如下图所示: 图中是Hadoop的RPC的一个类的关系图,大家可以到<Hado

蓝牙4.0技术分析1-广播者角色

第1章  BlueTooth Roles-Broadcaster 1.1    广播类型 广播可设置以下几种类型: 1)   Connectable Undirected Event Type(可连接无定向广播) 2)   Connectable Directed Event Type(可连接定向广播) 3)   Scannable Undirected Event Type(可扫描无定向广播) 4)   Non-connectable Undirected Event Type(不可连接无定向

Nutch 1.0 源代码分析[8] CrawlDb

Nutch 1.0 源代码分析[8] CrawlDb 24MAR 2010 18:44:08 +0800 ---------------------------------------------------------------------------- 再接下来Crawl类中的重要的一行就是: http://c.tieba.baidu.com/p/3312872854 http://c.tieba.baidu.com/p/3312894881 http://c.tieba.baidu.co

Lua1.0 代码分析 库

Lua1.0 代码分析 库库的代码相对比较简单.这里以数学库为例进行说明.比如看下这个取绝对值的数学函数 static void math_abs (void) {  double d;  lua_Object o = lua_getparam (1);  if (o == NULL)  { lua_error ("too few arguments to function `abs'"); return; }  if (!lua_isnumber(o))  { lua_error (

实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 1

公司当年有一个自己缓存集群用户session的Java library,是基于hazlcast2.0.3实现的,最近在customer site集群环境中某个blade报了Out of Memory Exception, 其他blades都正常,马上用jrockit jrcmd命令dump了堆和线程进行分析. printf "##################### heap ##################\n" su -p occas -c "/opt/jrocki

实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 2

hazelcast 提供了3中方法调用startCleanup: 第一种是在ConcuurentMapManager的构造函数中,通过调用node的executorManager中的ScheduledExecutorService来创建每秒执行一次cleanup操作的线程(代码如下).由于这是ConcuurentMapManager构造函数的代码,所以这种调用startCleanup的操作是默认就会有的. node.executorManager.getScheduledExecutorServ

Hadoop2.0构成之HDFS2.0

HDFS2.0之HA 主备NameNode: 1.主NameNode对外提供服务,备NameNode同步主NameNode元数据,以待切换: 2.主NameNode的信息发生变化后,会将信息写到共享数据存储系统中让备NameNode合并到自己的内存中: 3.所有DataNode同时向两个NameNode发送心跳信息(块信息): 两种切换方式: 1.手动切换:通过命令实现主备之间的切换,可以用于HDFS升级等场合: 2.自动切换:基于Zookeeper实现: Zookeeper Failover

Nutch 1.0 源代码分析[3] Plugin(2)

 Nutch 1.0 源代码分析[3] Plugin(2)  来自: http://c.tieba.baidu.com/p/3439551436 在URLNormalizers构造函数中,有一句没有看: this.extensionPoint =PluginRepository.get(conf).getExtensionPoint( URLNormalizer.X_POINT_ID); 看一下PluginRepository.get函数: public static synchronizedP

在 NetBeans IDE 6.0 中分析 Java 应用程序性能

NetBeans IDE 6.0 包含一个强大的性能分析工具,可提供与应用程序运行时行为有关的重要信息.通过 NetBeans 性能分析工具,我们可以方便地在 IDE 中监控应用程序的线程状态.CPU 性能以及内存使用情况,而且产生的开销相对较少. 本文将概述 IDE 中包含的性能分析工具,并指导您快速开始分析 NetBeans 项目的性能.本文旨在演示 IDE 中可用的各种性能分析任务以及分析项目性能时可以获得的分析结果.但并不覆盖 IDE 中包含的所有性能分析功能,也不会深入探索如何研究性能