Thrift源码分析(四)-- 方法调用模型分析

RPC调用本质上就是一种网络编程,客户端向服务器发送消息,服务器拿到消息之后做后续动作。只是RPC这种消息比较特殊,它封装了方法调用,包括方法名,方法参数。服务端拿到这个消息之后,解码消息,然后要通过方法调用模型来完成实际服务器端业务方法的调用。

这篇讲讲Thrfit的方法调用模型。Thrift的方法调用模型很简单,就是通过方法名和实际方法实现类的注册完成,没有使用反射机制,类加载机制。

和方法调用相关的几个核心类:

1. 自动生成的Iface接口,是远程方法的顶层接口

2. 自动生成的Processor类及相关父类,包括TProcessor接口,TBaseProcess抽象类

3. ProcessFunction抽象类,抽象了一个具体的方法调用,包含了方法名信息,调用方法的抽象过程等

4. TNonblcokingServer,是NIO服务器的默认实现,通过Args参数来配置Processor等信息

5. FrameBuffer类,服务器NIO的缓冲区对象,这个对象在服务器端收到全包并解码后,会调用Processor去完成实际的方法调用

6. 服务器端的方法的具体实现类,实现Iface接口

下面逐个来分析相关的类。

Iface接口是自动生成的,描述了方法的接口。 服务器端服务提供方DemoService要实现Iface接口

public class DemoService {

  public interface Iface {

    public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException;

  }
}

public class DemoServiceImpl implements DemoService.Iface{

    @Override
    public int demoMethod(String param1, Parameter param2,
            Map<String, String> param3) throws TException {
        
        return 0;
    }

}

来看TProcess相关类和接口

1. TProcessor就定义了一个顶层的调用方法process,参数是输入流和输出流

2. 抽象类TBaseProcessor提供了TProcessor的process的默认实现,先读消息头,拿到要调用的方法名,然后从维护的一个Map中取ProcessFunction对象。ProcessFunction对象是实际方法的抽象,调用它的process方法,实际是调用了实际的方法。

3. Processor类是自动生成了,它依赖Iface接口,负责把实际的方法实现和方法的key关联起来,放到Map中维护

public interface TProcessor {
  public boolean process(TProtocol in, TProtocol out)
    throws TException;
}

public abstract class TBaseProcessor<I> implements TProcessor {
  private final I iface;
  private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;

  protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
    this.iface = iface;
    this.processMap = processFunctionMap;
  }

  @Override
  public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    ProcessFunction fn = processMap.get(msg.name);
    if (fn == null) {
      TProtocolUtil.skip(in, TType.STRUCT);
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
      return true;
    }
    fn.process(msg.seqid, in, out, iface);
    return true;
  }
}

public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
    public Processor(I iface) {
      super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
    }

    protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      super(iface, getProcessMap(processMap));
    }

    private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      processMap.put("demoMethod", new demoMethod());
      return processMap;
    }

    private static class demoMethod<I extends Iface> extends org.apache.thrift.ProcessFunction<I, demoMethod_args> {
      public demoMethod() {
        super("demoMethod");
      }

      protected demoMethod_args getEmptyArgsInstance() {
        return new demoMethod_args();
      }

      protected demoMethod_result getResult(I iface, demoMethod_args args) throws org.apache.thrift.TException {
        demoMethod_result result = new demoMethod_result();
        result.success = iface.demoMethod(args.param1, args.param2, args.param3);
        result.setSuccessIsSet(true);
        return result;
      }
    }

  }

自动生成的demoMethod类继承了ProcessFunction类,它负载把方法参数,iface, 方法返回值这些抽象的概念组合在一起,通过抽象模型来完成实际方法的调用。实际方法的实现者实现了Iface接口。

