RPC与hadoop

rlgdj的这样的话,真正的实现类在Server端,客户端调用方法的时候,只能得到得到从Server端的返回值。看来接口中的抽象方法必须要有返回值啊。ps。右下角的Client端的main()中rpc.refer()方法,返回一个继承了Proxy实现了HelloService的一个代理类。再调用里面的invoke方法。具体可以去看代理模式。

附上源码:

 1 package test.PRC;
 2
 3 public class Client {
 4     public static void main(String[] args) throws Exception {
 5         HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
 6         for (int i = 0; i < Integer.MAX_VALUE; i ++) {
 7             String hello = service.hello("World" + i);
 8             System.out.println(hello);
 9             Thread.sleep(1000);
10         }
11     }
12
13 }

服务端:

 1 package test.PRC;
 2
 3 public class Server {
 4
 5
 6     public static void main(String []args) throws Exception {
 7             HelloService service = new HelloServiceImpl();
 8             RpcFramework.export(service, 1234);
 9     }
10
11
12 }

rpc框架:

  1 package test.PRC;
  2
  3 import java.io.ObjectInputStream;
  4 import java.io.ObjectOutputStream;
  5 import java.lang.reflect.InvocationHandler;
  6 import java.lang.reflect.Method;
  7 import java.lang.reflect.Proxy;
  8 import java.net.ServerSocket;
  9 import java.net.Socket;
 10
 11 public class RpcFramework {
 12     /**
 13     * 暴露服务
 14     *
 15     * @param service 服务实现
 16     * @param port 服务端口
 17     * @throws Exception
 18     */
 19    public static void export(final Object service, int port) throws Exception {
 20        if (service == null)
 21             throw new IllegalArgumentException("service instance == null");
 22         if (port <= 0 || port > 65535)
 23             throw new IllegalArgumentException("Invalid port " + port);
 24         System.out.println("Export service " + service.getClass().getName() + " on port " + port);
 25         ServerSocket server = new ServerSocket(port);
 26         for(;;) {
 27             try {
 28                 final Socket socket = server.accept();//服务器端一旦收到消息,就创建一个线程进行处理
 29                 new Thread(new Runnable() {
 30                     public void run() {
 31                         try {
 32                             try {
 33                                 ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
 34                                 try {
 35                                     String methodName = input.readUTF();//service是服务器端提供服务的对象,但是,要通过获取到的调用方法的名称,参数类型,以及参数来选择对象的方法,并调用。获得方法的名称
 36                                     Class<?>[] parameterTypes = (Class<?>[])input.readObject();//获得参数的类型
 37                                     Object[] arguments = (Object[])input.readObject();//获得参数
 38                                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
 39                                     try {
 40                                         Method method = service.getClass().getMethod(methodName, parameterTypes);//通过反射机制获得方法
 41                                         Object result = method.invoke(service, arguments);//通过反射机制获得类的方法,并调用这个方法
 42                                         output.writeObject(result);//将结果发送
 43                                     } catch (Throwable t) {
 44                                         output.writeObject(t);
 45                                     } finally {
 46                                         output.close();
 47                                     }
 48                                 } finally {
 49                                     input.close();
 50                                 }
 51                             } finally {
 52                                 socket.close();
 53                             }
 54                         } catch (Exception e) {
 55                             e.printStackTrace();
 56                         }
 57                     }
 58                 }).start();
 59             } catch (Exception e) {
 60                 e.printStackTrace();
 61             }
 62         }
 63     }
 64
 65     /**
 66      * 引用服务
 67      *
 68      * @param <T> 接口泛型
 69      * @param interfaceClass 接口类型
 70      * @param host 服务器主机名
 71      * @param port 服务器端口
 72      * @return 远程服务
 73      * @throws Exception
 74      *///原理是通过代理,获得服务器端接口的一个“代理”的对象。对这个对象的所有操作都会调用invoke函数,在invoke函数中,是将被调用的函数名,参数列表和参数发送到服务器,并接收服务器处理的结果
 75     @SuppressWarnings("unchecked")
 76     public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
 77         if (interfaceClass == null)
 78             throw new IllegalArgumentException("Interface class == null");
 79         if (! interfaceClass.isInterface())
 80             throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
 81         if (host == null || host.length() == 0)
 82             throw new IllegalArgumentException("Host == null!");
 83         if (port <= 0 || port > 65535)
 84             throw new IllegalArgumentException("Invalid port " + port);
 85         System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
 86         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
 87             public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
 88                 Socket socket = new Socket(host, port);
 89                 try {
 90                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
 91                     try {
 92                         output.writeUTF(method.getName());
 93                         output.writeObject(method.getParameterTypes());
 94                         output.writeObject(arguments);
 95                         ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
 96                         try {
 97                             Object result = input.readObject();
 98                             if (result instanceof Throwable) {
 99                                 throw (Throwable) result;
100                             }
101                             return result;
102                         } finally {
103                             input.close();
104                         }
105                     } finally {
106                         output.close();
107                     }
108                 } finally {
109                     socket.close();
110                  }
111              }
112          });
113      }
114
115
116 }

