Thrift源码分析(五)-- FrameBuffer类分析

FrameBuffer是Thrift NIO服务器端的一个核心组件,它一方面承担了NIO编程中的缓冲区的功能,另一方面还承担了RPC方法调用的职责。

FrameBufferState定义了FrameBuffer作为缓冲区的读写状态

private enum FrameBufferState {
    // in the midst of reading the frame size off the wire
    // 读Frame消息头,实际是4字节表示Frame长度
    READING_FRAME_SIZE,
    // reading the actual frame data now, but not all the way done yet
    // 读Frame消息体
    READING_FRAME,
    // completely read the frame, so an invocation can now happen
    // 读满包
    READ_FRAME_COMPLETE,
    // waiting to get switched to listening for write events
    // 等待注册写
    AWAITING_REGISTER_WRITE,
    // started writing response data, not fully complete yet
    // 写半包
    WRITING,
    // another thread wants this framebuffer to go back to reading
    // 等待注册读
    AWAITING_REGISTER_READ,
    // we want our transport and selection key invalidated in the selector
    // thread
    // 等待关闭
    AWAITING_CLOSE
  }

值得注意的是,FrameBuffer读数据时,

1. 先读4字节的Frame消息头,

2. 然后改变FrameBufferState,从READING_FRMAE_SIZE到READING_FRAME,并根据读到的Frame长度修改Buffer的长度

3. 再次读Frame消息体,如果读完就修改状态到READ_FRAME_COMPLETE,否则还是把FrameBuffer绑定到SelectionKey,下次继续读

public boolean read() {
      if (state_ == FrameBufferState.READING_FRAME_SIZE) {
        // try to read the frame size completely
        if (!internalRead()) {
          return false;
        }

        // if the frame size has been read completely, then prepare to read the
        // actual frame.
        if (buffer_.remaining() == 0) {
          // pull out the frame size as an integer.
          int frameSize = buffer_.getInt(0);
          if (frameSize <= 0) {
            LOGGER.error("Read an invalid frame size of " + frameSize
                + ". Are you using TFramedTransport on the client side?");
            return false;
          }

          // if this frame will always be too large for this server, log the
          // error and close the connection.
          if (frameSize > MAX_READ_BUFFER_BYTES) {
            LOGGER.error("Read a frame size of " + frameSize
                + ", which is bigger than the maximum allowable buffer size for ALL connections.");
            return false;
          }

          // if this frame will push us over the memory limit, then return.
          // with luck, more memory will free up the next time around.
          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
            return true;
          }

          // increment the amount of memory allocated to read buffers
          readBufferBytesAllocated.addAndGet(frameSize);

          // reallocate the readbuffer as a frame-sized buffer
          buffer_ = ByteBuffer.allocate(frameSize);

          state_ = FrameBufferState.READING_FRAME;
        } else {
          // this skips the check of READING_FRAME state below, since we can't
          // possibly go on to that state if there's data left to be read at
          // this one.
          return true;
        }
      }

      // it is possible to fall through from the READING_FRAME_SIZE section
      // to READING_FRAME if there's already some frame data available once
      // READING_FRAME_SIZE is complete.

      if (state_ == FrameBufferState.READING_FRAME) {
        if (!internalRead()) {
          return false;
        }

        // since we're already in the select loop here for sure, we can just
        // modify our selection key directly.
        if (buffer_.remaining() == 0) {
          // get rid of the read select interests
          selectionKey_.interestOps(0);
          state_ = FrameBufferState.READ_FRAME_COMPLETE;
        }

        return true;
      }

      // if we fall through to this point, then the state must be invalid.
      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
      return false;
    }

internalRead方法实际调用了SocketChannel来读数据。注意SocketChannel返回值小于0的情况:

n 有数据的时候返回读取到的字节数。

0 没有数据并且没有达到流的末端时返回0。

-1 当达到流末端的时候返回-1。

当Channel有数据时并且是最后的数据 时,实际会读两次,第一次返回字节数,第二次返回-1。这个是底层Selector实现的。

 private boolean internalRead() {
      try {
        if (trans_.read(buffer_) < 0) {
          return false;
        }
        return true;
      } catch (IOException e) {
        LOGGER.warn("Got an IOException in internalRead!", e);
        return false;
      }
    }

在看写缓冲时的情况

1. 写之前必须把FrameBuffer的状态改成WRITING,后面会有具体例子

2. 如果没写任何数据,就返回false

