Thrift源码分析(二)-- 协议和编解码

协议和编解码是一个网络应用程序的核心问题之一,客户端和服务器通过约定的协议来传输消息(数据),通过特定的格式来编解码字节流,并转化成业务消息,提供给上层框架调用。

Thrift的协议比较简单,它把协议和编解码整合在了一起。抽象类TProtocol定义了协议和编解码的顶层接口。个人感觉采用抽象类而不是接口的方式来定义顶层接口并不好,TProtocol关联了一个TTransport传输对象,而不是提供一个类似getTransport()的接口,导致抽象类的扩展性比接口差。

TProtocol主要做了两个事情:

1. 关联TTransport对象

2.定义一系列读写消息的编解码接口,包括两类,一类是复杂数据结构比如readMessageBegin, readMessageEnd,  writeMessageBegin, writMessageEnd.还有一类是基本数据结构,比如readI32, writeI32, readString, writeString

public abstract class TProtocol {

  /**
   * Transport
   */
  protected TTransport trans_;

 public abstract void writeMessageBegin(TMessage message) throws TException;

  public abstract void writeMessageEnd() throws TException;

  public abstract void writeStructBegin(TStruct struct) throws TException;

  public abstract void writeStructEnd() throws TException;

  public abstract void writeFieldBegin(TField field) throws TException;

  public abstract void writeFieldEnd() throws TException;

  public abstract void writeFieldStop() throws TException;

  public abstract void writeMapBegin(TMap map) throws TException;

  public abstract void writeMapEnd() throws TException;

  public abstract void writeListBegin(TList list) throws TException;

  public abstract void writeListEnd() throws TException;

  public abstract void writeSetBegin(TSet set) throws TException;

  public abstract void writeSetEnd() throws TException;

  public abstract void writeBool(boolean b) throws TException;

  public abstract void writeByte(byte b) throws TException;

  public abstract void writeI16(short i16) throws TException;

  public abstract void writeI32(int i32) throws TException;

  public abstract void writeI64(long i64) throws TException;

  public abstract void writeDouble(double dub) throws TException;

  public abstract void writeString(String str) throws TException;

  public abstract void writeBinary(ByteBuffer buf) throws TException;

  /**
   * Reading methods.
   */

  public abstract TMessage readMessageBegin() throws TException;

  public abstract void readMessageEnd() throws TException;

  public abstract TStruct readStructBegin() throws TException;

  public abstract void readStructEnd() throws TException;

  public abstract TField readFieldBegin() throws TException;

  public abstract void readFieldEnd() throws TException;

  public abstract TMap readMapBegin() throws TException;

  public abstract void readMapEnd() throws TException;

  public abstract TList readListBegin() throws TException;

  public abstract void readListEnd() throws TException;

  public abstract TSet readSetBegin() throws TException;

  public abstract void readSetEnd() throws TException;

  public abstract boolean readBool() throws TException;

  public abstract byte readByte() throws TException;

  public abstract short readI16() throws TException;

  public abstract int readI32() throws TException;

  public abstract long readI64() throws TException;

  public abstract double readDouble() throws TException;

  public abstract String readString() throws TException;

  public abstract ByteBuffer readBinary() throws TException;

  /**
   * Reset any internal state back to a blank slate. This method only needs to
   * be implemented for stateful protocols.
   */
  public void reset() {}
 
  /**
   * Scheme accessor
   */
  public Class<? extends IScheme> getScheme() {
    return StandardScheme.class;
  }

}

所谓协议就是客户端和服务器端约定传输什么数据,如何解析传输的数据。对于一个RPC调用的协议来说,要传输的数据主要有:

调用方

1. 方法的名称,包括类的名称和方法的名称

2. 方法的参数,包括类型和参数值

3.一些附加的数据,比如附件,超时事件,自定义的控制信息等等

返回方

1. 调用的返回码

2. 返回值

3.异常信息

从TProtocol的定义我们可以看出Thrift的协议约定如下事情:

1. 先writeMessageBegin表示开始传输消息了,写消息头。Message里面定义了方法名,调用的类型,版本号,消息seqId

2. 接下来是写方法的参数,实际就是写消息体。如果参数是一个类,就writeStructBegin

