RPC调用本质上就是一种网络编程,客户端向服务器发送消息,服务器拿到消息之后做后续动作。只是RPC这种消息比较特殊,它封装了方法调用,包括方法名,方法参数。服务端拿到这个消息之后,解码消息,然后要通过方法调用模型来完成实际服务器端业务方法的调用。
这篇讲讲Thrfit的方法调用模型。Thrift的方法调用模型很简单,就是通过方法名和实际方法实现类的注册完成,没有使用反射机制,类加载机制。
和方法调用相关的几个核心类:
1. 自动生成的Iface接口,是远程方法的顶层接口
2. 自动生成的Processor类及相关父类,包括TProcessor接口,TBaseProcess抽象类
3. ProcessFunction抽象类,抽象了一个具体的方法调用,包含了方法名信息,调用方法的抽象过程等
4. TNonblcokingServer,是NIO服务器的默认实现,通过Args参数来配置Processor等信息
5. FrameBuffer类,服务器NIO的缓冲区对象,这个对象在服务器端收到全包并解码后,会调用Processor去完成实际的方法调用
6. 服务器端的方法的具体实现类,实现Iface接口
下面逐个来分析相关的类。
Iface接口是自动生成的,描述了方法的接口。 服务器端服务提供方DemoService要实现Iface接口
public class DemoService { public interface Iface { public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException; } } public class DemoServiceImpl implements DemoService.Iface{ @Override public int demoMethod(String param1, Parameter param2, Map<String, String> param3) throws TException { return 0; } }
来看TProcess相关类和接口
1. TProcessor就定义了一个顶层的调用方法process,参数是输入流和输出流
2. 抽象类TBaseProcessor提供了TProcessor的process的默认实现,先读消息头,拿到要调用的方法名,然后从维护的一个Map中取ProcessFunction对象。ProcessFunction对象是实际方法的抽象,调用它的process方法,实际是调用了实际的方法。
3. Processor类是自动生成了,它依赖Iface接口,负责把实际的方法实现和方法的key关联起来,放到Map中维护
public interface TProcessor { public boolean process(TProtocol in, TProtocol out) throws TException; } public abstract class TBaseProcessor<I> implements TProcessor { private final I iface; private final Map<String,ProcessFunction<I, ? extends TBase>> processMap; protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) { this.iface = iface; this.processMap = processFunctionMap; } @Override public boolean process(TProtocol in, TProtocol out) throws TException { TMessage msg = in.readMessageBegin(); ProcessFunction fn = processMap.get(msg.name); if (fn == null) { TProtocolUtil.skip(in, TType.STRUCT); in.readMessageEnd(); TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'"); out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); x.write(out); out.writeMessageEnd(); out.getTransport().flush(); return true; } fn.process(msg.seqid, in, out, iface); return true; } }
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor { public Processor(I iface) { super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>())); } protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { super(iface, getProcessMap(processMap)); } private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { processMap.put("demoMethod", new demoMethod()); return processMap; } private static class demoMethod<I extends Iface> extends org.apache.thrift.ProcessFunction<I, demoMethod_args> { public demoMethod() { super("demoMethod"); } protected demoMethod_args getEmptyArgsInstance() { return new demoMethod_args(); } protected demoMethod_result getResult(I iface, demoMethod_args args) throws org.apache.thrift.TException { demoMethod_result result = new demoMethod_result(); result.success = iface.demoMethod(args.param1, args.param2, args.param3); result.setSuccessIsSet(true); return result; } } }
自动生成的demoMethod类继承了ProcessFunction类,它负载把方法参数,iface, 方法返回值这些抽象的概念组合在一起,通过抽象模型来完成实际方法的调用。实际方法的实现者实现了Iface接口。
TNonblockingServer是NIO服务器的实现,它通过Selector来检查IO就绪状态,进而调用相关的Channel。就方法调用而言,它处理的是读事件,用handelRead()来进一步处理
private void select() { try { // wait for io events. selector.select(); // process the io events we received Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { cleanupSelectionKey(key); continue; } // if the key is marked Accept, then it has to be the server // transport. if (key.isAcceptable()) { handleAccept(); } else if (key.isReadable()) { // deal with reads handleRead(key); } else if (key.isWritable()) { // deal with writes handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } } protected void handleRead(SelectionKey key) { FrameBuffer buffer = (FrameBuffer) key.attachment(); if (!buffer.read()) { cleanupSelectionKey(key); return; } // if the buffer's frame read is complete, invoke the method. <strong>if (buffer.isFrameFullyRead()) { if (!requestInvoke(buffer)) { cleanupSelectionKey(key); } }</strong> } protected boolean requestInvoke(FrameBuffer frameBuffer) { frameBuffer.invoke(); return true; }
非阻塞同步IO的NIO服务器都会使用缓冲区来存放读写的中间状态。FrameBuffer就是这样的一个缓冲区,它由于涉及到方法调用,所以提供了invoke()方法来实现对Processor的调用。
public void invoke() { TTransport inTrans = getInputTransport(); TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); try { processorFactory_.getProcessor(inTrans).process(inProt, outProt); responseReady(); return; } catch (TException te) { LOGGER.warn("Exception while invoking!", te); } catch (Throwable t) { LOGGER.error("Unexpected throwable while invoking!", t); } // This will only be reached when there is a throwable. state_ = FrameBufferState.AWAITING_CLOSE; requestSelectInterestChange(); }
FrameBuffer使用了processorFactory来获得Processor。ProcessorFactory是在创建服务器的时候传递过来的,只是对Processor的简单封装。
protected TServer(AbstractServerArgs args) { processorFactory_ = args.processorFactory; serverTransport_ = args.serverTransport; inputTransportFactory_ = args.inputTransportFactory; outputTransportFactory_ = args.outputTransportFactory; inputProtocolFactory_ = args.inputProtocolFactory; outputProtocolFactory_ = args.outputProtocolFactory; } public class TProcessorFactory { private final TProcessor processor_; public TProcessorFactory(TProcessor processor) { processor_ = processor; } public TProcessor getProcessor(TTransport trans) { return processor_; } } public T processor(TProcessor processor) { this.processorFactory = new TProcessorFactory(processor); return (T) this; }
下面是一个实际的TNonblockingServer的配置实例
除了配置服务器运行的基本参数,最重要的就是把实际的服务提供者通过服务器参数的方式作为Processor传递给TNonblockingServer,供FrameBuffer调用。
public class DemoServiceImpl implements DemoService.Iface{ @Override public int demoMethod(String param1, Parameter param2, Map<String, String> param3) throws TException { return 0; } public static void main(String[] args){ TNonblockingServerSocket socket; try { socket = new TNonblockingServerSocket(9090); TNonblockingServer.Args options = new TNonblockingServer.Args(socket); TProcessor processor = new DemoService.Processor(new DemoServiceImpl()); options.processor(processor); options.protocolFactory(new TCompactProtocol.Factory()); TServer server = new TNonblockingServer(options); server.serve(); } catch (Exception e) { throw new RuntimeException(e); } } }