3. 如果写完了,就需要把SelectionKey注册的写事件取消。Thrift是直接把SelectionKey注册事件改成读了,而常用的做法一般是把写事件取消就行了。关于更多NIO写事件的注册问题,看这篇:
http://blog.csdn.net/iter_zc/article/details/39291129

  public boolean write() {
      if (state_ == FrameBufferState.WRITING) {
        try {
          if (trans_.write(buffer_) < 0) {
            return false;
          }
        } catch (IOException e) {
          LOGGER.warn("Got an IOException during write!", e);
          return false;
        }

        // we're done writing. now we need to switch back to reading.
        if (buffer_.remaining() == 0) {
          prepareRead();
        }
        return true;
      }

      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
      return false;
    }

FrameBuffer可以根据SelectionKey的状态来切换自身状态,也可以根据自身状态来选择注册的Channel事件

public void changeSelectInterests() {
      if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
        // set the OP_WRITE interest
        selectionKey_.interestOps(SelectionKey.OP_WRITE);
        state_ = FrameBufferState.WRITING;
      } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
        prepareRead();
      } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
        close();
        selectionKey_.cancel();
      } else {
        LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
      }
    }

说完了FrameBuffer作为NIO缓冲区的功能,再看看它作为RPC方法调用模型的重要组件的功能。

FrameBuffer提供了invoker方法,当读满包时,从消息头拿到要调用的方法,然后通过它管理的Processor来完成实际方法调用。然后切换到写模式来写消息体

具体的调用模型看这篇: http://blog.csdn.net/iter_zc/article/details/39692951

public void invoke() {
      TTransport inTrans = getInputTransport();
      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

      try {
        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
        responseReady();
        return;
      } catch (TException te) {
        LOGGER.warn("Exception while invoking!", te);
      } catch (Throwable t) {
        LOGGER.error("Unexpected throwable while invoking!", t);
      }
      // This will only be reached when there is a throwable.
      state_ = FrameBufferState.AWAITING_CLOSE;
      requestSelectInterestChange();
    }

 public void responseReady() {
      // the read buffer is definitely no longer in use, so we will decrement
      // our read buffer count. we do this here as well as in close because
      // we'd like to free this read memory up as quickly as possible for other
      // clients.
      readBufferBytesAllocated.addAndGet(-buffer_.array().length);

      if (response_.len() == 0) {
        // go straight to reading again. this was probably an oneway method
        state_ = FrameBufferState.AWAITING_REGISTER_READ;
        buffer_ = null;
      } else {
        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());

        // set state that we're waiting to be switched to write. we do this
        // asynchronously through requestSelectInterestChange() because there is
        // a possibility that we're not in the main thread, and thus currently
        // blocked in select(). (this functionality is in place for the sake of
        // the HsHa server.)
        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
      }
      requestSelectInterestChange();
    }

写消息体responseReday()方法时,我们看到Thrift是如何处理写的

1. 创建ByteBuffer

2. 修改状态到AWAITING_REGISTER_WRITE

3. 调用requestSelecInteresetChange()方法来注册Channel的写事件

4. 当Selector根据isWriteable状态来调用要写的Channel时,会调用FrameBuffer的write方法,上面说了write方法写满包后,会取消注册的写事件。

时间: 2024-10-27 08:10:48

Thrift源码分析(五)-- FrameBuffer类分析的相关文章

老李推荐:第6章3节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-命令翻译类

老李推荐:第6章3节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览-命令翻译类 每个来自网络的字串命令都需要进行解析执行,只是有些是在解析的过程中直接执行了事,而有些是需要在解析后创建相应的事件类实例并添加到命令队列里面排队执行.负责这部分工作的就是命令翻译类.那么我们往下还是继续在MonkeySourceNetwork这个范畴中MonkeyCommand类是怎么一回事: 图6-3-1 MonkeyCommand族谱 图中间的MonkeyCommand是一个接口,

老李推荐:第6章5节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-事件

老李推荐:第6章5节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览-事件 从网络过来的命令字串需要解析翻译出来,有些命令会在翻译好后直接执行然后返回,但有一大部分命令在翻译后需要转换成对应的事件,然后放入到命令队列里面等待执行.Monkey在取出一个事件执行的时候主要是执行其injectEvent方法来注入事件,而注入事件根据是否需要往系统注入事件分为两种: 需要通过系统服务往系统注入事件:如MonkeyKeyEvent事件会通过系统的InputManager往系

boost.asio源码剖析(五) ---- 泛型与面向对象的完美结合