TNonblockingServer是NIO服务器的实现,它通过Selector来检查IO就绪状态,进而调用相关的Channel。就方法调用而言,它处理的是读事件,用handelRead()来进一步处理

 private void select() {
      try {
        // wait for io events.
        selector.select();

        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          // skip if not valid
          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }

          // if the key is marked Accept, then it has to be the server
          // transport.
          if (key.isAcceptable()) {
            handleAccept();
          } else if (key.isReadable()) {
            // deal with reads
            handleRead(key);
          } else if (key.isWritable()) {
            // deal with writes
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

   protected void handleRead(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (!buffer.read()) {
        cleanupSelectionKey(key);
        return;
      }

      // if the buffer's frame read is complete, invoke the method.
      <strong>if (buffer.isFrameFullyRead()) {
        if (!requestInvoke(buffer)) {
          cleanupSelectionKey(key);
        }
      }</strong>
    }

   protected boolean requestInvoke(FrameBuffer frameBuffer) {
    frameBuffer.invoke();
    return true;
  }

非阻塞同步IO的NIO服务器都会使用缓冲区来存放读写的中间状态。FrameBuffer就是这样的一个缓冲区,它由于涉及到方法调用,所以提供了invoke()方法来实现对Processor的调用。

public void invoke() {
      TTransport inTrans = getInputTransport();
      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

      try {
        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
        responseReady();
        return;
      } catch (TException te) {
        LOGGER.warn("Exception while invoking!", te);
      } catch (Throwable t) {
        LOGGER.error("Unexpected throwable while invoking!", t);
      }
      // This will only be reached when there is a throwable.
      state_ = FrameBufferState.AWAITING_CLOSE;
      requestSelectInterestChange();
    }

FrameBuffer使用了processorFactory来获得Processor。ProcessorFactory是在创建服务器的时候传递过来的,只是对Processor的简单封装。

protected TServer(AbstractServerArgs args) {
    processorFactory_ = args.processorFactory;
    serverTransport_ = args.serverTransport;
    inputTransportFactory_ = args.inputTransportFactory;
    outputTransportFactory_ = args.outputTransportFactory;
    inputProtocolFactory_ = args.inputProtocolFactory;
    outputProtocolFactory_ = args.outputProtocolFactory;
  }

public class TProcessorFactory {

  private final TProcessor processor_;

  public TProcessorFactory(TProcessor processor) {
    processor_ = processor;
  }

  public TProcessor getProcessor(TTransport trans) {
    return processor_;
  }
}

 public T processor(TProcessor processor) {
      this.processorFactory = new TProcessorFactory(processor);
      return (T) this;
    }

下面是一个实际的TNonblockingServer的配置实例

除了配置服务器运行的基本参数,最重要的就是把实际的服务提供者通过服务器参数的方式作为Processor传递给TNonblockingServer,供FrameBuffer调用。

public class DemoServiceImpl implements DemoService.Iface{

	@Override
	public int demoMethod(String param1, Parameter param2,
			Map<String, String> param3) throws TException {

		return 0;
	}

	public static void main(String[] args){
		TNonblockingServerSocket socket;
		try {
			socket = new TNonblockingServerSocket(9090);
			TNonblockingServer.Args options = new TNonblockingServer.Args(socket);
			TProcessor processor = new DemoService.Processor(new DemoServiceImpl());
			options.processor(processor);
			options.protocolFactory(new TCompactProtocol.Factory());
			TServer server = new TNonblockingServer(options);
			server.serve();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

}

时间: 2024-10-28 20:45:06

Thrift源码分析(四)-- 方法调用模型分析的相关文章

Thrift源码分析(五)-- FrameBuffer类分析

FrameBuffer是Thrift NIO服务器端的一个核心组件,它一方面承担了NIO编程中的缓冲区的功能,另一方面还承担了RPC方法调用的职责. FrameBufferState定义了FrameBuffer作为缓冲区的读写状态 private enum FrameBufferState { // in the midst of reading the frame size off the wire // 读Frame消息头,实际是4字节表示Frame长度  READING_FRAME_SIZ

分析开源项目源码,我们该如何入手分析?(授人以渔)

1 前言 本文接上篇文章跟大家聊聊我们为什么要学习源码?学习源码对我们有用吗?,那么本篇文章再继续跟小伙伴们聊聊源码这个话题. 在工作之余开始写SpringBoot源码分析专栏前,跟小伙伴们聊聊"分析开源项目源码,我们该如何入手分析?"这个话题,我们就随便扯皮,反正是跟小伙伴们一起学习交流,没必要太正式. 小伙伴们看完本文后,若有自己的源码阅读心得可以在下面进行评论或私聊我进行分享,让我从小伙伴们身上GET多点源码阅读的一些技巧,嘿嘿. 2 学习开源框架源码到底难不难? 那么,先跟小伙

Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析

类结构图: 不了解Executor接口原理的可以查看concurrent包中的api介绍,这里只介绍Netty中EventExecutorGroup的主要功能! 从类的结构图中可以看到EventExecutorGroup是直接继承ScheduledExecutorService这个接口的,为了说明白Group的原理这里顺便提一下ScheduledExecutorService的用途! java.util.concurrent.ScheduledExecutorService An Executo

Spring 源码解析之ViewResolver源码解析(四)

Spring 源码解析之ViewResolver源码解析(四) 1 ViewResolver类功能解析 1.1 ViewResolver Interface to be implemented by objects that can resolve views by name. View state doesn't change during the running of the application, so implementations are free to cache views. I

Mybatis源码解析(四) —— SqlSession是如何实现数据库操作的?

Mybatis源码解析(四) -- SqlSession是如何实现数据库操作的? ??如果拿一次数据库请求操作做比喻,那么前面3篇文章就是在做请求准备,真正执行操作的是本篇文章要讲述的内容.正如标题一样,本篇文章最最核心的要点就是 SqlSession实现数据库操作的源码解析.但按照惯例,我这边依然列出如下的问题: 1. SqlSession 是如何被创建的? 每次的数据库操作都会创建一个新的SqlSession么?(也许有很多同学会说SqlSession是通过 SqlSessionFactor

Android 4.4 全套源码及子模块源码的下载方法

博文<Android源码下载--用git clone实现单个目录下载>介绍了采用git clone方法下载Android单个目录源码的方法,这篇文章已经有四年的历史,这期间Google对源代码的管理网站已经进行了更改,直接采用原来的方法下载源代码已经失效.本文介绍了在ubuntu下(在Windows下安装Cygwin,通过Cygwin也可在Windows里通过本文的下载步骤下载Android源码)获取目前最新的Android 4.4 全套源码以及单个自模块源码的下载方法,可根据本文方法下载全套

28 GroupSock(NetAddress)——live555源码阅读(四)网络

28 GroupSock(NetAddress)——live555源码阅读(四)网络 28 GroupSock(NetAddress)——live555源码阅读(四)网络 简介 1) NetAddress网络地址类简述 下面是其定义 assign方法(分配空间) NetAddress的构造 clean方法(清理)与析构 operate= 重载赋值操作 本文由乌合之众 lym瞎编,欢迎转载 blog.cnblogs.net/oloroso 本文由乌合之众 lym瞎编,欢迎转载 my.oschina

39 网络相关函数(七)——live555源码阅读(四)网络

39 网络相关函数(七)——live555源码阅读(四)网络 39 网络相关函数(七)——live555源码阅读(四)网络 简介 14)readSocket从套接口读取数据 recv/recvfrom 函数 函数原型: 参数说明: 返回说明: 本文由乌合之众 lym瞎编,欢迎转载 blog.cnblogs.net/oloroso 本文由乌合之众 lym瞎编,欢迎转载 my.oschina.net/oloroso 简介 网络相关函数是一系列用于操作网络数据的函数.在多个文件中都有相关的函数的定义.

29 GroupSock(NetAddressList)——live555源码阅读(四)网络

29 GroupSock(NetAddressList)——live555源码阅读(四)网络 29 GroupSock(NetAddressList)——live555源码阅读(四)网络 简介 NetAddressList的定义 assign方法 NetAddressList的构造 clean方法与析构 拷贝构造与赋值运算符重载 NetAddressList::Iterator迭代器 本文由乌合之众 lym瞎编,欢迎转载 blog.cnblogs.net/oloroso 本文由乌合之众 lym瞎