接口:

package test.PRC;

public interface HelloService {

    String hello(String name);
}

实现类:

 1 package test.PRC;
 2
 3 public class HelloServiceImpl implements HelloService{
 4
 5     public String hello(String name) {
 6         return "Hello " + name;
 7     }
 8
 9
10 }

具体情况具体部署。

时间: 2024-08-08 09:41:35

RPC与hadoop的相关文章

RPC与Hadoop RPC机制

一.什么是RPC? (1)Remote Procdure call ,远程方法调用,它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的.经常用于分布式网络通信中. (2)Hadoop的进程间交互都死通过RPC来进行的,比如Namenode与Datanode直接,Jobtracker与Tasktracker之间等. 流程: (1)RPC采用了C/S的模式: (2)Client端发送一个带有参数的请求信息到Server: (3)Server接收到这

Hadoop源码解析之 rpc通信 client到server通信

rpc是Hadoop分布式底层通信的基础,无论是client和namenode,namenode和datanode,以及yarn新框架之间的通信模式等等都是采用的rpc方式. 下面我们来概要分析一下Hadoop2的rpc. Hadoop通信模式主要是C/S方式,及客户端和服务端的模式. 客户端采用传统的socket通信方式向服务端发送信息,并等待服务端的返回. 服务端采用reactor的模式(Java nio)的方式来处理客户端的请求并给予响应. 一.客户端到服务端的通信 下面我们先分析客户端到

Hadoop之——RPC通信实例

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45922715 一. RPC(remote procedure call) 不同java进程间的对象方法的调用. 一方称作服务端(server),一方称作客户端(client). server端提供对象,供客户端调用的,被调用的对象的方法的执行发生在server端. RPC是hadoop框架运行的基础. 二.通过rpc小例子获得的认识 1.服务端提供的对象必须是一个接口,接口ext

Hadoop RPC

第一部分:什么是RPC RPC (Remote Procedure Call Protocol) – 远程过程协议调用 .通过 RPC 我们可以从网络上的计算机请求服务,而不需要了 解底层网络协议. Hadoop 底层的交互都是通过 rpc 进行的.例 如: datanode 和 namenode . tasktracker 和 jobtracker . secondary namenode 和 namenode 之间的通信都是通过 rpc 实 现的. RPC 模式 RPC 采用客户机 / 服务

Hadoop源码学习笔记(4) ——Socket到RPC调用

Hadoop源码学习笔记(4) ——Socket到RPC调用 Hadoop是一个分布式程序,分布在多台机器上运行,事必会涉及到网络编程.那这里如何让网络编程变得简单.透明的呢? 网络编程中,首先我们要学的就是Socket编程,这是网络编程中最底层的程序接口,分为服务器端和客户端,服务器负责监听某个端口,客户端负责连接服务器上的某个端口,一旦连接通过后,服务器和客户端就可以双向通讯了,我们看下示例代码: ServerSocket server = new ServerSocket(8111); S

hadoop rpc基础

第一部分: hadoop rpc基础 RPC,远程程序调用,分布式计算中C/S模型的一个应用实例. 同其他RPC框架一样,Hadoop分为四个部分: 序列化层:支持多种框架实现序列化与反序列化 函数调用层:利用java反射与动态代理实现 网络传输层:基于TCP/IP的Socket机制 服务的处理框架:基于Reactor模式的事件驱动IO模型 Hadoop RPC主要对外提供2种接口 public static ProtocolProxy getProxy/waitForProxy: 构造一个客户

Hadoop中RPC协议小例子报错java.lang.reflect.UndeclaredThrowableException解决方法

最近在学习传智播客吴超老师的Hadoop视频,里面他在讲解RPC通信原理的过程中给了一个RPC的小例子,但是自己编写的过程中遇到一个小错误,整理如下: log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).log4j:WARN Please initialize the log4j system properly.log4j:WARN See

Hadoop RPC通信Server端的流程分析

前2天刚刚小小的分析下Client端的流程,走的还是比较通顺的,但是RPC的服务端就显然没有那么简单了,毕竟C-S这种模式的,压力和重点都是放在Server端的,所以我也只能做个大概的分析,因为里面细节的东西太多,我也不可能理清所有细节,但是我会集合源代码把主要的流程理理清.如果读者想进一步学习的话,可自行查阅源码. Server服务端和Client客户端在某些变量的定义上还是一致的,比如服务端也有Call,和Connection,这个很好理解,Call回调,和Connection连接是双向的.

Hadoop学习&lt;四&gt;--HDFS的RPC通信原理总结

这里先写下自己学习RPC的笔记总结,下面将详细介绍学习过程: RPC(remote procedure call) 不同java进程间的对象方法的调用. 一方称作服务端(server),一方称作客户端(client). server端提供对象,供客户端调用的,被调用的对象的方法的执行发生在server端. RPC是hadoop框架运行的基础. 通过rpc小例子获得的认识? 1. 服务端提供的对象必须是一个接口,接口extends VersioinedProtocal 2. 客户端能够的对象中的方