3. 接下来写字段,writeFieldBegin, 这个方法会写接下来的字段的数据类型和顺序号。这个顺序号是Thrfit对要传输的字段的一个编码,从1开始

4. 如果是一个集合就writeListBegin/writeMapBegin,如果是一个基本数据类型,比如int, 就直接writeI32

5. 每个复杂数据类型写完都调用writeXXXEnd,直到writeMessageEnd结束

6. 读消息时根据数据类型读取相应的长度

每个writeXXX都是采用消息头+消息体的方式。我们来看TBinaryProtocol的实现。

1. writeMessgeBegin方法写了消息头,包括4字节的版本号和类型信息,字符串类型的方法名,4字节的序列号seqId

2. writeFieldBegin,写了1个字节的字段数据类型,和2个字节字段的顺序号

3. writeI32,写了4个字节的字节数组

4. writeString,先写4字节消息头表示字符串长度,再写字符串字节

5. writeBinary,先写4字节消息头表示字节数组长度,再写字节数组内容

6.readMessageBegin时,先读4字节版本和类型信息,再读字符串,再读4字节序列号

7.readFieldBegin,先读1个字节的字段数据类型,再读2个字节的字段顺序号

8. readString时,先读4字节字符串长度,再读字符串内容。字符串统一采用UTF-8编码

 public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }

public void writeFieldBegin(TField field) throws TException {
    writeByte(field.type);
    writeI16(field.id);
  }

private byte[] i32out = new byte[4];
  public void writeI32(int i32) throws TException {
    i32out[0] = (byte)(0xff & (i32 >> 24));
    i32out[1] = (byte)(0xff & (i32 >> 16));
    i32out[2] = (byte)(0xff & (i32 >> 8));
    i32out[3] = (byte)(0xff & (i32));
    trans_.write(i32out, 0, 4);
  }

public void writeString(String str) throws TException {
    try {
      byte[] dat = str.getBytes("UTF-8");
      writeI32(dat.length);
      trans_.write(dat, 0, dat.length);
    } catch (UnsupportedEncodingException uex) {
      throw new TException("JVM DOES NOT SUPPORT UTF-8");
    }
  }

public void writeBinary(ByteBuffer bin) throws TException {
    int length = bin.limit() - bin.position();
    writeI32(length);
    trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length);
  }

public TMessage readMessageBegin() throws TException {
    int size = readI32();
    if (size < 0) {
      int version = size & VERSION_MASK;
      if (version != VERSION_1) {
        throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
      }
      return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());
    } else {
      if (strictRead_) {
        throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
      }
      return new TMessage(readStringBody(size), readByte(), readI32());
    }
  }

public TField readFieldBegin() throws TException {
    byte type = readByte();
    short id = type == TType.STOP ? 0 : readI16();
    return new TField("", type, id);
  }

public String readString() throws TException {
    int size = readI32();

    if (trans_.getBytesRemainingInBuffer() >= size) {
      try {
        String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
        trans_.consumeBuffer(size);
        return s;
      } catch (UnsupportedEncodingException e) {
        throw new TException("JVM DOES NOT SUPPORT UTF-8");
      }
    }

    return readStringBody(size);
  }

TProtocol定义了基本的协议信息,包括传输什么数据,如何解析传输的数据的基本方法。

还存在一个问题,就是服务器端如何知道客户端发送过来的数据是怎么组合的,比如第一个字段是字符串类型,第二个字段是int。这个信息是在IDL生成客户端时生成的代码时提供了。Thrift生成的客户端代码提供了读写参数的方法,这两个方式是一一对应的,包括字段的序号,类型等等。客户端使用写参数的方法,服务器端使用读参数的方法。

关于IDL生成的客户端代码会在后面的文章具体描述。下面简单看一下自动生成的代码

1. 方法的调用从writeMessageBegin开始,发送了消息头信息

2. 写方法的参数,也就是写消息体。方法参数由一个统一的接口TBase描述,提供了read和write的统一接口。自动生成的代码提供了read, write方法参数的具体实现

