Implementation schematic for 大发888 platform development:
1. Service API: the service interface.
HelloService.java is as follows:
package com.shan.rpc.service;

public interface HelloService {
    public String sayHello(String content);
}
2. Service Impl: the implementation of the service interface.
HelloServiceImpl.java is as follows:
package com.shan.rpc.service.impl;

import com.shan.rpc.service.HelloService;

public class HelloServiceImpl implements HelloService {
    public String sayHello(String content) {
        return "hello," + content;
    }
}
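3. Consumer Proxy: the proxy class on the service consumer side. It obtains a dynamic proxy instance of the service interface through Proxy.newProxyInstance and performs the remote RPC call in the invoke method of an InvocationHandler implementation. Specifically, it writes the name of the invoked method and its arguments to the Socket through an object output stream to start the remote call, then reads the return value from the Socket through a Java object input stream.
ConsumerProxy.java is as follows: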
package com.shan.rpc.framework;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class ConsumerProxy {
    /**
     * @Title: consume
     * @Description: service consumer proxy
     * @author fuss
     * @date 2018-07-12
     * @param interfaceClass the service interface to proxy
     * @param host the provider host
     * @param port the provider port
     * @return a dynamic proxy that implements interfaceClass
     * @throws Exception if the proxy cannot be created
     */
    @SuppressWarnings("unchecked")
    public static <T> T consume(final Class<T> interfaceClass, final String host, final int port) throws Exception {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Each call opens a new connection to the provider.
                Socket socket = new Socket(host, port);
                try {
                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                    try {
                        // Request: method name first, then the argument array.
                        output.writeUTF(method.getName());
                        output.writeObject(args);
                        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                        try {
                            // Response: either the return value or a Throwable raised on the provider.
                            Object result = input.readObject();
                            if (result instanceof Throwable) {
                                throw (Throwable) result;
                            }
                            return result;
                        } finally {
                            input.close();
                        }
                    } finally {
                        output.close();
                    }
                } finally {
                    socket.close();
                }
            }
        });
    }
}
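To look at the interception step on its own, the following is a minimal sketch of a dynamic proxy whose handler only reports the call instead of sending it over a Socket. The names ProxyDemo, Greeter and createProxy are hypothetical and exist only for this illustration; they are not part of the original project.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxyDemo {
    // Hypothetical interface used only for this illustration.
    public interface Greeter {
        String greet(String name);
    }

    // Builds a proxy whose handler returns a description of the intercepted call.
    // ConsumerProxy would write the same two pieces of data to the Socket instead.
    public static Greeter createProxy() {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) {
                return method.getName() + Arrays.toString(args);
            }
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[]{Greeter.class}, handler);
    }

    public static void main(String[] args) {
        System.out.println(createProxy().greet("fuss")); // prints greet[fuss]
    }
}
```

Every method called on the proxy lands in invoke with the Method object and the argument array, which is all the information ConsumerProxy needs to describe the call to the provider.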
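4. Provider Reflect: the service publishing implementation. It reads the invoked method name and the method arguments from the Socket through an input stream, one by one and in the same order in which ConsumerProxy wrote them, invokes the service implementation reflectively through MethodUtils.invokeExactMethod, and writes the result to the Socket to return it to the caller.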
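The ordering requirement can be illustrated with an in-memory round trip. This is only a sketch: it replaces the Socket with byte arrays, and the names WireOrderDemo, encode and decode are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;

class WireOrderDemo {
    // Encodes a call in the order ConsumerProxy uses: method name first, then the argument array.
    public static byte[] encode(String methodName, Object[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream output = new ObjectOutputStream(bytes)) {
            output.writeUTF(methodName);
            output.writeObject(args);
        }
        return bytes.toByteArray();
    }

    // Decodes with the matching sequence of reads: readUTF, then readObject.
    public static String decode(byte[] data) throws Exception {
        try (ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(data))) {
            String methodName = input.readUTF();
            Object[] args = (Object[]) input.readObject();
            return methodName + Arrays.toString(args);
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] request = encode("sayHello", new Object[]{"fuss_0"});
        System.out.println(decode(request)); // prints sayHello[fuss_0]
    }
}
```

The stream carries no field names, so the reader has to issue the same sequence of calls as the writer. The arguments also have to be Serializable for writeObject to accept them.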
ProviderReflect.java is as follows:
package com.shan.rpc.framework;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.reflect.MethodUtils;

public class ProviderReflect {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    /**
     * @Title: provider
     * @Description: publishes a service
     * @author fuss
     * @date 2018-07-12
     * @param service the service implementation to expose
     * @param port the port to listen on
     * @throws Exception if the server socket cannot be opened or accepting a connection fails
     */
    public static void provider(final Object service, int port) throws Exception {
        ServerSocket serverSocket = new ServerSocket(port);
        while (true) {
            // Each accepted connection is handled on a pooled thread.
            final Socket socket = serverSocket.accept();
            EXECUTOR_SERVICE.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                        try {
                            try {
                                String methodName = input.readUTF(); // method name
                                Object[] args = (Object[]) input.readObject(); // method arguments
                                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                try {
                                    Object result = MethodUtils.invokeExactMethod(service, methodName, args);
                                    output.writeObject(result);
                                } catch (Throwable t) {
                                    // Send the failure back so the consumer can rethrow it.
                                    output.writeObject(t);
                                } finally {
                                    output.close();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            } finally {
                                input.close();
                            }
                        } finally {
                            socket.close();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
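The reflective dispatch step can also be tried without a Socket. The sketch below uses plain java.lang.reflect in place of MethodUtils.invokeExactMethod so that it runs without Apache Commons Lang; it only approximates that method's lookup, and it assumes that no argument is null and that the declared parameter types equal the runtime classes of the arguments. ReflectDispatchDemo, EchoService and dispatch are hypothetical names.

```java
import java.lang.reflect.Method;

class ReflectDispatchDemo {
    // Hypothetical service used only for this illustration.
    public static class EchoService {
        public String sayHello(String content) {
            return "hello," + content;
        }
    }

    // Looks up a public method by name and by the runtime classes of the arguments, then invokes it.
    // Assumption: args contains no null and each declared parameter type equals the argument's class.
    public static Object dispatch(Object service, String methodName, Object[] args) throws Exception {
        Class<?>[] parameterTypes = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            parameterTypes[i] = args[i].getClass();
        }
        Method method = service.getClass().getMethod(methodName, parameterTypes);
        return method.invoke(service, args);
    }

    public static void main(String[] args) throws Exception {
        Object result = dispatch(new EchoService(), "sayHello", new Object[]{"fuss"});
        System.out.println(result); // prints hello,fuss
    }
}
```

Because the lookup is by exact type, a method declared with a primitive or supertype parameter would not be found by this sketch; a real provider would need a more tolerant lookup or explicit parameter types in the request.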
Next, write a main method that publishes the service through ProviderReflect:
package com.shan.rpc.invoke;

import com.shan.rpc.framework.ProviderReflect;
import com.shan.rpc.service.HelloService;
import com.shan.rpc.service.impl.HelloServiceImpl;

public class RpcProviderMain {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        ProviderReflect.provider(service, 8083);
    }
}
The main method with which the client calls the remote service is as follows:
package com.shan.rpc.invoke;

import com.shan.rpc.framework.ConsumerProxy;
import com.shan.rpc.service.HelloService;

public class RpcConsumerMain {
    public static void main(String[] args) throws Exception {
        HelloService service = ConsumerProxy.consume(HelloService.class, "127.0.0.1", 8083);
        for (int i = 0; i < 1000; i++) {
            String hello = service.sayHello("fuss_" + i);
            System.out.println(hello);
            Thread.sleep(1000);
        }
    }
}
The execution result is shown in the figure:
Original article: http://blog.51cto.com/13883597/2149033
Time: 2024-11-09 03:31:35