首先看一下Thrift的整体架构,如下图:
如图所示,黄色部分是用户实现的业务逻辑,褐色部分是根据thrift定义的服务接口描述文件生成的客户端和服务器端代码框架(前篇2中已分析了thrift service生成代码),红色部分是根据Thrift文件生成代码实现数据的读写操作。红色部分以下是Thrift的协议,传输体系以及底层的IO通信,使用thrift可以很方便的定义一个服务并且选择不同的传输协议和传输层而不用重新生成代码(Thrift提供的是一种大而全的服务,它认为没有统一的标准,用户根据自己需要,组合使用之;而avro排斥多方案引起的混乱,提倡建立统一的标准,后篇在来分析avro,其作为hadoop rpc框架,在大数据量传输方面有一定的优势。)。
数据类型:
Thrift可定义的数据类型包括以下几种(说句后话,为什么这些框架protobuf, thrift, avro都定义自己的数据类型???他们都作为多语言支持的,内部类型的定义,不同语言支持的数据类型统一映射到框架内部支持数据类型,方便处理,在数据读写传输过程中按统一方式处理。)
- 基本类型
- bool : 布尔,true or false,对应java的boolean
- byte : 8位有符号整数,对应java的byte
- i16 : 16位有符号整数,对应java的short
- i32 : 32位有符号整数,对应java的int
- i64 : 64位有符号整数,对应java的long
- double : 64位浮点数 ,对应java的double
- string : 未知编码文本或二进制字符串,对应java的string
- void.
- 结构体类型
- struct : 定义公共对象,类似于C预压中的结构体定义,在java中是一个javabean.
- 容器类型
- list : 对应java的arraylist
- set : 对应java的hashset
- map: 对应java的HashMap
- enum :
- 异常类型 (在java中,TException为基类)
- 服务类型 (在Java中,统一为Iface, AsyncIface接口)
- Stop (占位符,一个byte,标志读写完毕,或者空内容,e.g.空消息)
协议:
Thrift可以让用户选择客户端和服务器端之间的传输通信协议的类别(用户不同的需求,不同应用可以根据自己需求选择适合自己的传输协议),一般情况下使用二进制类型的传输协议(提高传输效率,多数用于内部系统之间的通信传输),还可以使用基于文本类型的协议(json),json,xml作为通用网络数据传输协议,可以实现外部系统调用。
- TBinaryProtocol- 二进制编码格式进行数据传输(默认)
- TCompactProtocol- 高效率,密集的二进制编码格式进行数据传输(了解protocol buffer内部编码实现的话,就不足为奇了)
- TJSONProtocol - 使用JSON的数据编码协议进行数据传输。
- TSimpleJSONProtocol- 只提供JSON只写的协议,使用与通过脚本语言解析
其中TProtocolDecorator,装饰者,抽象类,其中典型实现TMultiplexedProtocol,允许客户端连接多功能server.
TBinaryProtocol:
该协议作为thrift默认的二进制协议,通过它,所有数据都是以二进制形式读写,没有什么特殊处理,除了tag外,基本都是数据本身的二进制,不过值得了解的是Thrift的读写message的过程(tag的运用);
这里列出协议抽象基类TProtocol的一部分方法,可以看出各种tagBeging(),tagEnd()方法,read方法一样。
还是以上篇Thrift 代码生成分析篇解析(Hello.thrift)开始,看一下客户端调用service方法开始引入TBinaryProtocol,没看过的朋友可以先了解一下。方法如下:
string helloString(1:string para)
进Thrift为我们生成的Hello类里面看看吧。
public String helloString(String para) throws org.apache.thrift.TException { send_helloString(para); return recv_helloString(); } public void send_helloString(String para) throws org.apache.thrift.TException { helloString_args args = new helloString_args(); args.setPara(para); sendBase("helloString", args); }
helloString_args上篇分析过,直接进其父类TServiceClient中看下sendBase():
protected void sendBase(String methodName, TBase args) throws TException { oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_)); args.write(oprot_); oprot_.writeMessageEnd(); oprot_.getTransport().flush(); }
协议层先写入messageBeginTag,然后写message(对应其方法参数的封装类) ,最后messageEndTag,传输层flush。再来看下message的结构吧。
public final class TMessage { public TMessage() { this("", TType.STOP, 0);//占位符,1byte,没实际内容 } public TMessage(String n, byte t, int s) { name = n; //方法名 type = t; //消息类型 seqid = s; //消息 seq number } public final String name; public final byte type; public final int seqid; @Override public String toString() { return "<TMessage name:‘" + name + "‘ type: " + type + " seqid:" + seqid + ">"; }
Tmessage三个成员,RPC调用方法名,消息类型,消息递增序列化。接着看下消息类型:
public final class TMessageType { public static final byte CALL = 1; public static final byte REPLY = 2; public static final byte EXCEPTION = 3; public static final byte ONEWAY = 4; }
四种消息类型,RPC request(客户端请求),RPC正常repsonse(服务器响应),RPC exception(服务器端返回异常),单向RPC(客户端发出request,但不要求服务器端给出响应).
<****************************************************************************************************************************************>
进入TBinaryProtocol中的writeMessageBegin()瞧瞧:
1 public void writeMessageBegin(TMessage message) throws TException { 2 if (strictWrite_) { //是否严格写 3 int version = VERSION_1 | message.type; //版本号,消息类型。 4 writeI32(version); 5 writeString(message.name); //消息name属性,即方法名。 6 writeI32(message.seqid);//序列号 7 } else { //非严格写,无版本号和消息类型 8 writeString(message.name); 9 writeByte(message.type); 10 writeI32(message.seqid); 11 } 12 }
版本号如下:
protected static final int VERSION_MASK = 0xffff0000; protected static final int VERSION_1 = 0x80010000;
继续writeI32():
1 private byte[] i32out = new byte[4]; 2 public void writeI32(int i32) throws TException { 3 i32out[0] = (byte)(0xff & (i32 >> 24)); 4 i32out[1] = (byte)(0xff & (i32 >> 16)); 5 i32out[2] = (byte)(0xff & (i32 >> 8)); 6 i32out[3] = (byte)(0xff & (i32)); 7 trans_.write(i32out, 0, 4); 8 }
大端写入int的字节数组。writeString():
public void writeString(String str) throws TException { try { byte[] dat = str.getBytes("UTF-8"); writeI32(dat.length); trans_.write(dat, 0, dat.length); } catch (UnsupportedEncodingException uex) { throw new TException("JVM DOES NOT SUPPORT UTF-8"); } }
string进UTF-8后获得其字节数组,写入数组长度,在写string bytes,(string写,统一utf-8编码后,先写其字节数组长度,再写实际内容)。再来看一下写实际消息:
args.write(oprot_); // TServiceClient中的write,调用生成hellostring_args的write.下面是其实现。 public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { schemes.get(oprot.getScheme()).getScheme().write(oprot, this); //调用schema对应的的write,上篇有分析 }
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { schemes.put(StandardScheme.class, new helloString_argsStandardSchemeFactory()); schemes.put(TupleScheme.class, new helloString_argsTupleSchemeFactory()); }
private static class helloString_argsStandardScheme extends StandardScheme<helloString_args> { public void write(org.apache.thrift.protocol.TProtocol oprot, helloString_args struct) throws org.apache.thrift.TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); //先写structbeginTag. if (struct.para != null) { //struct中参数不为null oprot.writeFieldBegin(PARA_FIELD_DESC); //写fieldbegingTag oprot.writeString(struct.para); //写参数 oprot.writeFieldEnd(); //写fieldEndTag } oprot.writeFieldStop();//写fieldStopTag(猜测应该是当远程RPC调用方法中有多个参数时,用于标记所有参数写完标志Tag,fieldEndtag只代表每个参数写完,因为本例就一个参数不好验证,朋友确定的话,不吝赐教) oprot.writeStructEnd(); //写structEndTag(方法参数在Thrift中被视为struct结构,即java中javabean,其中成员为具体方法参数值。方法返回值也一样。) } }
上面注解可以了解大概步骤:
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("helloString_args"); private static final org.apache.thrift.protocol.TField PARA_FIELD_DESC = new org.apache.thrift.protocol.TField("para", org.apache.thrift.protocol.TType.STRING, (short)1);
TStruct结构再来瞧瞧:
public final class TStruct { public TStruct() { this(""); } public TStruct(String n) { name = n; } public final String name; }
简单的string属性,struct名,即自动生成的代码类名。再瞅瞅TField吧:
public class TField { public TField() { this("", TType.STOP, (short)0);//空成员,没赋值的情况。 } public TField(String n, byte t, short i) { name = n; //RPC方法调用参数名 type = t; //参数类型 id = i;//thrift文件定义的参数顺序 } public final String name; public final byte type; public final short id; public String toString() { return "<TField name:‘" + name + "‘ type:" + type + " field-id:" + id + ">"; }
public final class TType { //thrift内部数据类型 public static final byte STOP = 0; public static final byte VOID = 1; public static final byte BOOL = 2; public static final byte BYTE = 3; public static final byte DOUBLE = 4; public static final byte I16 = 6; public static final byte I32 = 8; public static final byte I64 = 10; public static final byte STRING = 11; public static final byte STRUCT = 12; public static final byte MAP = 13; public static final byte SET = 14; public static final byte LIST = 15; public static final byte ENUM = 16; }
OK,继续跳回TBinaryProtocol中,跳来跳去的,大家有点累了吧,坚持就是胜利^!^
public void writeMessageEnd() {} public void writeStructBegin(TStruct struct) {} public void writeStructEnd() {}
fuck,这三个为空操作,蛋蛋伤。
public void writeFieldBegin(TField field) throws TException { writeByte(field.type); writeI16(field.id); } public void writeFieldEnd() {}
fieldBeginTag中,先写入参数类型,参数序列号。至此消息写完毕(读操作就不讲了,反操作,差不多,不过异步操作,准备后面单独开一篇来讲下),我们再来看看TBinaryProtocol中其他方法:
boolean:一个字节1或0.
public void writeBool(boolean b) throws TException { writeByte(b ? (byte)1 : (byte)0); //一个字节 }
i16,i64:依旧大端。
public void writeI16(short i16) throws TException { //2字节 i16out[0] = (byte)(0xff & (i16 >> 8)); i16out[1] = (byte)(0xff & (i16)); trans_.write(i16out, 0, 2); } private byte[] i64out = new byte[8]; public void writeI64(long i64) throws TException {//8字节 i64out[0] = (byte)(0xff & (i64 >> 56)); i64out[1] = (byte)(0xff & (i64 >> 48)); i64out[2] = (byte)(0xff & (i64 >> 40)); i64out[3] = (byte)(0xff & (i64 >> 32)); i64out[4] = (byte)(0xff & (i64 >> 24)); i64out[5] = (byte)(0xff & (i64 >> 16)); i64out[6] = (byte)(0xff & (i64 >> 8)); i64out[7] = (byte)(0xff & (i64)); trans_.write(i64out, 0, 8); }
double:先转化为long字节分布,然后按I64写,(没有float哦!):
public void writeDouble(double dub) throws TException { writeI64(Double.doubleToLongBits(dub)); }
Map tag:先写map key类型(1字节),然后map value类型(1字节),最后写键值对长度(4字节),扯句后话,不想avro中的map,其key type只能为string.
public void writeMapBegin(TMap map) throws TException { writeByte(map.keyType); writeByte(map.valueType); writeI32(map.size); } public void writeMapEnd() {}
List Tag: 先写list 值类型(1字节),在写list长度(4字节)。
public void writeListBegin(TList list) throws TException { writeByte(list.elemType); writeI32(list.size); } public void writeListEnd() {}
set Tag: 同上。
public void writeSetBegin(TSet set) throws TException { writeByte(set.elemType); writeI32(set.size); } public void writeSetEnd() {}
read操作就不细谈了,朋友们可以自己去看看。