3. 写完结束

 public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("handle", org.apache.thrift.protocol.TMessageType.CALL, 0));
        handle_args args = new handle_args();
        args.setIdentity(identity);
        args.setUid(uid);
        args.setSid(sid);
        args.setType(type);
        args.setMessage(message);
        args.setParams(params);
        args.write(prot);
        prot.writeMessageEnd();
      }

public interface TBase<T extends TBase<?,?>, F extends TFieldIdEnum> extends Comparable<T>,  Serializable {

  public void read(TProtocol iprot) throws TException;

  public void write(TProtocol oprot) throws TException;
}

public static class handle_args <strong>implements org.apache.thrift.TBase</strong><handle_args, handle_args._Fields>, java.io.Serializable, Cloneable   {
    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("handle_args");

    private static final org.apache.thrift.protocol.TField IDENTITY_FIELD_DESC = new org.apache.thrift.protocol.TField("identity", org.apache.thrift.protocol.TType.STRING, (short)1);
    private static final org.apache.thrift.protocol.TField UID_FIELD_DESC = new org.apache.thrift.protocol.TField("uid", org.apache.thrift.protocol.TType.I64, (short)2);
    private static final org.apache.thrift.protocol.TField SID_FIELD_DESC = new org.apache.thrift.protocol.TField("sid", org.apache.thrift.protocol.TType.STRING, (short)3);
    private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)4);
    private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)5);
    private static final org.apache.thrift.protocol.TField PARAMS_FIELD_DESC = new org.apache.thrift.protocol.TField("params", org.apache.thrift.protocol.TType.MAP, (short)6);

    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
    static {
      schemes.put(StandardScheme.class, new handle_argsStandardSchemeFactory());
      schemes.put(TupleScheme.class, new handle_argsTupleSchemeFactory());
    }

    public String identity; // required
    public long uid; // required
    public String sid; // required
    public int type; // required
    public String message; // required
    public Map<String,String> params; // required

    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
      IDENTITY((short)1, "identity"),
      UID((short)2, "uid"),
      SID((short)3, "sid"),
      TYPE((short)4, "type"),
      MESSAGE((short)5, "message"),
      PARAMS((short)6, "params");
}

//  自动生成的写方法参数的方法,按照字段顺序写,给客户端代码使用
      public void write(org.apache.thrift.protocol.TProtocol oprot, handle_args struct) throws org.apache.thrift.TException {
        struct.validate();

        oprot.writeStructBegin(STRUCT_DESC);
        if (struct.identity != null) {
          oprot.writeFieldBegin(IDENTITY_FIELD_DESC);
          oprot.writeString(struct.identity);
          oprot.writeFieldEnd();
        }
        oprot.writeFieldBegin(UID_FIELD_DESC);
        oprot.writeI64(struct.uid);
        oprot.writeFieldEnd();
        if (struct.sid != null) {
          oprot.writeFieldBegin(SID_FIELD_DESC);
          oprot.writeString(struct.sid);
          oprot.writeFieldEnd();
        }
        oprot.writeFieldBegin(TYPE_FIELD_DESC);
        oprot.writeI32(struct.type);
        oprot.writeFieldEnd();
        if (struct.message != null) {
          oprot.writeFieldBegin(MESSAGE_FIELD_DESC);
          oprot.writeString(struct.message);
          oprot.writeFieldEnd();
        }
}

<pre name="code" class="java">//  自动生成的读方法参数的方法,按照字段顺序读,给服务器端代码使用

public void read(org.apache.thrift.protocol.TProtocol iprot, handle_args struct) throws org.apache.thrift.TException {

org.apache.thrift.protocol.TField schemeField;

iprot.readStructBegin();

while (true)

{

schemeField = iprot.readFieldBegin();

if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {

break;

}

switch (schemeField.id) {

case 1: // IDENTITY

if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {

struct.identity = iprot.readString();

struct.setIdentityIsSet(true);

} else {

org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);

}

break;

case 2: // UID

if (schemeField.type == org.apache.thrift.protocol.TType.I64) {

struct.uid = iprot.readI64();

struct.setUidIsSet(true);

} else {

org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);

}

break;

case 3: // SID

if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {

struct.sid = iprot.readString();

struct.setSidIsSet(true);

} else {

org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);

}