有人说C++是带类的C:有人说C++是面向对象编程语言:有人说C++是面向过程与面向对象结合的语言.类似的评论网上有很多,虽然正确,却片面,是断章取义之言. C++是实践的产物,C++并没有为了成为某某类型的语言而设计,而是一切以工程实践为目的,一切以提升语言能力为目的. 1983年C++诞生之时,由于兼容C语言而天生拥有了面向过程编程的能力:       1989年推出的2.0版,C++完善了对面向对象编程范式的支持:       1993年的3.0版,C++中引入了模板(template),

Spring 源码解析之DispatcherServlet源码解析(五)

Spring 源码解析之DispatcherServlet源码解析(五) 前言 本文需要有前四篇文章的基础,才能够清晰易懂,有兴趣可以先看看详细的流程,这篇文章可以说是第一篇文章,也可以说是前四篇文章的的汇总,Spring的整个请求流程都是围绕着DispatcherServlet进行的 类结构图 根据类的结构来说DispatcherServlet本身也是继承了HttpServlet的,所有的请求都是根据这一个Servlet来进行转发的,同时解释了为什么需要在web.xml进行如下配置,因为Spr

ThinkPHP6源码:从Http类的实例化看依赖注入是如何实现的

ThinkPHP 6 从原先的 App 类中分离出 Http 类,负责应用的初始化和调度等功能,而 App 类则专注于容器的管理,符合单一职责原则. 以下源码分析,我们可以从 App,Http 类的实例化过程,了解类是如何实现自动实例化的,依赖注入是怎么实现的. 从入口文件出发 当访问一个 ThinkPHP 搭建的站点,框架最先是从入口文件开始的,然后才是应用初始化.路由解析.控制器调用和响应输出等操作. 入口文件主要代码如下: // 引入自动加载器,实现类的自动加载功能(PSR4标准) //

Netty 源码(五)NioEventLoop

Netty 源码(五)NioEventLoop Netty 基于事件驱动模型,使用不同的事件来通知我们状态的改变或者操作状态的改变.它定义了在整个连接的生命周期里当有事件发生的时候处理的核心抽象. Channel 为 Netty 网络操作抽象类,EventLoop 主要是为 Channel 处理 I/O 操作,两者配合参与 I/O 操作. 下图是 Channel.EventLoop.Thread.EventLoopGroup 之间的关系. 一个 EventLoopGroup 包含一个或多个 Ev

Java源码转C#源码的五款最佳工具

Java源码转C#源码的五款最佳工具 作者:chszs,转载需注明.博客主页:http://blog.csdn.net/chszs 出于某些需要,你可能会遇到把Java源码转换成C#源码的任务.如果是自己一边理解源码,再一边手工翻译,那效率肯定是很低的.有鉴于此,本文推荐了五款最佳的源码转换工具,以解决你的烦恼.工具1#:Java语言转换器助手地址:http://www.microsoft.com/en-us/download/details.aspx?id=14349 Java语言转换器助手是

神经网络caffe框架源码解析--softmax_layer.cpp类代码研究

// Copyright 2013 Yangqing Jia // #include <algorithm> #include <vector> #include "caffe/layer.hpp" #include "caffe/vision_layers.hpp" #include "caffe/util/math_functions.hpp" using std::max; namespace caffe { /**

神经网络caffe框架源码解析--data_layer.cpp类代码研究

dataLayer作为整个网络的输入层, 数据从leveldb中取.leveldb的数据是通过图片转换过来的. 网络建立的时候, datalayer主要是负责设置一些参数,比如batchsize,channels,height,width等. 这次会通过读leveldb一个数据块来获取这些信息. 然后启动一个线程来预先从leveldb拉取一批数据,这些数据是图像数据和图像标签. 正向传播的时候, datalayer就把预先拉取好数据拷贝到指定的cpu或者gpu的内存. 然后启动新线程再预先拉取数

Thrift源码分析(四)-- 方法调用模型分析

RPC调用本质上就是一种网络编程,客户端向服务器发送消息,服务器拿到消息之后做后续动作.只是RPC这种消息比较特殊,它封装了方法调用,包括方法名,方法参数.服务端拿到这个消息之后,解码消息,然后要通过方法调用模型来完成实际服务器端业务方法的调用. 这篇讲讲Thrfit的方法调用模型.Thrift的方法调用模型很简单,就是通过方法名和实际方法实现类的注册完成,没有使用反射机制,类加载机制. 和方法调用相关的几个核心类: 1. 自动生成的Iface接口,是远程方法的顶层接口 2. 自动生成的Proc