break;

case 4: // TYPE

if (schemeField.type == org.apache.thrift.protocol.TType.I32) {

struct.type = iprot.readI32();

struct.setTypeIsSet(true);

} else {

org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);

}

break;

				
时间: 2024-11-29 09:27:11

Thrift源码分析(二)-- 协议和编解码的相关文章

Kubernetes 源码分析 -- API Server之编解码

--------------------- 作者:weixin_34037977 来源:CSDN 原文:https://blog.csdn.net/weixin_34037977/article/details/87058105 在Kubernetes源码分析-- API Server之API Install篇中,我们了解到K8S可以支持多版本的API,但是Rest API的不同版本中接口的输入输出参数的格式是有差别的,Kubernetes是怎么处理这个问题的呢?另外Kubernetes支持ya

Tomcat源码分析二:先看看Tomcat的整体架构

Tomcat源码分析二:先看看Tomcat的整体架构 Tomcat架构图 我们先来看一张比较经典的Tomcat架构图: 从这张图中,我们可以看出Tomcat中含有Server.Service.Connector.Container等组件,接下来我们一起去大致的看看这些组件的作用和他们之间的相互联系.在这之前,我们先补充一个知识点,也就是Tomcat它实现的功能点是什么呢?通过查找一些资料,这里参考下极客时间<深入拆解Tomcat_Jetty>中的总结,即Tomcat 要实现 2 个核心功能:

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

[Android]Volley源码分析(二)Cache

Cache作为Volley最为核心的一部分,Volley花了重彩来实现它.本章我们顺着Volley的源码思路往下,来看下Volley对Cache的处理逻辑. 我们回想一下昨天的简单代码,我们的入口是从构造一个Request队列开始的,而我们并不直接调用new来构造,而是将控制权反转给Volley这个静态工厂来构造. com.android.volley.toolbox.Volley: public static RequestQueue newRequestQueue(Context conte

哇!板球 源码分析二

游戏主页面布局 创建屏下Score标签 pLabel = CCLabelTTF::create("Score", "Arial", TITLE_FONT_SIZE); //分数标签 //设置标签字体的颜色 pLabel->setColor (ccc3(0, 0, 0)); //设置文本标签的位置 pLabel->setPosition ( ccp ( SCORE_X, //X坐标 SCORE_Y //Y坐标 ) ); //将文本标签添加到布景中 this

baksmali和smali源码分析(二)

这一节,主要介绍一下 baksmali代码的框架. 我们经常在反编译android apk包的时候使用apktool这个工具,其实本身这个工具里面对于dex文件解析和重新生成就是使用的baksmali 和smali这两个jar包其中 baksmali是将 dex文件转换成便于阅读的smali文件的,具体使用命令如下:java -jar baksmali.jar classes.dex -o myout其中myout是输出的文件夹 而smali是将smali文件重新生成回 dex文件的具体使用的命

【梦幻连连连】源码分析(二)

转载请注明出处:http://blog.csdn.net/oyangyufu/article/details/24736711 GameLayer场景界面效果: 源码分析: GameLayer场景初始化,主要是初始化加载界面及背景音乐 bool GameLayer::init() { float dt=0.0f; if ( !CCLayerColor::initWithColor(ccc4(255, 255, 255, 255))) { return false; } this->initLoa

[Android]Fragment源码分析(二) 状态

我们上一讲,抛出来一个问题,就是当Activity的onCreateView的时候,是如何构造Fragment中的View参数.要回答这个问题我们先要了解Fragment的状态,这是Fragment管理中非常重要的一环.我们先来看一下FragmentActivity提供的一些核心回调: @Override protected void onCreate(Bundle savedInstanceState) { mFragments.attachActivity(this, mContainer,

JAVA Collection 源码分析(二)之SubList

昨天我们分析了ArrayList的源码,我们可以看到,在其中还有一个类,名为SubList,其继承了AbstractList. // AbstractList类型的引用,所有继承了AbstractList都可以传进来 private final AbstractList<E> parent; // 这个是其实就是parent的偏移量,从parent中的第几个元素开始的 private final int parentOffset; private final int offset; int s